// commonware_runtime/utils/buffer/read.rs

use crate::{Blob, Error};

3/// A reader that buffers content from a [Blob] to optimize the performance
4/// of a full scan of contents.
5///
6/// # Example
7///
8/// ```
9/// use commonware_runtime::{Runner, buffer::Read, Blob, Error, Storage, deterministic};
10///
11/// let executor = deterministic::Runner::default();
12/// executor.start(|context| async move {
13///     // Open a blob and add some data (e.g., a journal file)
14///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
15///     let data = b"Hello, world! This is a test.".to_vec();
16///     let size = data.len() as u64;
17///     blob.write_at(data, 0).await.expect("unable to write data");
18///
19///     // Create a buffer
20///     let buffer = 64 * 1024;
21///     let mut reader = Read::new(blob, size, buffer);
22///
23///     // Read data sequentially
24///     let mut header = [0u8; 16];
25///     reader.read_exact(&mut header, 16).await.expect("unable to read data");
26///     println!("Read header: {:?}", header);
27///
28///     // Position is still at 16 (after header)
29///     assert_eq!(reader.position(), 16);
30/// });
31/// ```
32pub struct Read<B: Blob> {
33    /// The underlying blob to read from.
34    blob: B,
35    /// The buffer storing the data read from the blob.
36    buffer: Vec<u8>,
37    /// The current position in the blob from where the buffer was filled.
38    blob_position: u64,
39    /// The size of the blob.
40    blob_size: u64,
41    /// The current position within the buffer for reading.
42    buffer_position: usize,
43    /// The valid data length in the buffer.
44    buffer_valid_len: usize,
45    /// The maximum size of the buffer.
46    buffer_size: usize,
47}
48
49impl<B: Blob> Read<B> {
50    /// Creates a new `Read` that reads from the given blob with the specified buffer size.
51    ///
52    /// # Panics
53    ///
54    /// Panics if `buffer_size` is zero.
55    pub fn new(blob: B, blob_size: u64, buffer_size: usize) -> Self {
56        assert!(buffer_size > 0, "buffer size must be greater than zero");
57        Self {
58            blob,
59            buffer: vec![0; buffer_size],
60            blob_position: 0,
61            blob_size,
62            buffer_position: 0,
63            buffer_valid_len: 0,
64            buffer_size,
65        }
66    }
67
68    /// Returns how many valid bytes are remaining in the buffer.
69    pub fn buffer_remaining(&self) -> usize {
70        self.buffer_valid_len - self.buffer_position
71    }
72
73    /// Returns how many bytes remain in the blob from the current position.
74    pub fn blob_remaining(&self) -> u64 {
75        self.blob_size
76            .saturating_sub(self.blob_position + self.buffer_position as u64)
77    }
78
79    /// Refills the buffer from the blob starting at the current blob position.
80    /// Returns the number of bytes read or an error if the read failed.
81    pub async fn refill(&mut self) -> Result<usize, Error> {
82        // Update blob position to account for consumed bytes
83        self.blob_position += self.buffer_position as u64;
84        self.buffer_position = 0;
85        self.buffer_valid_len = 0;
86
87        // Calculate how many bytes remain in the blob
88        let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
89        if blob_remaining == 0 {
90            return Err(Error::BlobInsufficientLength);
91        }
92
93        // Calculate how much to read (minimum of buffer size and remaining bytes)
94        let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
95
96        // Read the data - we only need a single read operation since we know exactly how much data is available
97        // Note that the last refill may cause `self.buffer` to have length < `self.buffer_size`
98        // because `bytes_to_read` < `self.buffer_size`.
99        let mut buffer = std::mem::take(&mut self.buffer);
100        buffer.truncate(bytes_to_read);
101        self.buffer = self.blob.read_at(buffer, self.blob_position).await?;
102        self.buffer_valid_len = bytes_to_read;
103        Ok(bytes_to_read)
104    }
105
106    /// Reads exactly `size` bytes into the provided buffer.
107    /// Returns an error if not enough bytes are available.
108    pub async fn read_exact(&mut self, buf: &mut [u8], size: usize) -> Result<(), Error> {
109        // Quick check if we have enough bytes total before attempting reads
110        if (self.buffer_remaining() + self.blob_remaining() as usize) < size {
111            return Err(Error::BlobInsufficientLength);
112        }
113
114        // Read until we have enough bytes
115        let mut bytes_read = 0;
116        while bytes_read < size {
117            // Check if we need to refill
118            if self.buffer_position >= self.buffer_valid_len {
119                self.refill().await?;
120            }
121
122            // Calculate how many bytes we can copy from the buffer
123            let bytes_to_copy = std::cmp::min(
124                size - bytes_read,
125                self.buffer_valid_len - self.buffer_position,
126            );
127
128            // Copy bytes from buffer to output
129            buf[bytes_read..(bytes_read + bytes_to_copy)].copy_from_slice(
130                &self.buffer[self.buffer_position..(self.buffer_position + bytes_to_copy)],
131            );
132
133            self.buffer_position += bytes_to_copy;
134            bytes_read += bytes_to_copy;
135        }
136
137        Ok(())
138    }
139
140    /// Returns the current absolute position in the blob.
141    pub fn position(&self) -> u64 {
142        self.blob_position + self.buffer_position as u64
143    }
144
145    /// Repositions the buffer to read from the specified position in the blob.
146    pub fn seek_to(&mut self, position: u64) -> Result<(), Error> {
147        // Check if the seek position is valid
148        if position > self.blob_size {
149            return Err(Error::BlobInsufficientLength);
150        }
151
152        // Reset buffer state
153        self.blob_position = position;
154        self.buffer_position = 0;
155        self.buffer_valid_len = 0;
156
157        Ok(())
158    }
159
160    /// Truncates the blob to the specified len.
161    ///
162    /// This may be useful if reading some blob after unclean shutdown.
163    pub async fn truncate(self, len: u64) -> Result<(), Error> {
164        self.blob.truncate(len).await?;
165        self.blob.sync().await
166    }
167}