commonware_runtime/utils/buffer/write.rs
1use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoBufs};
2use commonware_utils::sync::AsyncRwLock;
3use std::{num::NonZeroUsize, sync::Arc};
4
/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
/// updating data.
///
/// # Allocation Semantics
///
/// - [Self::new] starts with a detached tip buffer and allocates backing on first buffered write.
/// - Subsequent writes reuse that backing; copy-on-write allocation only occurs when buffered data
///   is shared (for example, after handing out immutable views) or a merge needs more capacity.
/// - Sparse writes merged into the tip extend the logical length and zero-fill any gap in-buffer.
/// - Flush paths ([Self::sync], [Self::resize], overlap flushes in [Self::write_at]) hand drained
///   bytes to the blob and leave the tip detached until the next buffered write.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, BufferPooler, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
/// use commonware_utils::NZUsize;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob for writing
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     assert_eq!(size, 0);
///
///     // Create a buffered writer with 16-byte buffer
///     let mut blob = Write::from_pooler(&context, blob, 0, NZUsize!(16));
///     blob.write_at(0, b"hello").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Write more data in multiple flushes
///     blob.write_at(5, b" world").await.expect("write failed");
///     blob.write_at(11, b"!").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Read back the data to verify
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
///     let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
///     let buf = reader.read(size as usize).await.expect("read failed");
///     assert_eq!(buf.coalesce().as_ref(), b"hello world!");
/// });
/// ```
#[derive(Clone)]
pub struct Write<B: Blob> {
    /// The underlying blob to write to.
    blob: B,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
    ///
    /// Shared behind an [AsyncRwLock] so cloned handles (via `derive(Clone)`) observe the same
    /// buffered tip.
    buffer: Arc<AsyncRwLock<Buffer>>,
}
54
55impl<B: Blob> Write<B> {
56 /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
57 /// of `blob` with the provided `size`.
58 pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
59 Self {
60 blob,
61 buffer: Arc::new(AsyncRwLock::new(Buffer::new(size, capacity.get(), pool))),
62 }
63 }
64
65 /// Creates a new [Write], extracting the storage [BufferPool] from a [BufferPooler].
66 pub fn from_pooler(
67 pooler: &impl BufferPooler,
68 blob: B,
69 size: u64,
70 capacity: NonZeroUsize,
71 ) -> Self {
72 Self::new(blob, size, capacity, pooler.storage_buffer_pool().clone())
73 }
74
75 /// Returns the current logical size of the blob including any buffered data.
76 ///
77 /// This represents the total size of data that would be present after flushing.
78 #[allow(clippy::len_without_is_empty)]
79 pub async fn size(&self) -> u64 {
80 let buffer = self.buffer.read().await;
81 buffer.size()
82 }
83
84 /// Read exactly `len` immutable bytes starting at `offset`.
85 pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
86 // Ensure the read doesn't overflow.
87 let end_offset = offset
88 .checked_add(len as u64)
89 .ok_or(Error::OffsetOverflow)?;
90
91 // Acquire a read lock on the buffer.
92 let buffer = self.buffer.read().await;
93
94 // If the data required is beyond the size of the blob, return an error.
95 if end_offset > buffer.size() {
96 return Err(Error::BlobInsufficientLength);
97 }
98
99 // Keep the zero-length fast path after the bounds check so offset > size still preserves
100 // the BlobInsufficientLength contract.
101 if len == 0 {
102 return Ok(IoBufs::default());
103 }
104
105 // Entirely in buffered tip.
106 if offset >= buffer.offset {
107 let start = (offset - buffer.offset) as usize;
108 let end = start + len;
109 return Ok(buffer.slice(start..end).into());
110 }
111
112 // Entirely in blob.
113 if end_offset <= buffer.offset {
114 return Ok(self.blob.read_at(offset, len).await?.freeze());
115 }
116
117 // Overlaps blob and buffered tip.
118 let blob_len = (buffer.offset - offset) as usize;
119 let tip_len = len - blob_len;
120 let tip = buffer.slice(..tip_len);
121
122 let mut blob = self.blob.read_at(offset, blob_len).await?.freeze();
123 blob.append(tip);
124 Ok(blob)
125 }
126
127 /// Write bytes from `buf` at `offset`.
128 ///
129 /// Data is merged into the in-memory tip buffer when possible, otherwise buffered data may be
130 /// flushed and chunks are written directly to the underlying blob.
131 ///
132 /// Returns [Error::OffsetOverflow] when `offset + bufs.len()` overflows.
133 pub async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
134 let mut bufs = bufs.into();
135
136 // Ensure the write doesn't overflow.
137 offset
138 .checked_add(bufs.remaining() as u64)
139 .ok_or(Error::OffsetOverflow)?;
140
141 // Acquire a write lock on the buffer.
142 let mut buffer = self.buffer.write().await;
143
144 // Process each chunk of the input buffer, attempting to merge into the tip buffer
145 // or writing directly to the underlying blob.
146 let mut current_offset = offset;
147 while bufs.has_remaining() {
148 let chunk = bufs.chunk();
149 let chunk_len = chunk.len();
150
151 // Chunk falls entirely within the buffer's current range and can be merged.
152 if buffer.merge(chunk, current_offset) {
153 bufs.advance(chunk_len);
154 current_offset += chunk_len as u64;
155 continue;
156 }
157
158 // Chunk cannot be merged, so flush the buffer if the range overlaps, and check
159 // if merge is possible after.
160 let chunk_end = current_offset + chunk_len as u64;
161 if buffer.offset < chunk_end {
162 if let Some((old_buf, old_offset)) = buffer.take() {
163 self.blob.write_at(old_offset, old_buf).await?;
164 if buffer.merge(chunk, current_offset) {
165 bufs.advance(chunk_len);
166 current_offset += chunk_len as u64;
167 continue;
168 }
169 }
170 }
171
172 // Chunk could not be merged (exceeds buffer capacity or outside its range), so
173 // write directly. Note that we may end up writing an intersecting range twice:
174 // once when the buffer is flushed above, then again when we write the chunk
175 // below. Removing this inefficiency may not be worth the additional complexity.
176 let direct = bufs.split_to(chunk_len);
177 self.blob.write_at(current_offset, direct).await?;
178 current_offset += chunk_len as u64;
179
180 // Maintain the "buffer at tip" invariant by advancing offset to the end of this
181 // write if it extended the underlying blob.
182 buffer.offset = buffer.offset.max(current_offset);
183 }
184
185 Ok(())
186 }
187
188 /// Resize the logical blob to `len`.
189 ///
190 /// If buffered data exists and the resize extends beyond current size, buffered data is flushed
191 /// before resizing the underlying blob.
192 pub async fn resize(&self, len: u64) -> Result<(), Error> {
193 // Acquire a write lock on the buffer.
194 let mut buffer = self.buffer.write().await;
195
196 // Flush buffered data to the underlying blob.
197 //
198 // This can only happen if the new size is greater than the current size.
199 if let Some((buf, offset)) = buffer.resize(len) {
200 self.blob.write_at(offset, buf).await?;
201 }
202
203 // Resize the underlying blob.
204 self.blob.resize(len).await?;
205
206 Ok(())
207 }
208
209 /// Flush buffered bytes and durably sync the underlying blob.
210 pub async fn sync(&self) -> Result<(), Error> {
211 let mut buffer = self.buffer.write().await;
212 if let Some((buf, offset)) = buffer.take() {
213 self.blob.write_at(offset, buf).await?;
214 }
215 self.blob.sync().await
216 }
217}