commonware_runtime/utils/buffer/
read.rs

1use crate::{Blob, Error};
2use commonware_utils::StableBuf;
3
/// A reader that buffers content from a [Blob] to optimize the performance
/// of a full scan of contents.
///
/// Data is fetched from the blob in chunks of at most `buffer_size` bytes and
/// then served from memory until the chunk is exhausted, at which point the
/// next chunk is fetched.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, buffer::Read, Blob, Error, Storage, deterministic};
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob and add some data (e.g., a journal file)
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     let data = b"Hello, world! This is a test.".to_vec();
///     let size = data.len() as u64;
///     blob.write_at(data, 0).await.expect("unable to write data");
///
///     // Create a buffer
///     let buffer = 64 * 1024;
///     let mut reader = Read::new(blob, size, buffer);
///
///     // Read data sequentially
///     let mut header = [0u8; 16];
///     reader.read_exact(&mut header, 16).await.expect("unable to read data");
///     println!("Read header: {:?}", header);
///
///     // Position is still at 16 (after header)
///     assert_eq!(reader.position(), 16);
/// });
/// ```
pub struct Read<B: Blob> {
    /// The underlying blob to read from.
    blob: B,
    /// The buffer storing the data read from the blob. Only the first
    /// `buffer_valid_len` bytes hold data from the most recent fill.
    buffer: StableBuf,
    /// The blob offset at which the buffer was (or will next be) filled, i.e.
    /// the blob offset corresponding to index 0 of `buffer`.
    blob_position: u64,
    /// The size of the blob in bytes, as supplied by the caller at construction.
    blob_size: u64,
    /// The index within `buffer` of the next byte to hand out. The absolute
    /// read position in the blob is `blob_position + buffer_position`.
    buffer_position: usize,
    /// The number of bytes in `buffer` that hold valid data (zero until the
    /// first fill and after a seek outside the buffered range).
    buffer_valid_len: usize,
    /// The maximum number of bytes fetched from the blob in a single fill.
    buffer_size: usize,
}
49
50impl<B: Blob> Read<B> {
51    /// Creates a new `Read` that reads from the given blob with the specified buffer size.
52    ///
53    /// # Panics
54    ///
55    /// Panics if `buffer_size` is zero.
56    pub fn new(blob: B, blob_size: u64, buffer_size: usize) -> Self {
57        assert!(buffer_size > 0, "buffer size must be greater than zero");
58        Self {
59            blob,
60            buffer: vec![0; buffer_size].into(),
61            blob_position: 0,
62            blob_size,
63            buffer_position: 0,
64            buffer_valid_len: 0,
65            buffer_size,
66        }
67    }
68
69    /// Returns how many valid bytes are remaining in the buffer.
70    pub fn buffer_remaining(&self) -> usize {
71        self.buffer_valid_len - self.buffer_position
72    }
73
74    /// Returns how many bytes remain in the blob from the current position.
75    pub fn blob_remaining(&self) -> u64 {
76        self.blob_size
77            .saturating_sub(self.blob_position + self.buffer_position as u64)
78    }
79
80    /// Refills the buffer from the blob starting at the current blob position.
81    /// Returns the number of bytes read or an error if the read failed.
82    async fn refill(&mut self) -> Result<usize, Error> {
83        // Update blob position to account for consumed bytes
84        self.blob_position += self.buffer_position as u64;
85        self.buffer_position = 0;
86        self.buffer_valid_len = 0;
87
88        // Calculate how many bytes remain in the blob
89        let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
90        if blob_remaining == 0 {
91            return Err(Error::BlobInsufficientLength);
92        }
93
94        // Calculate how much to read (minimum of buffer size and remaining bytes)
95        let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
96
97        // Read the data - we only need a single read operation since we know exactly how much data is available
98        // Note that the last refill may cause `self.buffer` to have length < `self.buffer_size`
99        // because `bytes_to_read` < `self.buffer_size`.
100        let mut buffer = std::mem::take(&mut self.buffer);
101        buffer.truncate(bytes_to_read);
102        self.buffer = self.blob.read_at(buffer, self.blob_position).await?;
103        self.buffer_valid_len = bytes_to_read;
104        Ok(bytes_to_read)
105    }
106
107    /// Reads exactly `size` bytes into the provided buffer.
108    /// Returns an error if not enough bytes are available.
109    pub async fn read_exact(&mut self, buf: &mut [u8], size: usize) -> Result<(), Error> {
110        // Quick check if we have enough bytes total before attempting reads
111        if (self.buffer_remaining() + self.blob_remaining() as usize) < size {
112            return Err(Error::BlobInsufficientLength);
113        }
114
115        // Read until we have enough bytes
116        let mut bytes_read = 0;
117        while bytes_read < size {
118            // Check if we need to refill
119            if self.buffer_position >= self.buffer_valid_len {
120                self.refill().await?;
121            }
122
123            // Calculate how many bytes we can copy from the buffer
124            let bytes_to_copy = std::cmp::min(
125                size - bytes_read,
126                self.buffer_valid_len - self.buffer_position,
127            );
128
129            // Copy bytes from buffer to output
130            buf[bytes_read..(bytes_read + bytes_to_copy)].copy_from_slice(
131                &self.buffer.as_ref()[self.buffer_position..(self.buffer_position + bytes_to_copy)],
132            );
133
134            self.buffer_position += bytes_to_copy;
135            bytes_read += bytes_to_copy;
136        }
137
138        Ok(())
139    }
140
141    /// Returns the current absolute position in the blob.
142    pub fn position(&self) -> u64 {
143        self.blob_position + self.buffer_position as u64
144    }
145
146    /// Repositions the buffer to read from the specified position in the blob.
147    pub fn seek_to(&mut self, position: u64) -> Result<(), Error> {
148        // Check if the seek position is valid
149        if position > self.blob_size {
150            return Err(Error::BlobInsufficientLength);
151        }
152
153        // Check if the position is within the current buffer
154        let buffer_start = self.blob_position;
155        let buffer_end = self.blob_position + self.buffer_valid_len as u64;
156
157        if position >= buffer_start && position < buffer_end {
158            // Position is within the current buffer, adjust buffer_position
159            self.buffer_position = (position - self.blob_position) as usize;
160        } else {
161            // Position is outside the current buffer, reset buffer state
162            self.blob_position = position;
163            self.buffer_position = 0;
164            self.buffer_valid_len = 0;
165        }
166
167        Ok(())
168    }
169
170    /// Truncates the blob to the specified len and syncs the blob.
171    ///
172    /// This may be useful if reading some blob after unclean shutdown.
173    pub async fn truncate(self, len: u64) -> Result<(), Error> {
174        self.blob.truncate(len).await?;
175        self.blob.sync().await
176    }
177}