commonware_runtime/utils/buffer/
write.rs

1use crate::{Blob, Error, RwLock};
2use commonware_utils::{StableBuf, StableBufMut};
3use std::sync::Arc;
4
5/// The internal state of a [Write] buffer.
6struct Inner<B: Blob> {
7    /// The underlying blob to write to.
8    blob: B,
9    /// The buffer storing data to be written to the blob.
10    buffer: Vec<u8>,
11    /// The offset in the blob where the data in `buffer` starts.
12    /// If `buffer` is empty, this is the position where the next appended byte would conceptually begin in the blob.
13    position: u64,
14    /// The maximum size of the buffer.
15    capacity: usize,
16}
17
18impl<B: Blob> Inner<B> {
19    /// Appends bytes to the internal buffer. If the buffer capacity is exceeded, it will be flushed to the
20    /// underlying [Blob].
21    ///
22    /// If the size of the provided bytes is larger than the buffer capacity, the bytes will be written
23    /// directly to the underlying [Blob]. If this occurs regularly, the buffer capacity should be increased (as
24    /// the buffer will not provide any benefit).
25    ///
26    /// Returns an error if the write to the underlying [Blob] fails (may be due to a `flush` of data not
27    /// related to the data being written).
28    async fn write<Buf: StableBuf>(&mut self, buf: Buf) -> Result<(), Error> {
29        // If the buffer capacity will be exceeded, flush the buffer first
30        let buf_len = buf.len();
31        if self.buffer.len() + buf_len > self.capacity {
32            self.flush().await?;
33        }
34
35        // Write directly to the blob (if the buffer is too small)
36        if buf_len > self.capacity {
37            self.blob.write_at(buf, self.position).await?;
38            self.position += buf_len as u64;
39            return Ok(());
40        }
41
42        // Append to the buffer
43        self.buffer.extend_from_slice(buf.as_ref());
44        Ok(())
45    }
46
47    /// Flushes buffered data to the underlying [Blob]. Does nothing if the buffer is empty.
48    ///
49    /// If the write to the underlying [Blob] fails, the buffer will be reset (and any pending data not yet
50    /// written will be lost).
51    async fn flush(&mut self) -> Result<(), Error> {
52        // If the buffer is empty, do nothing
53        if self.buffer.is_empty() {
54            return Ok(());
55        }
56
57        // Take the buffer and write it to the blob
58        let buf = std::mem::take(&mut self.buffer);
59        let len = buf.len() as u64;
60        self.blob.write_at(buf, self.position).await?;
61
62        // If successful, update the position and allocate a new buffer
63        self.position += len;
64        self.buffer = Vec::with_capacity(self.capacity);
65        Ok(())
66    }
67
68    /// Flushes buffered data and ensures it is durably persisted to the underlying [Blob].
69    ///
70    /// Returns an error if either the flush or sync operation fails.
71    async fn sync(&mut self) -> Result<(), Error> {
72        self.flush().await?;
73        self.blob.sync().await
74    }
75
76    /// Closes the writer and ensures all buffered data is durably persisted to the underlying [Blob].
77    ///
78    /// Returns an error if the close operation fails.
79    async fn close(&mut self) -> Result<(), Error> {
80        // Ensure all buffered data is durably persisted
81        self.sync().await?;
82
83        // Close the underlying blob.
84        //
85        // We use clone here to ensure we retain the close semantics of the blob provided (if
86        // called multiple times, the blob determines whether to error).
87        self.blob.clone().close().await
88    }
89}
90
91/// A writer that buffers content to a [Blob] to optimize the performance
92/// of appending or updating data.
93///
94/// # Example
95///
96/// ```
97/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
98///
99/// let executor = deterministic::Runner::default();
100/// executor.start(|context| async move {
101///     // Open a blob for writing
102///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
103///     assert_eq!(size, 0);
104///
105///     // Create a buffered writer with 16-byte buffer
106///     let mut blob = Write::new(blob, 0, 16);
107///     blob.write_at("hello".as_bytes(), 0).await.expect("write failed");
108///     blob.sync().await.expect("sync failed");
109///
110///     // Write more data in multiple flushes
111///     blob.write_at(" world".as_bytes(), 5).await.expect("write failed");
112///     blob.write_at("!".as_bytes(), 11).await.expect("write failed");
113///     blob.sync().await.expect("sync failed");
114///
115///     // Read back the data to verify
116///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
117///     let mut reader = Read::new(blob, size, 8);
118///     let mut buf = vec![0u8; size as usize];
119///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
120///     assert_eq!(&buf, b"hello world!");
121/// });
122/// ```
123#[derive(Clone)]
124pub struct Write<B: Blob> {
125    inner: Arc<RwLock<Inner<B>>>,
126}
127
128impl<B: Blob> Write<B> {
129    /// Creates a new `Write` that writes to the given blob starting at `position` with the specified buffer capacity.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `capacity` is zero.
134    pub fn new(blob: B, position: u64, capacity: usize) -> Self {
135        assert!(capacity > 0, "buffer capacity must be greater than zero");
136        Self {
137            inner: Arc::new(RwLock::new(Inner {
138                blob,
139                buffer: Vec::with_capacity(capacity),
140                position,
141                capacity,
142            })),
143        }
144    }
145}
146
147impl<B: Blob> Blob for Write<B> {
148    async fn read_at<T: StableBufMut>(&self, mut buf: T, offset: u64) -> Result<T, Error> {
149        // Acquire a read lock on the inner state
150        let inner = self.inner.read().await;
151
152        // Ensure offset read doesn't overflow
153        let data_len = buf.len();
154        let data_end = offset
155            .checked_add(data_len as u64)
156            .ok_or(Error::OffsetOverflow)?;
157
158        // If the data required is beyond the buffer end, return an error
159        let buffer_start = inner.position;
160        let buffer_end = inner.position + inner.buffer.len() as u64;
161        if data_end > buffer_end {
162            return Err(Error::BlobInsufficientLength);
163        }
164
165        // If the data required is before the buffer start, read directly from the blob
166        if data_end <= buffer_start {
167            return inner.blob.read_at(buf, offset).await;
168        }
169
170        // If the data is entirely within the buffer, read it
171        if offset >= buffer_start {
172            let start = (offset - buffer_start) as usize;
173            if start + data_len > inner.buffer.len() {
174                return Err(Error::BlobInsufficientLength);
175            }
176            buf.put_slice(&inner.buffer[start..start + data_len]);
177            return Ok(buf);
178        }
179
180        // If the data is a combination of blob and buffer, read from both
181        let blob_bytes = (buffer_start - offset) as usize;
182        let blob_part = vec![0u8; blob_bytes];
183        let blob_part = inner.blob.read_at(blob_part, offset).await?;
184        buf.deref_mut()[..blob_bytes].copy_from_slice(&blob_part);
185        let buf_bytes = data_len - blob_bytes;
186        buf.deref_mut()[blob_bytes..].copy_from_slice(&inner.buffer[..buf_bytes]);
187        Ok(buf)
188    }
189
190    async fn write_at<T: StableBuf>(&self, buf: T, offset: u64) -> Result<(), Error> {
191        // Acquire a write lock on the inner state
192        let mut inner = self.inner.write().await;
193
194        // Prepare the buf to be written
195        let data = buf.as_ref();
196        let data_len = data.len();
197
198        // Current state of the buffer in the blob
199        let buffer_start = inner.position;
200        let buffer_end = buffer_start + inner.buffer.len() as u64;
201
202        // Simple append to the current buffered data
203        if offset == buffer_end {
204            return inner.write(buf).await;
205        }
206
207        // Write operation can be merged into the existing buffer if:
208        // a) Write starts at or after the buffer's starting position in the blob.
209        // b) The end of the write, relative to the buffer's start, fits within buffer's capacity.
210        let can_write_into_buffer = offset >= buffer_start
211            && (offset - buffer_start) + data_len as u64 <= inner.capacity as u64;
212        if can_write_into_buffer {
213            let buffer_internal_offset = (offset - buffer_start) as usize;
214            let required_buffer_len = buffer_internal_offset + data_len;
215            if required_buffer_len > inner.buffer.len() {
216                inner.buffer.resize(required_buffer_len, 0u8);
217            }
218            inner.buffer[buffer_internal_offset..required_buffer_len].copy_from_slice(data);
219            return Ok(());
220        }
221
222        // All other cases (e.g., write is before buffer, straddles, or would overflow capacity)
223        if !inner.buffer.is_empty() {
224            inner.flush().await?;
225        }
226        inner.blob.write_at(buf, offset).await?;
227        inner.position = offset + data_len as u64;
228        Ok(())
229    }
230
231    async fn truncate(&self, len: u64) -> Result<(), Error> {
232        // Acquire a write lock on the inner state
233        let mut inner = self.inner.write().await;
234
235        // Prepare the buffer boundaries
236        let buffer_start = inner.position;
237        let buffer_len = inner.buffer.len() as u64;
238        let buffer_end = buffer_start + buffer_len;
239
240        // Adjust buffer content based on `len`
241        if len <= buffer_start {
242            // Truncation point is before or exactly at the start of the buffer.
243            //
244            // All buffered data is now invalid/beyond the new length.
245            inner.buffer.clear();
246            inner.blob.truncate(len).await?;
247            inner.position = len;
248        } else if len < buffer_end {
249            // Truncation point is within the buffer.
250            //
251            // `len` is > `buffer_start_blob_offset` here.
252            // New length of data *within the buffer* is `len - buffer_start_blob_offset`.
253            let new_buffer_actual_len = (len - buffer_start) as usize;
254            inner.buffer.truncate(new_buffer_actual_len);
255        } else {
256            // Truncation point is at or after the end of the buffer, so no changes are needed
257        }
258        Ok(())
259    }
260
261    async fn sync(&self) -> Result<(), Error> {
262        let mut inner = self.inner.write().await;
263        inner.sync().await
264    }
265
266    async fn close(self) -> Result<(), Error> {
267        let mut inner = self.inner.write().await;
268        inner.close().await
269    }
270}