commonware_runtime/utils/buffer/read.rs
1use crate::{Blob, Error};
2use commonware_utils::StableBuf;
3use std::num::NonZeroUsize;
4
5/// A reader that buffers content from a [Blob] to optimize the performance
6/// of a full scan of contents.
7///
8/// # Example
9///
10/// ```
11/// use commonware_utils::NZUsize;
12/// use commonware_runtime::{Runner, buffer::Read, Blob, Error, Storage, deterministic};
13///
14/// let executor = deterministic::Runner::default();
15/// executor.start(|context| async move {
16/// // Open a blob and add some data (e.g., a journal file)
17/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
18/// let data = b"Hello, world! This is a test.".to_vec();
19/// let size = data.len() as u64;
20/// blob.write_at(data, 0).await.expect("unable to write data");
21///
22/// // Create a buffer
23/// let buffer = 64 * 1024;
24/// let mut reader = Read::new(blob, size, NZUsize!(buffer));
25///
26/// // Read data sequentially
27/// let mut header = [0u8; 16];
28/// reader.read_exact(&mut header, 16).await.expect("unable to read data");
29/// println!("Read header: {:?}", header);
30///
31/// // Position is still at 16 (after header)
32/// assert_eq!(reader.position(), 16);
33/// });
34/// ```
35pub struct Read<B: Blob> {
36 /// The underlying blob to read from.
37 blob: B,
38 /// The buffer storing the data read from the blob.
39 buffer: StableBuf,
40 /// The current position in the blob from where the buffer was filled.
41 blob_position: u64,
42 /// The size of the blob.
43 blob_size: u64,
44 /// The current position within the buffer for reading.
45 buffer_position: usize,
46 /// The valid data length in the buffer.
47 buffer_valid_len: usize,
48 /// The maximum size of the buffer.
49 buffer_size: usize,
50}
51
52impl<B: Blob> Read<B> {
53 /// Creates a new `Read` that reads from the given blob with the specified buffer size.
54 ///
55 /// # Panics
56 ///
57 /// Panics if `buffer_size` is zero.
58 pub fn new(blob: B, blob_size: u64, buffer_size: NonZeroUsize) -> Self {
59 Self {
60 blob,
61 buffer: vec![0; buffer_size.get()].into(),
62 blob_position: 0,
63 blob_size,
64 buffer_position: 0,
65 buffer_valid_len: 0,
66 buffer_size: buffer_size.get(),
67 }
68 }
69
70 /// Returns how many valid bytes are remaining in the buffer.
71 pub fn buffer_remaining(&self) -> usize {
72 self.buffer_valid_len - self.buffer_position
73 }
74
75 /// Returns how many bytes remain in the blob from the current position.
76 pub fn blob_remaining(&self) -> u64 {
77 self.blob_size
78 .saturating_sub(self.blob_position + self.buffer_position as u64)
79 }
80
81 /// Returns the number of bytes in the blob, as provided at construction.
82 pub fn blob_size(&self) -> u64 {
83 self.blob_size
84 }
85
86 /// Refills the buffer from the blob starting at the current blob position.
87 /// Returns the number of bytes read or an error if the read failed.
88 async fn refill(&mut self) -> Result<usize, Error> {
89 // Update blob position to account for consumed bytes
90 self.blob_position += self.buffer_position as u64;
91 self.buffer_position = 0;
92 self.buffer_valid_len = 0;
93
94 // Calculate how many bytes remain in the blob
95 let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
96 if blob_remaining == 0 {
97 return Err(Error::BlobInsufficientLength);
98 }
99
100 // Calculate how much to read (minimum of buffer size and remaining bytes)
101 let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
102
103 // Read the data - we only need a single read operation since we know exactly how much data is available.
104 if bytes_to_read < self.buffer_size {
105 // Read into a temp buffer for the end-of-blob case to avoid truncating underlying buffer.
106 let mut tmp_buffer = vec![0u8; bytes_to_read];
107 tmp_buffer = self
108 .blob
109 .read_at(tmp_buffer, self.blob_position)
110 .await?
111 .into();
112 self.buffer.as_mut()[0..bytes_to_read].copy_from_slice(&tmp_buffer[0..bytes_to_read]);
113 } else {
114 self.buffer = self
115 .blob
116 .read_at(std::mem::take(&mut self.buffer), self.blob_position)
117 .await?;
118 }
119 self.buffer_valid_len = bytes_to_read;
120
121 Ok(bytes_to_read)
122 }
123
124 /// Reads exactly `size` bytes into the provided buffer. Returns an error if not enough bytes
125 /// are available.
126 ///
127 /// # Panics
128 ///
129 /// Panics if `size` is greater than the length of `buf`.
130 pub async fn read_exact(&mut self, buf: &mut [u8], size: usize) -> Result<(), Error> {
131 assert!(
132 size <= buf.len(),
133 "provided buffer is too small for requested size"
134 );
135
136 // Quick check if we have enough bytes total before attempting reads
137 if (self.buffer_remaining() + self.blob_remaining() as usize) < size {
138 return Err(Error::BlobInsufficientLength);
139 }
140
141 // Read until we have enough bytes
142 let mut bytes_read = 0;
143 while bytes_read < size {
144 // Check if we need to refill
145 if self.buffer_position >= self.buffer_valid_len {
146 self.refill().await?;
147 }
148
149 // Calculate how many bytes we can copy from the buffer
150 let bytes_to_copy = std::cmp::min(size - bytes_read, self.buffer_remaining());
151
152 // Copy bytes from buffer to output
153 buf[bytes_read..(bytes_read + bytes_to_copy)].copy_from_slice(
154 &self.buffer.as_ref()[self.buffer_position..(self.buffer_position + bytes_to_copy)],
155 );
156
157 self.buffer_position += bytes_to_copy;
158 bytes_read += bytes_to_copy;
159 }
160
161 Ok(())
162 }
163
164 /// Returns the current absolute position in the blob.
165 pub fn position(&self) -> u64 {
166 self.blob_position + self.buffer_position as u64
167 }
168
169 /// Repositions the buffer to read from the specified position in the blob.
170 pub fn seek_to(&mut self, position: u64) -> Result<(), Error> {
171 // Check if the seek position is valid
172 if position > self.blob_size {
173 return Err(Error::BlobInsufficientLength);
174 }
175
176 // Check if the position is within the current buffer
177 let buffer_start = self.blob_position;
178 let buffer_end = self.blob_position + self.buffer_valid_len as u64;
179
180 if position >= buffer_start && position < buffer_end {
181 // Position is within the current buffer, adjust buffer_position
182 self.buffer_position = (position - self.blob_position) as usize;
183 } else {
184 // Position is outside the current buffer, reset buffer state
185 self.blob_position = position;
186 self.buffer_position = 0;
187 self.buffer_valid_len = 0;
188 }
189
190 Ok(())
191 }
192
193 /// Resizes the blob to the specified len and syncs the blob.
194 ///
195 /// This may be useful if reading some blob after unclean shutdown.
196 pub async fn resize(self, len: u64) -> Result<(), Error> {
197 self.blob.resize(len).await?;
198 self.blob.sync().await
199 }
200}