commonware_runtime/utils/buffer/write.rs

1use crate::{Blob, Error, RwLock};
2use commonware_utils::StableBuf;
3use std::sync::Arc;
4
5/// The internal state of a [Write] buffer.
6struct Inner<B: Blob> {
7    /// The underlying blob to write to.
8    blob: B,
9    /// The buffer storing data to be written to the blob.
10    ///
11    /// The buffer always represents data at the "tip" of the logical blob,
12    /// starting at `position` and extending for `buffer.len()` bytes.
13    buffer: Vec<u8>,
14    /// The offset in the blob where the buffered data starts.
15    ///
16    /// This represents the logical position in the blob where `buffer[0]` would be written.
17    /// The buffer is maintained at the "tip" to support efficient size calculation and appends.
18    position: u64,
19    /// The maximum size of the buffer.
20    capacity: usize,
21}
22
23impl<B: Blob> Inner<B> {
24    /// Returns the current logical size of the blob including any buffered data.
25    fn size(&self) -> u64 {
26        self.position + self.buffer.len() as u64
27    }
28
29    /// Appends bytes to the internal buffer, maintaining the "buffer at tip" invariant.
30    ///
31    /// If the buffer capacity would be exceeded, it is flushed first. If the data
32    /// is larger than the buffer capacity, it is written directly to the blob.
33    ///
34    /// Returns an error if the write to the underlying [Blob] fails (may be due to a `flush` of data not
35    /// related to the data being written).
36    async fn write<S: Into<StableBuf>>(&mut self, buf: S) -> Result<(), Error> {
37        // If the buffer capacity will be exceeded, flush the buffer first
38        let buf = buf.into();
39        let buf_len = buf.len();
40
41        // Flush buffer if adding this data would exceed capacity
42        if self.buffer.len() + buf_len > self.capacity {
43            self.flush().await?;
44        }
45
46        // Write directly to blob if data is larger than buffer capacity
47        if buf_len > self.capacity {
48            self.blob.write_at(buf, self.position).await?;
49            self.position += buf_len as u64;
50            return Ok(());
51        }
52
53        // Append to buffer (buffer is now guaranteed to have space)
54        self.buffer.extend_from_slice(buf.as_ref());
55        Ok(())
56    }
57
58    /// Flushes buffered data to the underlying [Blob] and advances the position.
59    ///
60    /// After flushing, the buffer is reset and positioned at the new tip.
61    /// Does nothing if the buffer is empty.
62    ///
63    /// # Returns
64    ///
65    /// An error if the write to the underlying [Blob] fails. On failure,
66    /// the buffer is reset and pending data is lost.
67    async fn flush(&mut self) -> Result<(), Error> {
68        if self.buffer.is_empty() {
69            return Ok(());
70        }
71
72        // Take the buffer contents and write to blob
73        let buf = std::mem::take(&mut self.buffer);
74        let len = buf.len() as u64;
75        self.blob.write_at(buf, self.position).await?;
76
77        // Advance position to the new tip and reset buffer
78        self.position += len;
79        self.buffer = Vec::with_capacity(self.capacity);
80        Ok(())
81    }
82
83    /// Flushes buffered data and ensures it is durably persisted to the underlying [Blob].
84    ///
85    /// Returns an error if either the flush or sync operation fails.
86    async fn sync(&mut self) -> Result<(), Error> {
87        self.flush().await?;
88        self.blob.sync().await
89    }
90
91    /// Closes the writer and ensures all buffered data is durably persisted to the underlying [Blob].
92    ///
93    /// Returns an error if the close operation fails.
94    async fn close(&mut self) -> Result<(), Error> {
95        // Ensure all buffered data is durably persisted
96        self.sync().await?;
97
98        // Close the underlying blob.
99        //
100        // We use clone here to ensure we retain the close semantics of the blob provided (if
101        // called multiple times, the blob determines whether to error).
102        self.blob.clone().close().await
103    }
104}
105
106/// A writer that buffers content to a [Blob] to optimize the performance
107/// of appending or updating data.
108///
109/// # Example
110///
111/// ```
112/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
113///
114/// let executor = deterministic::Runner::default();
115/// executor.start(|context| async move {
116///     // Open a blob for writing
117///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
118///     assert_eq!(size, 0);
119///
120///     // Create a buffered writer with 16-byte buffer
121///     let mut blob = Write::new(blob, 0, 16);
122///     blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
123///     blob.sync().await.expect("sync failed");
124///
125///     // Write more data in multiple flushes
126///     blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
127///     blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
128///     blob.sync().await.expect("sync failed");
129///
130///     // Read back the data to verify
131///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
132///     let mut reader = Read::new(blob, size, 8);
133///     let mut buf = vec![0u8; size as usize];
134///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
135///     assert_eq!(&buf, b"hello world!");
136/// });
137/// ```
138#[derive(Clone)]
139pub struct Write<B: Blob> {
140    inner: Arc<RwLock<Inner<B>>>,
141}
142
143impl<B: Blob> Write<B> {
144    /// Creates a new `Write` that buffers writes to a [Blob] with the provided size and buffer capacity.
145    ///
146    /// # Panics
147    ///
148    /// Panics if `capacity` is zero.
149    pub fn new(blob: B, size: u64, capacity: usize) -> Self {
150        assert!(capacity > 0, "buffer capacity must be greater than zero");
151
152        Self {
153            inner: Arc::new(RwLock::new(Inner {
154                blob,
155                buffer: Vec::with_capacity(capacity),
156                position: size,
157                capacity,
158            })),
159        }
160    }
161
162    /// Returns the current logical size of the blob including any buffered data.
163    ///
164    /// This represents the total size of data that would be present after flushing.
165    #[allow(clippy::len_without_is_empty)]
166    pub async fn size(&self) -> u64 {
167        let inner = self.inner.read().await;
168
169        inner.size()
170    }
171}
172
173impl<B: Blob> Blob for Write<B> {
174    async fn read_at(
175        &self,
176        buf: impl Into<StableBuf> + Send,
177        offset: u64,
178    ) -> Result<StableBuf, Error> {
179        // Acquire a read lock on the inner state
180        let inner = self.inner.read().await;
181
182        // Ensure offset read doesn't overflow
183        let mut buf = buf.into();
184        let data_len = buf.len();
185        let data_end = offset
186            .checked_add(data_len as u64)
187            .ok_or(Error::OffsetOverflow)?;
188
189        // If the data required is beyond the buffer end, return an error
190        let buffer_start = inner.position;
191        let buffer_end = buffer_start + inner.buffer.len() as u64;
192
193        // Ensure we don't read beyond the logical end of the blob
194        if data_end > buffer_end {
195            return Err(Error::BlobInsufficientLength);
196        }
197
198        // Case 1: Read entirely from the underlying blob (before buffer)
199        if data_end <= buffer_start {
200            return inner.blob.read_at(buf, offset).await;
201        }
202
203        // Case 2: Read entirely from the buffer
204        if offset >= buffer_start {
205            let buffer_offset = (offset - buffer_start) as usize;
206            let end_offset = buffer_offset + data_len;
207
208            if end_offset > inner.buffer.len() {
209                return Err(Error::BlobInsufficientLength);
210            }
211
212            buf.put_slice(&inner.buffer[buffer_offset..end_offset]);
213            return Ok(buf);
214        }
215
216        // Case 3: Read spans both blob and buffer
217        let blob_bytes = (buffer_start - offset) as usize;
218        let buffer_bytes = data_len - blob_bytes;
219
220        // Read from blob first
221        let blob_part = vec![0u8; blob_bytes];
222        let blob_part = inner.blob.read_at(blob_part, offset).await?;
223
224        // Copy blob data and buffer data to result
225        buf.as_mut()[..blob_bytes].copy_from_slice(blob_part.as_ref());
226        buf.as_mut()[blob_bytes..].copy_from_slice(&inner.buffer[..buffer_bytes]);
227
228        Ok(buf)
229    }
230
231    async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
232        // Acquire a write lock on the inner state
233        let mut inner = self.inner.write().await;
234
235        // Prepare the buf to be written
236        let buf = buf.into();
237        let data = buf.as_ref();
238        let data_len = data.len();
239
240        // Current state of the buffer in the blob
241        let buffer_start = inner.position;
242        let buffer_end = buffer_start + inner.buffer.len() as u64;
243
244        // Case 1: Simple append to buffered data (most common case)
245        if offset == buffer_end {
246            return inner.write(buf).await;
247        }
248
249        // Case 2: Write can be merged into existing buffer.
250        //
251        // This handles overwrites and extensions within the buffer's current capacity,
252        // including writes that create gaps (filled with zeros) in the buffer.
253        let can_merge_into_buffer = offset >= buffer_start
254            && (offset - buffer_start) + data_len as u64 <= inner.capacity as u64;
255        if can_merge_into_buffer {
256            let buffer_offset = (offset - buffer_start) as usize;
257            let required_len = buffer_offset + data_len;
258
259            // Expand buffer if necessary (fills with zeros)
260            if required_len > inner.buffer.len() {
261                inner.buffer.resize(required_len, 0);
262            }
263
264            // Copy data into buffer
265            inner.buffer[buffer_offset..required_len].copy_from_slice(data);
266            return Ok(());
267        }
268
269        // Case 3: Write cannot be merged - flush buffer and write directly.
270        //
271        // This includes: writes before the buffer, writes that would exceed capacity,
272        // or non-contiguous writes that can't be merged.
273        if !inner.buffer.is_empty() {
274            inner.flush().await?;
275        }
276        inner.blob.write_at(buf, offset).await?;
277
278        // Update position to maintain "buffer at tip" invariant.
279        //
280        // Position should advance to the end of this write if it extends the logical blob
281        let write_end = offset + data_len as u64;
282        if write_end > inner.position {
283            inner.position = write_end;
284        }
285
286        Ok(())
287    }
288
289    async fn truncate(&self, len: u64) -> Result<(), Error> {
290        // Acquire a write lock on the inner state
291        let mut inner = self.inner.write().await;
292
293        // Determine the current buffer boundaries
294        let buffer_start = inner.position;
295        let buffer_end = buffer_start + inner.buffer.len() as u64;
296
297        // Adjust buffer content based on truncation point
298        if len <= buffer_start {
299            // Truncation point is before or at the start of the buffer.
300            //
301            // All buffered data is now beyond the new length and should be discarded.
302            inner.buffer.clear();
303            inner.blob.truncate(len).await?;
304            inner.position = len;
305        } else if len < buffer_end {
306            // Truncation point is within the buffer.
307            //
308            // Keep only the portion of the buffer up to the truncation point.
309            let new_buffer_len = (len - buffer_start) as usize;
310            inner.buffer.truncate(new_buffer_len);
311        } else {
312            // Truncation point is at or after the end of the buffer.
313            //
314            // No changes needed to the buffer content.
315        }
316        Ok(())
317    }
318
319    async fn sync(&self) -> Result<(), Error> {
320        let mut inner = self.inner.write().await;
321        inner.sync().await
322    }
323
324    async fn close(self) -> Result<(), Error> {
325        let mut inner = self.inner.write().await;
326        inner.close().await
327    }
328}