commonware_runtime/utils/buffer/write.rs
1use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoBufs};
2use commonware_utils::sync::AsyncRwLock;
3use std::{num::NonZeroUsize, sync::Arc};
4
5/// Shared writer state.
6struct State<B: Blob> {
7 /// The underlying blob to write to.
8 blob: B,
9
10 /// Buffered bytes at the logical tip of the blob.
11 buffer: Buffer,
12
13 /// Whether a prior plain mutation must be persisted with [`Blob::sync`].
14 ///
15 /// [`State::write_at_sync`] uses [`Blob::write_at_sync`] only when this is
16 /// false, otherwise it must use [`Blob::sync`] to cover earlier unsynced
17 /// mutations.
18 needs_sync: bool,
19}
20
21impl<B: Blob> State<B> {
22 /// Read bytes from the underlying blob.
23 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
24 Ok(self.blob.read_at(offset, len).await?.freeze())
25 }
26
27 /// Write bytes to the underlying blob and mark them as needing sync.
28 async fn write_at(&mut self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
29 self.blob.write_at(offset, bufs).await?;
30 self.needs_sync = true;
31 Ok(())
32 }
33
34 /// Write bytes to the underlying blob and make them durable.
35 ///
36 /// Uses [`Blob::write_at_sync`] when there are no earlier unsynced
37 /// mutations. Otherwise, writes the bytes and then syncs the blob.
38 async fn write_at_sync(
39 &mut self,
40 offset: u64,
41 bufs: impl Into<IoBufs> + Send,
42 ) -> Result<(), Error> {
43 if self.needs_sync {
44 self.write_at(offset, bufs).await?;
45 self.sync().await
46 } else {
47 // If `write_at_sync` fails, a later sync must not treat the drained
48 // buffer as durable.
49 self.needs_sync = true;
50 self.blob.write_at_sync(offset, bufs).await?;
51 self.needs_sync = false;
52 Ok(())
53 }
54 }
55
56 /// Resize the underlying blob and mark the resize as needing sync.
57 async fn resize(&mut self, len: u64) -> Result<(), Error> {
58 self.blob.resize(len).await?;
59 self.needs_sync = true;
60 Ok(())
61 }
62
63 /// Sync the underlying blob if there are unsynced mutations.
64 async fn sync(&mut self) -> Result<(), Error> {
65 if !self.needs_sync {
66 return Ok(());
67 }
68 self.blob.sync().await?;
69 self.needs_sync = false;
70 Ok(())
71 }
72}
73
74/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
75/// updating data.
76///
77/// # Allocation Semantics
78///
79/// - [Self::new] starts with a detached tip buffer and allocates backing on first buffered write.
80/// - Subsequent writes reuse that backing, copy-on-write allocation only occurs when buffered data
81/// is shared (for example, after handing out immutable views) or a merge needs more capacity.
82/// - Sparse writes merged into tip extend logical length and zero-fill any gap in-buffer.
83/// - Flush paths ([Self::sync], [Self::resize], overlap flushes in [Self::write_at]) hand drained
84/// bytes to the blob and leave the tip detached until the next buffered write.
85///
86/// # Concurrent Access
87///
88/// [Write] owns mutation ordering and durability bookkeeping for the wrapped [Blob]. Cloned
89/// [Write] handles are safe to use concurrently because they share the same state. Raw [Blob]
90/// handles cloned before wrapping observe only flushed data and may not see the latest buffered
91/// writes until [Self::sync], [Self::resize], or an overlapping [Self::write_at] flushes them.
92/// Those raw handles must not be used to write, resize, or otherwise mutate the blob while a
93/// [Write] exists. External mutations bypass the buffer state and [Self::sync] may use
94/// [Blob::write_at_sync], which is not a durability barrier for those external mutations.
95///
96/// # Example
97///
98/// ```
99/// use commonware_runtime::{Runner, BufferPooler, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
100/// use commonware_utils::NZUsize;
101///
102/// let executor = deterministic::Runner::default();
103/// executor.start(|context| async move {
104/// // Open a blob for writing
105/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
106/// assert_eq!(size, 0);
107///
108/// // Create a buffered writer with 16-byte buffer
109/// let mut blob = Write::from_pooler(&context, blob, 0, NZUsize!(16));
110/// blob.write_at(0, b"hello").await.expect("write failed");
111/// blob.sync().await.expect("sync failed");
112///
113/// // Write more data in multiple flushes
114/// blob.write_at(5, b" world").await.expect("write failed");
115/// blob.write_at(11, b"!").await.expect("write failed");
116/// blob.sync().await.expect("sync failed");
117///
118/// // Read back the data to verify
119/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
120/// let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
121/// let buf = reader.read(size as usize).await.expect("read failed");
122/// assert_eq!(buf.coalesce().as_ref(), b"hello world!");
123/// });
124/// ```
125#[derive(Clone)]
126pub struct Write<B: Blob> {
127 /// Shared blob, tip buffer, and durability state.
128 state: Arc<AsyncRwLock<State<B>>>,
129}
130
131impl<B: Blob> Write<B> {
132 /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
133 /// of `blob` with the provided `size`.
134 pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
135 Self {
136 state: Arc::new(AsyncRwLock::new(State {
137 blob,
138 buffer: Buffer::new(size, capacity.get(), pool),
139 needs_sync: true, // ensure pending writes on the wrapped blob are synced
140 })),
141 }
142 }
143
144 /// Creates a new [Write], extracting the storage [BufferPool] from a [BufferPooler].
145 pub fn from_pooler(
146 pooler: &impl BufferPooler,
147 blob: B,
148 size: u64,
149 capacity: NonZeroUsize,
150 ) -> Self {
151 Self::new(blob, size, capacity, pooler.storage_buffer_pool().clone())
152 }
153
154 /// Returns the current logical size of the blob including any buffered data.
155 ///
156 /// This represents the total size of data that would be present after flushing.
157 pub async fn size(&self) -> u64 {
158 let state = self.state.read().await;
159 state.buffer.size()
160 }
161
162 /// Read exactly `len` immutable bytes starting at `offset`.
163 pub async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
164 // Ensure the read doesn't overflow.
165 let end_offset = offset
166 .checked_add(len as u64)
167 .ok_or(Error::OffsetOverflow)?;
168
169 // Acquire a read lock on the buffer state.
170 let state = self.state.read().await;
171 let buffer = &state.buffer;
172
173 // If the data required is beyond the size of the blob, return an error.
174 if end_offset > buffer.size() {
175 return Err(Error::BlobInsufficientLength);
176 }
177
178 // Keep the zero-length fast path after the bounds check so offset > size still preserves
179 // the BlobInsufficientLength contract.
180 if len == 0 {
181 return Ok(IoBufs::default());
182 }
183
184 // Entirely in buffered tip.
185 if offset >= buffer.offset {
186 let start = (offset - buffer.offset) as usize;
187 let end = start + len;
188 return Ok(buffer.slice(start..end).into());
189 }
190
191 // Entirely in blob.
192 if end_offset <= buffer.offset {
193 return state.read_at(offset, len).await;
194 }
195
196 // Overlaps blob and buffered tip.
197 let blob_len = (buffer.offset - offset) as usize;
198 let tip_len = len - blob_len;
199 let tip = buffer.slice(..tip_len);
200
201 let mut blob = state.read_at(offset, blob_len).await?;
202 blob.append(tip);
203 Ok(blob)
204 }
205
206 /// Write bytes from `buf` at `offset`.
207 ///
208 /// Data is merged into the in-memory tip buffer when possible, otherwise buffered data may be
209 /// flushed and chunks are written directly to the underlying blob.
210 ///
211 /// Returns [Error::OffsetOverflow] when `offset + bufs.len()` overflows.
212 pub async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
213 let mut bufs = bufs.into();
214
215 // Ensure the write doesn't overflow.
216 offset
217 .checked_add(bufs.remaining() as u64)
218 .ok_or(Error::OffsetOverflow)?;
219
220 // Acquire a write lock on the buffer state.
221 let mut state = self.state.write().await;
222
223 // Process each chunk of the input buffer, attempting to merge into the tip buffer
224 // or writing directly to the underlying blob.
225 let mut current_offset = offset;
226 while bufs.has_remaining() {
227 let chunk = bufs.chunk();
228 let chunk_len = chunk.len();
229
230 // Chunk falls entirely within the buffer's current range and can be merged.
231 if state.buffer.merge(chunk, current_offset) {
232 bufs.advance(chunk_len);
233 current_offset += chunk_len as u64;
234 continue;
235 }
236
237 // Chunk cannot be merged, so flush the buffer if the range overlaps, and check
238 // if merge is possible after.
239 let chunk_end = current_offset + chunk_len as u64;
240 if state.buffer.offset < chunk_end {
241 if let Some((old_buf, old_offset)) = state.buffer.take() {
242 state.write_at(old_offset, old_buf).await?;
243 if state.buffer.merge(chunk, current_offset) {
244 bufs.advance(chunk_len);
245 current_offset += chunk_len as u64;
246 continue;
247 }
248 }
249 }
250
251 // Chunk could not be merged (exceeds buffer capacity or outside its range), so
252 // write directly. Note that we may end up writing an intersecting range twice:
253 // once when the buffer is flushed above, then again when we write the chunk
254 // below. Removing this inefficiency may not be worth the additional complexity.
255 let direct = bufs.split_to(chunk_len);
256 state.write_at(current_offset, direct).await?;
257 current_offset += chunk_len as u64;
258
259 // Maintain the "buffer at tip" invariant by advancing offset to the end of this
260 // write if it extended the underlying blob.
261 state.buffer.offset = state.buffer.offset.max(current_offset);
262 }
263
264 Ok(())
265 }
266
267 /// Resize the logical blob to `len`.
268 ///
269 /// If buffered data exists and the resize extends beyond current size, buffered data is flushed
270 /// before resizing the underlying blob.
271 pub async fn resize(&self, len: u64) -> Result<(), Error> {
272 // Acquire a write lock on the buffer state.
273 let mut state = self.state.write().await;
274
275 // Flush buffered data to the underlying blob.
276 //
277 // This can only happen if the new size is greater than the current size.
278 if let Some((buf, offset)) = state.buffer.resize(len) {
279 state.write_at(offset, buf).await?;
280 }
281
282 // Resize the underlying blob.
283 state.resize(len).await?;
284
285 Ok(())
286 }
287
288 /// Flush buffered bytes and durably sync mutations tracked by this writer.
289 pub async fn sync(&self) -> Result<(), Error> {
290 let mut state = self.state.write().await;
291 if let Some((buf, offset)) = state.buffer.take() {
292 return state.write_at_sync(offset, buf).await;
293 }
294
295 state.sync().await
296 }
297}