Skip to main content

commonware_runtime/utils/buffer/
read.rs

1use crate::{Blob, Error, IoBufMut};
2use std::num::NonZeroUsize;
3
4/// A reader that buffers content from a [Blob] to optimize the performance
5/// of a full scan of contents.
6///
7/// # Example
8///
9/// ```
10/// use commonware_utils::NZUsize;
11/// use commonware_runtime::{Runner, buffer::Read, Blob, Error, Storage, deterministic};
12///
13/// let executor = deterministic::Runner::default();
14/// executor.start(|context| async move {
15///     // Open a blob and add some data (e.g., a journal file)
16///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
17///     let data = b"Hello, world! This is a test.".to_vec();
18///     let size = data.len() as u64;
19///     blob.write_at(0, data).await.expect("unable to write data");
20///
21///     // Create a buffer
22///     let buffer = 64 * 1024;
23///     let mut reader = Read::new(blob, size, NZUsize!(buffer));
24///
25///     // Read data sequentially
26///     let mut header = [0u8; 16];
27///     reader.read_exact(&mut header, 16).await.expect("unable to read data");
28///     println!("Read header: {:?}", header);
29///
30///     // Position is still at 16 (after header)
31///     assert_eq!(reader.position(), 16);
32/// });
33/// ```
34pub struct Read<B: Blob> {
35    /// The underlying blob to read from.
36    blob: B,
37    /// The buffer storing the data read from the blob.
38    buffer: IoBufMut,
39    /// The current position in the blob from where the buffer was filled.
40    blob_position: u64,
41    /// The size of the blob.
42    blob_size: u64,
43    /// The current position within the buffer for reading.
44    buffer_position: usize,
45    /// The valid data length in the buffer.
46    buffer_valid_len: usize,
47    /// The maximum size of the buffer.
48    buffer_size: usize,
49}
50
51impl<B: Blob> Read<B> {
52    /// Creates a new `Read` that reads from the given blob with the specified buffer size.
53    ///
54    /// # Panics
55    ///
56    /// Panics if `buffer_size` is zero.
57    pub fn new(blob: B, blob_size: u64, buffer_size: NonZeroUsize) -> Self {
58        Self {
59            blob,
60            buffer: IoBufMut::with_capacity(buffer_size.get()),
61            blob_position: 0,
62            blob_size,
63            buffer_position: 0,
64            buffer_valid_len: 0,
65            buffer_size: buffer_size.get(),
66        }
67    }
68
69    /// Returns how many valid bytes are remaining in the buffer.
70    pub const fn buffer_remaining(&self) -> usize {
71        self.buffer_valid_len - self.buffer_position
72    }
73
74    /// Returns how many bytes remain in the blob from the current position.
75    pub const fn blob_remaining(&self) -> u64 {
76        self.blob_size
77            .saturating_sub(self.blob_position + self.buffer_position as u64)
78    }
79
80    /// Returns the number of bytes in the blob, as provided at construction.
81    pub const fn blob_size(&self) -> u64 {
82        self.blob_size
83    }
84
85    /// Refills the buffer from the blob starting at the current blob position.
86    /// Returns the number of bytes read or an error if the read failed.
87    async fn refill(&mut self) -> Result<usize, Error> {
88        // Update blob position to account for consumed bytes
89        self.blob_position += self.buffer_position as u64;
90        self.buffer_position = 0;
91        self.buffer_valid_len = 0;
92
93        // Calculate how many bytes remain in the blob
94        let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
95        if blob_remaining == 0 {
96            return Err(Error::BlobInsufficientLength);
97        }
98
99        // Calculate how much to read (minimum of buffer size and remaining bytes)
100        let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
101
102        // Reuse existing buffer if it has enough capacity, otherwise allocate new
103        let mut buf = std::mem::take(&mut self.buffer);
104        buf.clear();
105        let buf = if buf.capacity() >= bytes_to_read {
106            // SAFETY: We just cleared the buffer and verified capacity.
107            // The bytes will be overwritten by read_at before being read.
108            unsafe { buf.set_len(bytes_to_read) };
109            buf
110        } else {
111            IoBufMut::zeroed(bytes_to_read)
112        };
113        let read_result = self.blob.read_at(self.blob_position, buf).await?;
114        self.buffer = read_result.coalesce();
115        self.buffer_valid_len = bytes_to_read;
116
117        Ok(bytes_to_read)
118    }
119
120    /// Reads exactly `size` bytes into the provided buffer. Returns an error if not enough bytes
121    /// are available.
122    ///
123    /// # Panics
124    ///
125    /// Panics if `size` is greater than the length of `buf`.
126    pub async fn read_exact(&mut self, buf: &mut [u8], size: usize) -> Result<(), Error> {
127        assert!(
128            size <= buf.len(),
129            "provided buffer is too small for requested size"
130        );
131
132        // Quick check if we have enough bytes total before attempting reads
133        if (self.buffer_remaining() + self.blob_remaining() as usize) < size {
134            return Err(Error::BlobInsufficientLength);
135        }
136
137        // Read until we have enough bytes
138        let mut bytes_read = 0;
139        while bytes_read < size {
140            // Check if we need to refill
141            if self.buffer_position >= self.buffer_valid_len {
142                self.refill().await?;
143            }
144
145            // Calculate how many bytes we can copy from the buffer
146            let bytes_to_copy = std::cmp::min(size - bytes_read, self.buffer_remaining());
147
148            // Copy bytes from buffer to output
149            buf[bytes_read..(bytes_read + bytes_to_copy)].copy_from_slice(
150                &self.buffer.as_ref()[self.buffer_position..(self.buffer_position + bytes_to_copy)],
151            );
152
153            self.buffer_position += bytes_to_copy;
154            bytes_read += bytes_to_copy;
155        }
156
157        Ok(())
158    }
159
160    /// Returns the current absolute position in the blob.
161    pub const fn position(&self) -> u64 {
162        self.blob_position + self.buffer_position as u64
163    }
164
165    /// Repositions the buffer to read from the specified position in the blob.
166    pub const fn seek_to(&mut self, position: u64) -> Result<(), Error> {
167        // Check if the seek position is valid
168        if position > self.blob_size {
169            return Err(Error::BlobInsufficientLength);
170        }
171
172        // Check if the position is within the current buffer
173        let buffer_start = self.blob_position;
174        let buffer_end = self.blob_position + self.buffer_valid_len as u64;
175
176        if position >= buffer_start && position < buffer_end {
177            // Position is within the current buffer, adjust buffer_position
178            self.buffer_position = (position - self.blob_position) as usize;
179        } else {
180            // Position is outside the current buffer, reset buffer state
181            self.blob_position = position;
182            self.buffer_position = 0;
183            self.buffer_valid_len = 0;
184        }
185
186        Ok(())
187    }
188
189    /// Resizes the blob to the specified len and syncs the blob.
190    ///
191    /// This may be useful if reading some blob after unclean shutdown.
192    pub async fn resize(self, len: u64) -> Result<(), Error> {
193        self.blob.resize(len).await?;
194        self.blob.sync().await
195    }
196}