// commonware_runtime/utils/buffer/read.rs
1use crate::{Blob, BufferPool, BufferPooler, Error, IoBuf, IoBufs};
2use std::num::NonZeroUsize;
3
/// A reader that buffers content from a [Blob] to optimize the performance
/// of a full scan of contents.
///
/// # Allocation Semantics
///
/// - The internal read buffer is allocated eagerly in [Self::new].
/// - Refills try to reclaim mutable ownership of that same backing allocation.
/// - If the backing is still shared (for example, previously returned slices are alive), a
///   pooled replacement is allocated and the existing backing is left alive until all
///   aliases drop.
/// - [Self::read] returns zero-copy slices into refill buffers. Holding those slices may
///   force allocation on subsequent refills.
///
/// # Example
///
/// ```
/// use commonware_utils::NZUsize;
/// use commonware_runtime::{Runner, buffer::Read, Blob, Error, Storage, deterministic, BufferPooler};
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob and add some data (e.g., a journal file)
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     let data = b"Hello, world! This is a test.".to_vec();
///     let size = data.len() as u64;
///     blob.write_at(0, data).await.expect("unable to write data");
///
///     // Create a buffered reader with a 64 KiB internal buffer
///     let buffer = 64 * 1024;
///     let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer));
///
///     // Read data sequentially
///     let header = reader.read(16).await.expect("unable to read data");
///     println!("Read header: {:?}", header.coalesce().as_ref());
///
///     // Position has advanced to 16 (just past the header)
///     assert_eq!(reader.position(), 16);
/// });
/// ```
pub struct Read<B: Blob> {
    /// The underlying blob to read from.
    blob: B,
    /// The buffer storing the data read from the blob.
    buffer: IoBuf,
    /// The absolute offset in the blob from which `buffer` was filled.
    blob_position: u64,
    /// The size of the blob, as provided at construction.
    blob_size: u64,
    /// The current read cursor within `buffer` (relative to `blob_position`).
    buffer_position: usize,
    /// The number of valid (filled) bytes in `buffer`.
    buffer_valid_len: usize,
    /// The maximum capacity of the buffer, fixed at construction.
    buffer_size: usize,
    /// Buffer pool used for internal allocations.
    pool: BufferPool,
}
60
61impl<B: Blob> Read<B> {
62 /// Creates a new `Read` that reads from the given blob with the specified buffer size.
63 pub fn new(blob: B, blob_size: u64, buffer_size: NonZeroUsize, pool: BufferPool) -> Self {
64 Self {
65 blob,
66 buffer: pool.alloc(buffer_size.get()).freeze(),
67 blob_position: 0,
68 blob_size,
69 buffer_position: 0,
70 buffer_valid_len: 0,
71 buffer_size: buffer_size.get(),
72 pool,
73 }
74 }
75
76 /// Creates a new `Read`, extracting the storage [BufferPool] from a [BufferPooler].
77 pub fn from_pooler(
78 pooler: &impl BufferPooler,
79 blob: B,
80 blob_size: u64,
81 buffer_size: NonZeroUsize,
82 ) -> Self {
83 Self::new(
84 blob,
85 blob_size,
86 buffer_size,
87 pooler.storage_buffer_pool().clone(),
88 )
89 }
90
91 /// Returns how many valid bytes are remaining in the buffer.
92 pub const fn buffer_remaining(&self) -> usize {
93 self.buffer_valid_len - self.buffer_position
94 }
95
96 /// Returns how many bytes remain in the blob from the current position.
97 pub const fn blob_remaining(&self) -> u64 {
98 self.blob_size
99 .saturating_sub(self.blob_position + self.buffer_position as u64)
100 }
101
102 /// Returns the number of bytes in the blob, as provided at construction.
103 pub const fn blob_size(&self) -> u64 {
104 self.blob_size
105 }
106
107 /// Refills the buffer from the blob starting at the current blob position.
108 /// Returns the number of bytes read or an error if the read failed.
109 async fn refill(&mut self) -> Result<usize, Error> {
110 // Update blob position to account for consumed bytes
111 self.blob_position += self.buffer_position as u64;
112 self.buffer_position = 0;
113 self.buffer_valid_len = 0;
114
115 // Calculate how many bytes remain in the blob
116 let blob_remaining = self.blob_size.saturating_sub(self.blob_position);
117 if blob_remaining == 0 {
118 return Err(Error::BlobInsufficientLength);
119 }
120
121 // Calculate how much to read (minimum of buffer size and remaining bytes)
122 let bytes_to_read = std::cmp::min(self.buffer_size as u64, blob_remaining) as usize;
123
124 // Reuse existing allocation when uniquely owned. If readers still hold slices from
125 // previous reads, allocate a pooled replacement and leave old memory alive until dropped.
126 let current = std::mem::take(&mut self.buffer);
127 let buf = match current.try_into_mut() {
128 Ok(mut reusable) if reusable.capacity() >= bytes_to_read => {
129 reusable.clear();
130 reusable
131 }
132 Ok(_) | Err(_) => self.pool.alloc(bytes_to_read),
133 };
134 let read_result = self
135 .blob
136 .read_at_buf(self.blob_position, bytes_to_read, buf)
137 .await?;
138 self.buffer = read_result.coalesce_with_pool(&self.pool).freeze();
139 self.buffer_valid_len = self.buffer.len();
140
141 Ok(self.buffer_valid_len)
142 }
143
144 /// Reads exactly `len` bytes and returns them as immutable bytes.
145 ///
146 /// Returned bytes are composed of zero-copy slices from the internal read buffer.
147 /// Holding returned slices can keep the current backing shared, which may require
148 /// allocation on later refills.
149 ///
150 /// Returns an error if not enough bytes are available.
151 pub async fn read(&mut self, len: usize) -> Result<IoBufs, Error> {
152 if len == 0 {
153 return Ok(IoBufs::default());
154 }
155
156 // Quick check against total remaining bytes at current position.
157 if self.blob_remaining() < len as u64 {
158 return Err(Error::BlobInsufficientLength);
159 }
160
161 // Read until we have enough bytes
162 let mut remaining = len;
163 let mut out = IoBufs::default();
164 while remaining > 0 {
165 // Check if we need to refill
166 if self.buffer_position >= self.buffer_valid_len {
167 self.refill().await?;
168 }
169
170 // Calculate how many bytes we can take from the buffer
171 let bytes_to_take = std::cmp::min(remaining, self.buffer_remaining());
172
173 // Append bytes from buffer to output
174 out.append(
175 self.buffer
176 .slice(self.buffer_position..(self.buffer_position + bytes_to_take)),
177 );
178
179 self.buffer_position += bytes_to_take;
180 remaining -= bytes_to_take;
181 }
182
183 Ok(out)
184 }
185
186 /// Returns the current absolute position in the blob.
187 pub const fn position(&self) -> u64 {
188 self.blob_position + self.buffer_position as u64
189 }
190
191 /// Repositions the buffer to read from the specified position in the blob.
192 pub const fn seek_to(&mut self, position: u64) -> Result<(), Error> {
193 // Check if the seek position is valid
194 if position > self.blob_size {
195 return Err(Error::BlobInsufficientLength);
196 }
197
198 // Check if the position is within the current buffer
199 let buffer_start = self.blob_position;
200 let buffer_end = self.blob_position + self.buffer_valid_len as u64;
201
202 if position >= buffer_start && position < buffer_end {
203 // Position is within the current buffer, adjust buffer_position
204 self.buffer_position = (position - self.blob_position) as usize;
205 } else {
206 // Position is outside the current buffer, reset buffer state
207 self.blob_position = position;
208 self.buffer_position = 0;
209 self.buffer_valid_len = 0;
210 }
211
212 Ok(())
213 }
214
215 /// Resizes the blob to the specified len and syncs the blob.
216 ///
217 /// This may be useful if reading some blob after unclean shutdown.
218 pub async fn resize(self, len: u64) -> Result<(), Error> {
219 self.blob.resize(len).await?;
220 self.blob.sync().await
221 }
222}