commonware_runtime/utils/buffer/write.rs
1use crate::{Blob, Error, RwLock};
2use commonware_utils::{StableBuf, StableBufMut};
3use std::sync::Arc;
4
5/// The internal state of a [Write] buffer.
6struct Inner<B: Blob> {
7 /// The underlying blob to write to.
8 blob: B,
9 /// The buffer storing data to be written to the blob.
10 buffer: Vec<u8>,
11 /// The offset in the blob where the data in `buffer` starts.
12 /// If `buffer` is empty, this is the position where the next appended byte would conceptually begin in the blob.
13 position: u64,
14 /// The maximum size of the buffer.
15 capacity: usize,
16}
17
18impl<B: Blob> Inner<B> {
19 /// Appends bytes to the internal buffer. If the buffer capacity is exceeded, it will be flushed to the
20 /// underlying [Blob].
21 ///
22 /// If the size of the provided bytes is larger than the buffer capacity, the bytes will be written
23 /// directly to the underlying [Blob]. If this occurs regularly, the buffer capacity should be increased (as
24 /// the buffer will not provide any benefit).
25 ///
26 /// Returns an error if the write to the underlying [Blob] fails (may be due to a `flush` of data not
27 /// related to the data being written).
28 async fn write<Buf: StableBuf>(&mut self, buf: Buf) -> Result<(), Error> {
29 // If the buffer capacity will be exceeded, flush the buffer first
30 let buf_len = buf.len();
31 if self.buffer.len() + buf_len > self.capacity {
32 self.flush().await?;
33 }
34
35 // Write directly to the blob (if the buffer is too small)
36 if buf_len > self.capacity {
37 self.blob.write_at(buf, self.position).await?;
38 self.position += buf_len as u64;
39 return Ok(());
40 }
41
42 // Append to the buffer
43 self.buffer.extend_from_slice(buf.as_ref());
44 Ok(())
45 }
46
47 /// Flushes buffered data to the underlying [Blob]. Does nothing if the buffer is empty.
48 ///
49 /// If the write to the underlying [Blob] fails, the buffer will be reset (and any pending data not yet
50 /// written will be lost).
51 async fn flush(&mut self) -> Result<(), Error> {
52 // If the buffer is empty, do nothing
53 if self.buffer.is_empty() {
54 return Ok(());
55 }
56
57 // Take the buffer and write it to the blob
58 let buf = std::mem::take(&mut self.buffer);
59 let len = buf.len() as u64;
60 self.blob.write_at(buf, self.position).await?;
61
62 // If successful, update the position and allocate a new buffer
63 self.position += len;
64 self.buffer = Vec::with_capacity(self.capacity);
65 Ok(())
66 }
67
68 /// Flushes buffered data and ensures it is durably persisted to the underlying [Blob].
69 ///
70 /// Returns an error if either the flush or sync operation fails.
71 async fn sync(&mut self) -> Result<(), Error> {
72 self.flush().await?;
73 self.blob.sync().await
74 }
75
76 /// Closes the writer and ensures all buffered data is durably persisted to the underlying [Blob].
77 ///
78 /// Returns an error if the close operation fails.
79 async fn close(&mut self) -> Result<(), Error> {
80 // Ensure all buffered data is durably persisted
81 self.sync().await?;
82
83 // Close the underlying blob.
84 //
85 // We use clone here to ensure we retain the close semantics of the blob provided (if
86 // called multiple times, the blob determines whether to error).
87 self.blob.clone().close().await
88 }
89}
90
91/// A writer that buffers content to a [Blob] to optimize the performance
92/// of appending or updating data.
93///
94/// # Example
95///
96/// ```
97/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
98///
99/// let executor = deterministic::Runner::default();
100/// executor.start(|context| async move {
101/// // Open a blob for writing
102/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
103/// assert_eq!(size, 0);
104///
105/// // Create a buffered writer with 16-byte buffer
106/// let mut blob = Write::new(blob, 0, 16);
107/// blob.write_at("hello".as_bytes(), 0).await.expect("write failed");
108/// blob.sync().await.expect("sync failed");
109///
110/// // Write more data in multiple flushes
111/// blob.write_at(" world".as_bytes(), 5).await.expect("write failed");
112/// blob.write_at("!".as_bytes(), 11).await.expect("write failed");
113/// blob.sync().await.expect("sync failed");
114///
115/// // Read back the data to verify
116/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
117/// let mut reader = Read::new(blob, size, 8);
118/// let mut buf = vec![0u8; size as usize];
119/// reader.read_exact(&mut buf, size as usize).await.expect("read failed");
120/// assert_eq!(&buf, b"hello world!");
121/// });
122/// ```
123#[derive(Clone)]
124pub struct Write<B: Blob> {
125 inner: Arc<RwLock<Inner<B>>>,
126}
127
128impl<B: Blob> Write<B> {
129 /// Creates a new `Write` that writes to the given blob starting at `position` with the specified buffer capacity.
130 ///
131 /// # Panics
132 ///
133 /// Panics if `capacity` is zero.
134 pub fn new(blob: B, position: u64, capacity: usize) -> Self {
135 assert!(capacity > 0, "buffer capacity must be greater than zero");
136 Self {
137 inner: Arc::new(RwLock::new(Inner {
138 blob,
139 buffer: Vec::with_capacity(capacity),
140 position,
141 capacity,
142 })),
143 }
144 }
145}
146
147impl<B: Blob> Blob for Write<B> {
148 async fn read_at<T: StableBufMut>(&self, mut buf: T, offset: u64) -> Result<T, Error> {
149 // Acquire a read lock on the inner state
150 let inner = self.inner.read().await;
151
152 // Ensure offset read doesn't overflow
153 let data_len = buf.len();
154 let data_end = offset
155 .checked_add(data_len as u64)
156 .ok_or(Error::OffsetOverflow)?;
157
158 // If the data required is beyond the buffer end, return an error
159 let buffer_start = inner.position;
160 let buffer_end = inner.position + inner.buffer.len() as u64;
161 if data_end > buffer_end {
162 return Err(Error::BlobInsufficientLength);
163 }
164
165 // If the data required is before the buffer start, read directly from the blob
166 if data_end <= buffer_start {
167 return inner.blob.read_at(buf, offset).await;
168 }
169
170 // If the data is entirely within the buffer, read it
171 if offset >= buffer_start {
172 let start = (offset - buffer_start) as usize;
173 if start + data_len > inner.buffer.len() {
174 return Err(Error::BlobInsufficientLength);
175 }
176 buf.put_slice(&inner.buffer[start..start + data_len]);
177 return Ok(buf);
178 }
179
180 // If the data is a combination of blob and buffer, read from both
181 let blob_bytes = (buffer_start - offset) as usize;
182 let blob_part = vec![0u8; blob_bytes];
183 let blob_part = inner.blob.read_at(blob_part, offset).await?;
184 buf.deref_mut()[..blob_bytes].copy_from_slice(&blob_part);
185 let buf_bytes = data_len - blob_bytes;
186 buf.deref_mut()[blob_bytes..].copy_from_slice(&inner.buffer[..buf_bytes]);
187 Ok(buf)
188 }
189
190 async fn write_at<T: StableBuf>(&self, buf: T, offset: u64) -> Result<(), Error> {
191 // Acquire a write lock on the inner state
192 let mut inner = self.inner.write().await;
193
194 // Prepare the buf to be written
195 let data = buf.as_ref();
196 let data_len = data.len();
197
198 // Current state of the buffer in the blob
199 let buffer_start = inner.position;
200 let buffer_end = buffer_start + inner.buffer.len() as u64;
201
202 // Simple append to the current buffered data
203 if offset == buffer_end {
204 return inner.write(buf).await;
205 }
206
207 // Write operation can be merged into the existing buffer if:
208 // a) Write starts at or after the buffer's starting position in the blob.
209 // b) The end of the write, relative to the buffer's start, fits within buffer's capacity.
210 let can_write_into_buffer = offset >= buffer_start
211 && (offset - buffer_start) + data_len as u64 <= inner.capacity as u64;
212 if can_write_into_buffer {
213 let buffer_internal_offset = (offset - buffer_start) as usize;
214 let required_buffer_len = buffer_internal_offset + data_len;
215 if required_buffer_len > inner.buffer.len() {
216 inner.buffer.resize(required_buffer_len, 0u8);
217 }
218 inner.buffer[buffer_internal_offset..required_buffer_len].copy_from_slice(data);
219 return Ok(());
220 }
221
222 // All other cases (e.g., write is before buffer, straddles, or would overflow capacity)
223 if !inner.buffer.is_empty() {
224 inner.flush().await?;
225 }
226 inner.blob.write_at(buf, offset).await?;
227 inner.position = offset + data_len as u64;
228 Ok(())
229 }
230
231 async fn truncate(&self, len: u64) -> Result<(), Error> {
232 // Acquire a write lock on the inner state
233 let mut inner = self.inner.write().await;
234
235 // Prepare the buffer boundaries
236 let buffer_start = inner.position;
237 let buffer_len = inner.buffer.len() as u64;
238 let buffer_end = buffer_start + buffer_len;
239
240 // Adjust buffer content based on `len`
241 if len <= buffer_start {
242 // Truncation point is before or exactly at the start of the buffer.
243 //
244 // All buffered data is now invalid/beyond the new length.
245 inner.buffer.clear();
246 inner.blob.truncate(len).await?;
247 inner.position = len;
248 } else if len < buffer_end {
249 // Truncation point is within the buffer.
250 //
251 // `len` is > `buffer_start_blob_offset` here.
252 // New length of data *within the buffer* is `len - buffer_start_blob_offset`.
253 let new_buffer_actual_len = (len - buffer_start) as usize;
254 inner.buffer.truncate(new_buffer_actual_len);
255 } else {
256 // Truncation point is at or after the end of the buffer, so no changes are needed
257 }
258 Ok(())
259 }
260
261 async fn sync(&self) -> Result<(), Error> {
262 let mut inner = self.inner.write().await;
263 inner.sync().await
264 }
265
266 async fn close(self) -> Result<(), Error> {
267 let mut inner = self.inner.write().await;
268 inner.close().await
269 }
270}