// commonware_runtime/utils/buffer/write.rs
use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoBufs};
use commonware_utils::sync::AsyncRwLock;
use std::{num::NonZeroUsize, sync::Arc};

5/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
6/// updating data.
7///
8/// # Allocation Semantics
9///
10/// - [Self::new] starts with a detached tip buffer and allocates backing on first buffered write.
11/// - Subsequent writes reuse that backing, copy-on-write allocation only occurs when buffered data
12///   is shared (for example, after handing out immutable views) or a merge needs more capacity.
13/// - Sparse writes merged into tip extend logical length and zero-fill any gap in-buffer.
14/// - Flush paths ([Self::sync], [Self::resize], overlap flushes in [Self::write_at]) hand drained
15///   bytes to the blob and leave the tip detached until the next buffered write.
16///
17/// # Example
18///
19/// ```
20/// use commonware_runtime::{Runner, BufferPooler, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
21/// use commonware_utils::NZUsize;
22///
23/// let executor = deterministic::Runner::default();
24/// executor.start(|context| async move {
25///     // Open a blob for writing
26///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
27///     assert_eq!(size, 0);
28///
29///     // Create a buffered writer with 16-byte buffer
30///     let mut blob = Write::from_pooler(&context, blob, 0, NZUsize!(16));
31///     blob.write_at(0, b"hello").await.expect("write failed");
32///     blob.sync().await.expect("sync failed");
33///
34///     // Write more data in multiple flushes
35///     blob.write_at(5, b" world").await.expect("write failed");
36///     blob.write_at(11, b"!").await.expect("write failed");
37///     blob.sync().await.expect("sync failed");
38///
39///     // Read back the data to verify
40///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
41///     let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
42///     let buf = reader.read(size as usize).await.expect("read failed");
43///     assert_eq!(buf.coalesce().as_ref(), b"hello world!");
44/// });
45/// ```
46#[derive(Clone)]
47pub struct Write<B: Blob> {
48    /// The underlying blob to write to.
49    blob: B,
50
51    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
52    buffer: Arc<AsyncRwLock<Buffer>>,
53}
54
55impl<B: Blob> Write<B> {
56    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
57    /// of `blob` with the provided `size`.
58    pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
59        Self {
60            blob,
61            buffer: Arc::new(AsyncRwLock::new(Buffer::new(size, capacity.get(), pool))),
62        }
63    }
64
65    /// Creates a new [Write], extracting the storage [BufferPool] from a [BufferPooler].
66    pub fn from_pooler(
67        pooler: &impl BufferPooler,
68        blob: B,
69        size: u64,
70        capacity: NonZeroUsize,
71    ) -> Self {
72        Self::new(blob, size, capacity, pooler.storage_buffer_pool().clone())
73    }
74
75    /// Returns the current logical size of the blob including any buffered data.
76    ///
77    /// This represents the total size of data that would be present after flushing.
78    #[allow(clippy::len_without_is_empty)]
79    pub async fn size(&self) -> u64 {
80        let buffer = self.buffer.read().await;
81        buffer.size()
82    }
83
84    /// Read exactly `len` immutable bytes starting at `offset`.
85    pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
86        // Ensure the read doesn't overflow.
87        let end_offset = offset
88            .checked_add(len as u64)
89            .ok_or(Error::OffsetOverflow)?;
90
91        // Acquire a read lock on the buffer.
92        let buffer = self.buffer.read().await;
93
94        // If the data required is beyond the size of the blob, return an error.
95        if end_offset > buffer.size() {
96            return Err(Error::BlobInsufficientLength);
97        }
98
99        // Keep the zero-length fast path after the bounds check so offset > size still preserves
100        // the BlobInsufficientLength contract.
101        if len == 0 {
102            return Ok(IoBufs::default());
103        }
104
105        // Entirely in buffered tip.
106        if offset >= buffer.offset {
107            let start = (offset - buffer.offset) as usize;
108            let end = start + len;
109            return Ok(buffer.slice(start..end).into());
110        }
111
112        // Entirely in blob.
113        if end_offset <= buffer.offset {
114            return Ok(self.blob.read_at(offset, len).await?.freeze());
115        }
116
117        // Overlaps blob and buffered tip.
118        let blob_len = (buffer.offset - offset) as usize;
119        let tip_len = len - blob_len;
120        let tip = buffer.slice(..tip_len);
121
122        let mut blob = self.blob.read_at(offset, blob_len).await?.freeze();
123        blob.append(tip);
124        Ok(blob)
125    }
126
127    /// Write bytes from `buf` at `offset`.
128    ///
129    /// Data is merged into the in-memory tip buffer when possible, otherwise buffered data may be
130    /// flushed and chunks are written directly to the underlying blob.
131    ///
132    /// Returns [Error::OffsetOverflow] when `offset + bufs.len()` overflows.
133    pub async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
134        let mut bufs = bufs.into();
135
136        // Ensure the write doesn't overflow.
137        offset
138            .checked_add(bufs.remaining() as u64)
139            .ok_or(Error::OffsetOverflow)?;
140
141        // Acquire a write lock on the buffer.
142        let mut buffer = self.buffer.write().await;
143
144        // Process each chunk of the input buffer, attempting to merge into the tip buffer
145        // or writing directly to the underlying blob.
146        let mut current_offset = offset;
147        while bufs.has_remaining() {
148            let chunk = bufs.chunk();
149            let chunk_len = chunk.len();
150
151            // Chunk falls entirely within the buffer's current range and can be merged.
152            if buffer.merge(chunk, current_offset) {
153                bufs.advance(chunk_len);
154                current_offset += chunk_len as u64;
155                continue;
156            }
157
158            // Chunk cannot be merged, so flush the buffer if the range overlaps, and check
159            // if merge is possible after.
160            let chunk_end = current_offset + chunk_len as u64;
161            if buffer.offset < chunk_end {
162                if let Some((old_buf, old_offset)) = buffer.take() {
163                    self.blob.write_at(old_offset, old_buf).await?;
164                    if buffer.merge(chunk, current_offset) {
165                        bufs.advance(chunk_len);
166                        current_offset += chunk_len as u64;
167                        continue;
168                    }
169                }
170            }
171
172            // Chunk could not be merged (exceeds buffer capacity or outside its range), so
173            // write directly. Note that we may end up writing an intersecting range twice:
174            // once when the buffer is flushed above, then again when we write the chunk
175            // below. Removing this inefficiency may not be worth the additional complexity.
176            let direct = bufs.split_to(chunk_len);
177            self.blob.write_at(current_offset, direct).await?;
178            current_offset += chunk_len as u64;
179
180            // Maintain the "buffer at tip" invariant by advancing offset to the end of this
181            // write if it extended the underlying blob.
182            buffer.offset = buffer.offset.max(current_offset);
183        }
184
185        Ok(())
186    }
187
188    /// Resize the logical blob to `len`.
189    ///
190    /// If buffered data exists and the resize extends beyond current size, buffered data is flushed
191    /// before resizing the underlying blob.
192    pub async fn resize(&self, len: u64) -> Result<(), Error> {
193        // Acquire a write lock on the buffer.
194        let mut buffer = self.buffer.write().await;
195
196        // Flush buffered data to the underlying blob.
197        //
198        // This can only happen if the new size is greater than the current size.
199        if let Some((buf, offset)) = buffer.resize(len) {
200            self.blob.write_at(offset, buf).await?;
201        }
202
203        // Resize the underlying blob.
204        self.blob.resize(len).await?;
205
206        Ok(())
207    }
208
209    /// Flush buffered bytes and durably sync the underlying blob.
210    pub async fn sync(&self) -> Result<(), Error> {
211        let mut buffer = self.buffer.write().await;
212        if let Some((buf, offset)) = buffer.take() {
213            self.blob.write_at(offset, buf).await?;
214        }
215        self.blob.sync().await
216    }
217}