commonware_runtime/utils/buffer/write.rs
1use crate::{Blob, Error, RwLock};
2use commonware_utils::StableBuf;
3use std::sync::Arc;
4
5/// The internal state of a [Write] buffer.
6struct Inner<B: Blob> {
7 /// The underlying blob to write to.
8 blob: B,
9 /// The buffer storing data to be written to the blob.
10 ///
11 /// The buffer always represents data at the "tip" of the logical blob,
12 /// starting at `position` and extending for `buffer.len()` bytes.
13 buffer: Vec<u8>,
14 /// The offset in the blob where the buffered data starts.
15 ///
16 /// This represents the logical position in the blob where `buffer[0]` would be written.
17 /// The buffer is maintained at the "tip" to support efficient size calculation and appends.
18 position: u64,
19 /// The maximum size of the buffer.
20 capacity: usize,
21}
22
23impl<B: Blob> Inner<B> {
24 /// Returns the current logical size of the blob including any buffered data.
25 fn size(&self) -> u64 {
26 self.position + self.buffer.len() as u64
27 }
28
29 /// Appends bytes to the internal buffer, maintaining the "buffer at tip" invariant.
30 ///
31 /// If the buffer capacity would be exceeded, it is flushed first. If the data
32 /// is larger than the buffer capacity, it is written directly to the blob.
33 ///
34 /// Returns an error if the write to the underlying [Blob] fails (may be due to a `flush` of data not
35 /// related to the data being written).
36 async fn write<S: Into<StableBuf>>(&mut self, buf: S) -> Result<(), Error> {
37 // If the buffer capacity will be exceeded, flush the buffer first
38 let buf = buf.into();
39 let buf_len = buf.len();
40
41 // Flush buffer if adding this data would exceed capacity
42 if self.buffer.len() + buf_len > self.capacity {
43 self.flush().await?;
44 }
45
46 // Write directly to blob if data is larger than buffer capacity
47 if buf_len > self.capacity {
48 self.blob.write_at(buf, self.position).await?;
49 self.position += buf_len as u64;
50 return Ok(());
51 }
52
53 // Append to buffer (buffer is now guaranteed to have space)
54 self.buffer.extend_from_slice(buf.as_ref());
55 Ok(())
56 }
57
58 /// Flushes buffered data to the underlying [Blob] and advances the position.
59 ///
60 /// After flushing, the buffer is reset and positioned at the new tip.
61 /// Does nothing if the buffer is empty.
62 ///
63 /// # Returns
64 ///
65 /// An error if the write to the underlying [Blob] fails. On failure,
66 /// the buffer is reset and pending data is lost.
67 async fn flush(&mut self) -> Result<(), Error> {
68 if self.buffer.is_empty() {
69 return Ok(());
70 }
71
72 // Take the buffer contents and write to blob
73 let buf = std::mem::take(&mut self.buffer);
74 let len = buf.len() as u64;
75 self.blob.write_at(buf, self.position).await?;
76
77 // Advance position to the new tip and reset buffer
78 self.position += len;
79 self.buffer = Vec::with_capacity(self.capacity);
80 Ok(())
81 }
82
83 /// Flushes buffered data and ensures it is durably persisted to the underlying [Blob].
84 ///
85 /// Returns an error if either the flush or sync operation fails.
86 async fn sync(&mut self) -> Result<(), Error> {
87 self.flush().await?;
88 self.blob.sync().await
89 }
90
91 /// Closes the writer and ensures all buffered data is durably persisted to the underlying [Blob].
92 ///
93 /// Returns an error if the close operation fails.
94 async fn close(&mut self) -> Result<(), Error> {
95 // Ensure all buffered data is durably persisted
96 self.sync().await?;
97
98 // Close the underlying blob.
99 //
100 // We use clone here to ensure we retain the close semantics of the blob provided (if
101 // called multiple times, the blob determines whether to error).
102 self.blob.clone().close().await
103 }
104}
105
106/// A writer that buffers content to a [Blob] to optimize the performance
107/// of appending or updating data.
108///
109/// # Example
110///
111/// ```
112/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
113///
114/// let executor = deterministic::Runner::default();
115/// executor.start(|context| async move {
116/// // Open a blob for writing
117/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
118/// assert_eq!(size, 0);
119///
120/// // Create a buffered writer with 16-byte buffer
121/// let mut blob = Write::new(blob, 0, 16);
122/// blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
123/// blob.sync().await.expect("sync failed");
124///
125/// // Write more data in multiple flushes
126/// blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
127/// blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
128/// blob.sync().await.expect("sync failed");
129///
130/// // Read back the data to verify
131/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
132/// let mut reader = Read::new(blob, size, 8);
133/// let mut buf = vec![0u8; size as usize];
134/// reader.read_exact(&mut buf, size as usize).await.expect("read failed");
135/// assert_eq!(&buf, b"hello world!");
136/// });
137/// ```
138#[derive(Clone)]
139pub struct Write<B: Blob> {
140 inner: Arc<RwLock<Inner<B>>>,
141}
142
143impl<B: Blob> Write<B> {
144 /// Creates a new `Write` that buffers writes to a [Blob] with the provided size and buffer capacity.
145 ///
146 /// # Panics
147 ///
148 /// Panics if `capacity` is zero.
149 pub fn new(blob: B, size: u64, capacity: usize) -> Self {
150 assert!(capacity > 0, "buffer capacity must be greater than zero");
151
152 Self {
153 inner: Arc::new(RwLock::new(Inner {
154 blob,
155 buffer: Vec::with_capacity(capacity),
156 position: size,
157 capacity,
158 })),
159 }
160 }
161
162 /// Returns the current logical size of the blob including any buffered data.
163 ///
164 /// This represents the total size of data that would be present after flushing.
165 #[allow(clippy::len_without_is_empty)]
166 pub async fn size(&self) -> u64 {
167 let inner = self.inner.read().await;
168
169 inner.size()
170 }
171}
172
173impl<B: Blob> Blob for Write<B> {
174 async fn read_at(
175 &self,
176 buf: impl Into<StableBuf> + Send,
177 offset: u64,
178 ) -> Result<StableBuf, Error> {
179 // Acquire a read lock on the inner state
180 let inner = self.inner.read().await;
181
182 // Ensure offset read doesn't overflow
183 let mut buf = buf.into();
184 let data_len = buf.len();
185 let data_end = offset
186 .checked_add(data_len as u64)
187 .ok_or(Error::OffsetOverflow)?;
188
189 // If the data required is beyond the buffer end, return an error
190 let buffer_start = inner.position;
191 let buffer_end = buffer_start + inner.buffer.len() as u64;
192
193 // Ensure we don't read beyond the logical end of the blob
194 if data_end > buffer_end {
195 return Err(Error::BlobInsufficientLength);
196 }
197
198 // Case 1: Read entirely from the underlying blob (before buffer)
199 if data_end <= buffer_start {
200 return inner.blob.read_at(buf, offset).await;
201 }
202
203 // Case 2: Read entirely from the buffer
204 if offset >= buffer_start {
205 let buffer_offset = (offset - buffer_start) as usize;
206 let end_offset = buffer_offset + data_len;
207
208 if end_offset > inner.buffer.len() {
209 return Err(Error::BlobInsufficientLength);
210 }
211
212 buf.put_slice(&inner.buffer[buffer_offset..end_offset]);
213 return Ok(buf);
214 }
215
216 // Case 3: Read spans both blob and buffer
217 let blob_bytes = (buffer_start - offset) as usize;
218 let buffer_bytes = data_len - blob_bytes;
219
220 // Read from blob first
221 let blob_part = vec![0u8; blob_bytes];
222 let blob_part = inner.blob.read_at(blob_part, offset).await?;
223
224 // Copy blob data and buffer data to result
225 buf.as_mut()[..blob_bytes].copy_from_slice(blob_part.as_ref());
226 buf.as_mut()[blob_bytes..].copy_from_slice(&inner.buffer[..buffer_bytes]);
227
228 Ok(buf)
229 }
230
231 async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
232 // Acquire a write lock on the inner state
233 let mut inner = self.inner.write().await;
234
235 // Prepare the buf to be written
236 let buf = buf.into();
237 let data = buf.as_ref();
238 let data_len = data.len();
239
240 // Current state of the buffer in the blob
241 let buffer_start = inner.position;
242 let buffer_end = buffer_start + inner.buffer.len() as u64;
243
244 // Case 1: Simple append to buffered data (most common case)
245 if offset == buffer_end {
246 return inner.write(buf).await;
247 }
248
249 // Case 2: Write can be merged into existing buffer.
250 //
251 // This handles overwrites and extensions within the buffer's current capacity,
252 // including writes that create gaps (filled with zeros) in the buffer.
253 let can_merge_into_buffer = offset >= buffer_start
254 && (offset - buffer_start) + data_len as u64 <= inner.capacity as u64;
255 if can_merge_into_buffer {
256 let buffer_offset = (offset - buffer_start) as usize;
257 let required_len = buffer_offset + data_len;
258
259 // Expand buffer if necessary (fills with zeros)
260 if required_len > inner.buffer.len() {
261 inner.buffer.resize(required_len, 0);
262 }
263
264 // Copy data into buffer
265 inner.buffer[buffer_offset..required_len].copy_from_slice(data);
266 return Ok(());
267 }
268
269 // Case 3: Write cannot be merged - flush buffer and write directly.
270 //
271 // This includes: writes before the buffer, writes that would exceed capacity,
272 // or non-contiguous writes that can't be merged.
273 if !inner.buffer.is_empty() {
274 inner.flush().await?;
275 }
276 inner.blob.write_at(buf, offset).await?;
277
278 // Update position to maintain "buffer at tip" invariant.
279 //
280 // Position should advance to the end of this write if it extends the logical blob
281 let write_end = offset + data_len as u64;
282 if write_end > inner.position {
283 inner.position = write_end;
284 }
285
286 Ok(())
287 }
288
289 async fn truncate(&self, len: u64) -> Result<(), Error> {
290 // Acquire a write lock on the inner state
291 let mut inner = self.inner.write().await;
292
293 // Determine the current buffer boundaries
294 let buffer_start = inner.position;
295 let buffer_end = buffer_start + inner.buffer.len() as u64;
296
297 // Adjust buffer content based on truncation point
298 if len <= buffer_start {
299 // Truncation point is before or at the start of the buffer.
300 //
301 // All buffered data is now beyond the new length and should be discarded.
302 inner.buffer.clear();
303 inner.blob.truncate(len).await?;
304 inner.position = len;
305 } else if len < buffer_end {
306 // Truncation point is within the buffer.
307 //
308 // Keep only the portion of the buffer up to the truncation point.
309 let new_buffer_len = (len - buffer_start) as usize;
310 inner.buffer.truncate(new_buffer_len);
311 } else {
312 // Truncation point is at or after the end of the buffer.
313 //
314 // No changes needed to the buffer content.
315 }
316 Ok(())
317 }
318
319 async fn sync(&self) -> Result<(), Error> {
320 let mut inner = self.inner.write().await;
321 inner.sync().await
322 }
323
324 async fn close(self) -> Result<(), Error> {
325 let mut inner = self.inner.write().await;
326 inner.close().await
327 }
328}