Skip to main content

commonware_runtime/utils/buffer/
write.rs

1use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoBufs};
2use commonware_utils::sync::AsyncRwLock;
3use std::{num::NonZeroUsize, sync::Arc};
4
5/// Shared writer state.
6struct State<B: Blob> {
7    /// The underlying blob to write to.
8    blob: B,
9
10    /// Buffered bytes at the logical tip of the blob.
11    buffer: Buffer,
12
13    /// Whether a prior plain mutation must be persisted with [`Blob::sync`].
14    ///
15    /// [`State::write_at_sync`] uses [`Blob::write_at_sync`] only when this is
16    /// false, otherwise it must use [`Blob::sync`] to cover earlier unsynced
17    /// mutations.
18    needs_sync: bool,
19}
20
21impl<B: Blob> State<B> {
22    /// Read bytes from the underlying blob.
23    async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
24        Ok(self.blob.read_at(offset, len).await?.freeze())
25    }
26
27    /// Write bytes to the underlying blob and mark them as needing sync.
28    async fn write_at(&mut self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
29        self.blob.write_at(offset, bufs).await?;
30        self.needs_sync = true;
31        Ok(())
32    }
33
34    /// Write bytes to the underlying blob and make them durable.
35    ///
36    /// Uses [`Blob::write_at_sync`] when there are no earlier unsynced
37    /// mutations. Otherwise, writes the bytes and then syncs the blob.
38    async fn write_at_sync(
39        &mut self,
40        offset: u64,
41        bufs: impl Into<IoBufs> + Send,
42    ) -> Result<(), Error> {
43        if self.needs_sync {
44            self.write_at(offset, bufs).await?;
45            self.sync().await
46        } else {
47            // If `write_at_sync` fails, a later sync must not treat the drained
48            // buffer as durable.
49            self.needs_sync = true;
50            self.blob.write_at_sync(offset, bufs).await?;
51            self.needs_sync = false;
52            Ok(())
53        }
54    }
55
56    /// Resize the underlying blob and mark the resize as needing sync.
57    async fn resize(&mut self, len: u64) -> Result<(), Error> {
58        self.blob.resize(len).await?;
59        self.needs_sync = true;
60        Ok(())
61    }
62
63    /// Sync the underlying blob if there are unsynced mutations.
64    async fn sync(&mut self) -> Result<(), Error> {
65        if !self.needs_sync {
66            return Ok(());
67        }
68        self.blob.sync().await?;
69        self.needs_sync = false;
70        Ok(())
71    }
72}
73
74/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
75/// updating data.
76///
77/// # Allocation Semantics
78///
79/// - [Self::new] starts with a detached tip buffer and allocates backing on first buffered write.
80/// - Subsequent writes reuse that backing, copy-on-write allocation only occurs when buffered data
81///   is shared (for example, after handing out immutable views) or a merge needs more capacity.
82/// - Sparse writes merged into tip extend logical length and zero-fill any gap in-buffer.
83/// - Flush paths ([Self::sync], [Self::resize], overlap flushes in [Self::write_at]) hand drained
84///   bytes to the blob and leave the tip detached until the next buffered write.
85///
86/// # Concurrent Access
87///
88/// [Write] owns mutation ordering and durability bookkeeping for the wrapped [Blob]. Cloned
89/// [Write] handles are safe to use concurrently because they share the same state. Raw [Blob]
90/// handles cloned before wrapping observe only flushed data and may not see the latest buffered
91/// writes until [Self::sync], [Self::resize], or an overlapping [Self::write_at] flushes them.
92/// Those raw handles must not be used to write, resize, or otherwise mutate the blob while a
93/// [Write] exists. External mutations bypass the buffer state and [Self::sync] may use
94/// [Blob::write_at_sync], which is not a durability barrier for those external mutations.
95///
96/// # Example
97///
98/// ```
99/// use commonware_runtime::{Runner, BufferPooler, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
100/// use commonware_utils::NZUsize;
101///
102/// let executor = deterministic::Runner::default();
103/// executor.start(|context| async move {
104///     // Open a blob for writing
105///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
106///     assert_eq!(size, 0);
107///
108///     // Create a buffered writer with 16-byte buffer
109///     let mut blob = Write::from_pooler(&context, blob, 0, NZUsize!(16));
110///     blob.write_at(0, b"hello").await.expect("write failed");
111///     blob.sync().await.expect("sync failed");
112///
113///     // Write more data in multiple flushes
114///     blob.write_at(5, b" world").await.expect("write failed");
115///     blob.write_at(11, b"!").await.expect("write failed");
116///     blob.sync().await.expect("sync failed");
117///
118///     // Read back the data to verify
119///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
120///     let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
121///     let buf = reader.read(size as usize).await.expect("read failed");
122///     assert_eq!(buf.coalesce().as_ref(), b"hello world!");
123/// });
124/// ```
125#[derive(Clone)]
126pub struct Write<B: Blob> {
127    /// Shared blob, tip buffer, and durability state.
128    state: Arc<AsyncRwLock<State<B>>>,
129}
130
131impl<B: Blob> Write<B> {
132    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
133    /// of `blob` with the provided `size`.
134    pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
135        Self {
136            state: Arc::new(AsyncRwLock::new(State {
137                blob,
138                buffer: Buffer::new(size, capacity.get(), pool),
139                needs_sync: true, // ensure pending writes on the wrapped blob are synced
140            })),
141        }
142    }
143
144    /// Creates a new [Write], extracting the storage [BufferPool] from a [BufferPooler].
145    pub fn from_pooler(
146        pooler: &impl BufferPooler,
147        blob: B,
148        size: u64,
149        capacity: NonZeroUsize,
150    ) -> Self {
151        Self::new(blob, size, capacity, pooler.storage_buffer_pool().clone())
152    }
153
154    /// Returns the current logical size of the blob including any buffered data.
155    ///
156    /// This represents the total size of data that would be present after flushing.
157    pub async fn size(&self) -> u64 {
158        let state = self.state.read().await;
159        state.buffer.size()
160    }
161
162    /// Read exactly `len` immutable bytes starting at `offset`.
163    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
164        // Ensure the read doesn't overflow.
165        let end_offset = offset
166            .checked_add(len as u64)
167            .ok_or(Error::OffsetOverflow)?;
168
169        // Acquire a read lock on the buffer state.
170        let state = self.state.read().await;
171        let buffer = &state.buffer;
172
173        // If the data required is beyond the size of the blob, return an error.
174        if end_offset > buffer.size() {
175            return Err(Error::BlobInsufficientLength);
176        }
177
178        // Keep the zero-length fast path after the bounds check so offset > size still preserves
179        // the BlobInsufficientLength contract.
180        if len == 0 {
181            return Ok(IoBufs::default());
182        }
183
184        // Entirely in buffered tip.
185        if offset >= buffer.offset {
186            let start = (offset - buffer.offset) as usize;
187            let end = start + len;
188            return Ok(buffer.slice(start..end).into());
189        }
190
191        // Entirely in blob.
192        if end_offset <= buffer.offset {
193            return state.read_at(offset, len).await;
194        }
195
196        // Overlaps blob and buffered tip.
197        let blob_len = (buffer.offset - offset) as usize;
198        let tip_len = len - blob_len;
199        let tip = buffer.slice(..tip_len);
200
201        let mut blob = state.read_at(offset, blob_len).await?;
202        blob.append(tip);
203        Ok(blob)
204    }
205
206    /// Write bytes from `buf` at `offset`.
207    ///
208    /// Data is merged into the in-memory tip buffer when possible, otherwise buffered data may be
209    /// flushed and chunks are written directly to the underlying blob.
210    ///
211    /// Returns [Error::OffsetOverflow] when `offset + bufs.len()` overflows.
212    pub async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
213        let mut bufs = bufs.into();
214
215        // Ensure the write doesn't overflow.
216        offset
217            .checked_add(bufs.remaining() as u64)
218            .ok_or(Error::OffsetOverflow)?;
219
220        // Acquire a write lock on the buffer state.
221        let mut state = self.state.write().await;
222
223        // Process each chunk of the input buffer, attempting to merge into the tip buffer
224        // or writing directly to the underlying blob.
225        let mut current_offset = offset;
226        while bufs.has_remaining() {
227            let chunk = bufs.chunk();
228            let chunk_len = chunk.len();
229
230            // Chunk falls entirely within the buffer's current range and can be merged.
231            if state.buffer.merge(chunk, current_offset) {
232                bufs.advance(chunk_len);
233                current_offset += chunk_len as u64;
234                continue;
235            }
236
237            // Chunk cannot be merged, so flush the buffer if the range overlaps, and check
238            // if merge is possible after.
239            let chunk_end = current_offset + chunk_len as u64;
240            if state.buffer.offset < chunk_end {
241                if let Some((old_buf, old_offset)) = state.buffer.take() {
242                    state.write_at(old_offset, old_buf).await?;
243                    if state.buffer.merge(chunk, current_offset) {
244                        bufs.advance(chunk_len);
245                        current_offset += chunk_len as u64;
246                        continue;
247                    }
248                }
249            }
250
251            // Chunk could not be merged (exceeds buffer capacity or outside its range), so
252            // write directly. Note that we may end up writing an intersecting range twice:
253            // once when the buffer is flushed above, then again when we write the chunk
254            // below. Removing this inefficiency may not be worth the additional complexity.
255            let direct = bufs.split_to(chunk_len);
256            state.write_at(current_offset, direct).await?;
257            current_offset += chunk_len as u64;
258
259            // Maintain the "buffer at tip" invariant by advancing offset to the end of this
260            // write if it extended the underlying blob.
261            state.buffer.offset = state.buffer.offset.max(current_offset);
262        }
263
264        Ok(())
265    }
266
267    /// Resize the logical blob to `len`.
268    ///
269    /// If buffered data exists and the resize extends beyond current size, buffered data is flushed
270    /// before resizing the underlying blob.
271    pub async fn resize(&self, len: u64) -> Result<(), Error> {
272        // Acquire a write lock on the buffer state.
273        let mut state = self.state.write().await;
274
275        // Flush buffered data to the underlying blob.
276        //
277        // This can only happen if the new size is greater than the current size.
278        if let Some((buf, offset)) = state.buffer.resize(len) {
279            state.write_at(offset, buf).await?;
280        }
281
282        // Resize the underlying blob.
283        state.resize(len).await?;
284
285        Ok(())
286    }
287
288    /// Flush buffered bytes and durably sync mutations tracked by this writer.
289    pub async fn sync(&self) -> Result<(), Error> {
290        let mut state = self.state.write().await;
291        if let Some((buf, offset)) = state.buffer.take() {
292            return state.write_at_sync(offset, buf).await;
293        }
294
295        state.sync().await
296    }
297}