commonware_runtime/utils/buffer/write.rs
use crate::{buffer::tip::Buffer, Blob, Buf, Error, IoBufMut, IoBufs, IoBufsMut, RwLock};
use std::{num::NonZeroUsize, sync::Arc};

/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
/// updating data.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
/// use commonware_utils::NZUsize;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob for writing
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     assert_eq!(size, 0);
///
///     // Create a buffered writer with a 16-byte buffer
///     let mut blob = Write::new(blob, 0, NZUsize!(16));
///     blob.write_at(0, b"hello").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Write more data across multiple writes, then flush with a single sync
///     blob.write_at(5, b" world").await.expect("write failed");
///     blob.write_at(11, b"!").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Read back the data to verify
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
///     let mut reader = Read::new(blob, size, NZUsize!(8));
///     let mut buf = vec![0u8; size as usize];
///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
///     assert_eq!(&buf, b"hello world!");
/// });
/// ```
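///
/// Clones of a [Write] share the same tip buffer, so buffered writes made through one clone are
/// observed by all others.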
#[derive(Clone)]
pub struct Write<B: Blob> {
    /// The underlying blob to write to.
    blob: B,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
    buffer: Arc<RwLock<Buffer>>,
}

impl<B: Blob> Write<B> {
    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the
    /// tip of `blob`, where `size` is the current size of `blob`.
    pub fn new(blob: B, size: u64, capacity: NonZeroUsize) -> Self {
        Self {
            blob,
            buffer: Arc::new(RwLock::new(Buffer::new(size, capacity.get()))),
        }
    }

    /// Returns the current logical size of the blob including any buffered data.
    ///
    /// This represents the total size of data that would be present after flushing.
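    ///
    /// # Example
    ///
    /// A minimal sketch (reusing the setup from the type-level example): bytes that are still
    /// buffered count toward the reported size before any `sync`.
    ///
    /// ```
    /// use commonware_runtime::{Runner, buffer::Write, Blob, Storage, deterministic};
    /// use commonware_utils::NZUsize;
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
    ///     let blob = Write::new(blob, size, NZUsize!(16));
    ///     blob.write_at(0, b"hello").await.expect("write failed");
    ///     // The buffered (unsynced) bytes are included in the logical size.
    ///     assert_eq!(blob.size().await, 5);
    /// });
    /// ```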
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }
}

impl<B: Blob> Blob for Write<B> {
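    // Reads are served from the tip buffer where possible; any prefix of the requested range
    // that is not covered by the buffer is read from the underlying blob.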
    async fn read_at(
        &self,
        offset: u64,
        buf: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let buf = buf.into();
        let len = buf.len() as u64;

        // Ensure the read doesn't overflow.
        let end_offset = offset.checked_add(len).ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        match buf {
            // For single buffers, work directly to avoid copies.
            IoBufsMut::Single(mut single) => {
                // Extract any bytes from the buffer that overlap with the requested range.
                let remaining = buffer.extract(single.as_mut(), offset);

                // If bytes remain, read directly from the blob. Any remaining bytes reside at
                // the beginning of the range.
                if remaining > 0 {
                    let blob_buf = IoBufMut::zeroed(remaining);
                    let blob_result = self.blob.read_at(offset, blob_buf).await?;
                    single.as_mut()[..remaining].copy_from_slice(blob_result.coalesce().as_ref());
                }
                Ok(IoBufsMut::Single(single))
            }
            // For chunked buffers, read into temp and copy back to preserve structure.
            IoBufsMut::Chunked(mut chunks) => {
                let mut temp = vec![0u8; len as usize];
                // Extract any bytes from the buffer that overlap with the requested range, into
                // a temporary contiguous buffer.
                let remaining = buffer.extract(&mut temp, offset);

                // If bytes remain, read directly from the blob. Any remaining bytes reside at
                // the beginning of the range.
                if remaining > 0 {
                    let blob_buf = IoBufMut::zeroed(remaining);
                    let blob_result = self.blob.read_at(offset, blob_buf).await?;
                    temp[..remaining].copy_from_slice(blob_result.coalesce().as_ref());
                }
                // Copy back to original chunks.
                let mut pos = 0;
                for chunk in chunks.iter_mut() {
                    let chunk_len = chunk.len();
                    chunk.as_mut().copy_from_slice(&temp[pos..pos + chunk_len]);
                    pos += chunk_len;
                }
                Ok(IoBufsMut::Chunked(chunks))
            }
        }
    }

    async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
        let mut buf = buf.into();

        // Ensure the write doesn't overflow.
        offset
            .checked_add(buf.remaining() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Process each chunk of the input buffer, attempting to merge into the tip buffer
        // or writing directly to the underlying blob.
        let mut current_offset = offset;
        while buf.has_remaining() {
            let chunk = buf.chunk();
            let chunk_len = chunk.len();

            // Chunk falls entirely within the buffer's current range and can be merged.
            if buffer.merge(chunk, current_offset) {
                buf.advance(chunk_len);
                current_offset += chunk_len as u64;
                continue;
            }

            // Chunk cannot be merged, so flush the buffer if the range overlaps, and check
            // if merge is possible after.
            let chunk_end = current_offset + chunk_len as u64;
            if buffer.offset < chunk_end {
                if let Some((old_buf, old_offset)) = buffer.take() {
                    self.blob.write_at(old_offset, old_buf).await?;
                    if buffer.merge(chunk, current_offset) {
                        buf.advance(chunk_len);
                        current_offset += chunk_len as u64;
                        continue;
                    }
                }
            }

            // Chunk could not be merged (exceeds buffer capacity or outside its range), so
            // write directly. Note that we may end up writing an intersecting range twice:
            // once when the buffer is flushed above, then again when we write the chunk
            // below. Removing this inefficiency may not be worth the additional complexity.
            self.blob.write_at(current_offset, chunk.to_vec()).await?;
            buf.advance(chunk_len);
            current_offset += chunk_len as u64;

            // Maintain the "buffer at tip" invariant by advancing offset to the end of this
            // write if it extended the underlying blob.
            buffer.offset = buffer.offset.max(current_offset);
        }

        Ok(())
    }

    async fn resize(&self, len: u64) -> Result<(), Error> {
        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Flush buffered data to the underlying blob.
        //
        // This can only happen if the new size is greater than the current size.
        if let Some((buf, offset)) = buffer.resize(len) {
            self.blob.write_at(offset, buf).await?;
        }

        // Resize the underlying blob.
        self.blob.resize(len).await?;

        Ok(())
    }

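    // Flush any bytes buffered at the tip to the underlying blob before syncing it, so buffered
    // writes are included in the sync.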
    async fn sync(&self) -> Result<(), Error> {
        let mut buffer = self.buffer.write().await;
        if let Some((buf, offset)) = buffer.take() {
            self.blob.write_at(offset, buf).await?;
        }
        self.blob.sync().await
    }
}
210}