commonware_runtime/utils/buffer/write.rs

use crate::{buffer::tip::Buffer, Blob, Buf, Error, IoBufMut, IoBufs, IoBufsMut, RwLock};
use std::{num::NonZeroUsize, sync::Arc};

/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
/// updating data.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
/// use commonware_utils::NZUsize;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob for writing
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     assert_eq!(size, 0);
///
///     // Create a buffered writer with a 16-byte buffer
///     let blob = Write::new(blob, 0, NZUsize!(16));
///     blob.write_at(0, b"hello").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Write more data across multiple writes, then sync again
///     blob.write_at(5, b" world").await.expect("write failed");
///     blob.write_at(11, b"!").await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Read back the data to verify
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
///     let mut reader = Read::new(blob, size, NZUsize!(8));
///     let mut buf = vec![0u8; size as usize];
///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
///     assert_eq!(&buf, b"hello world!");
/// });
/// ```
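///
/// [Write] can be cloned; all clones share the same internal buffer, so writes made through one
/// clone are observed by the others.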
#[derive(Clone)]
pub struct Write<B: Blob> {
    /// The underlying blob to write to.
    blob: B,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
    buffer: Arc<RwLock<Buffer>>,
}

impl<B: Blob> Write<B> {
    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
    /// of `blob`, whose current size is `size`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn new(blob: B, size: u64, capacity: NonZeroUsize) -> Self {
        Self {
            blob,
            buffer: Arc::new(RwLock::new(Buffer::new(size, capacity.get()))),
        }
    }

    /// Returns the current logical size of the blob including any buffered data.
    ///
    /// This represents the total size of data that would be present after flushing.
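    ///
    /// # Example
    ///
    /// This mirrors the struct-level example above (assuming the same deterministic runtime and
    /// `NZUsize!` macro):
    ///
    /// ```
    /// use commonware_runtime::{Runner, buffer::Write, Blob, Storage, deterministic};
    /// use commonware_utils::NZUsize;
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
    ///     let blob = Write::new(blob, size, NZUsize!(16));
    ///
    ///     // Buffered data counts toward the logical size before it is synced.
    ///     blob.write_at(0, b"hello").await.expect("write failed");
    ///     assert_eq!(blob.size().await, 5);
    ///
    ///     // Syncing flushes the buffer without changing the logical size.
    ///     blob.sync().await.expect("sync failed");
    ///     assert_eq!(blob.size().await, 5);
    /// });
    /// ```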
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }
}

impl<B: Blob> Blob for Write<B> {
    async fn read_at(
        &self,
        offset: u64,
        buf: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let buf = buf.into();
        let len = buf.len() as u64;

        // Ensure the read doesn't overflow.
        let end_offset = offset.checked_add(len).ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        match buf {
            // For single buffers, work directly to avoid copies.
            IoBufsMut::Single(mut single) => {
                // Extract any bytes from the buffer that overlap with the requested range.
                let remaining = buffer.extract(single.as_mut(), offset);

                // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
                // of the range.
                if remaining > 0 {
                    let blob_buf = IoBufMut::zeroed(remaining);
                    let blob_result = self.blob.read_at(offset, blob_buf).await?;
                    single.as_mut()[..remaining].copy_from_slice(blob_result.coalesce().as_ref());
                }
                Ok(IoBufsMut::Single(single))
            }
            // For chunked buffers, read into temp and copy back to preserve structure.
            IoBufsMut::Chunked(mut chunks) => {
                let mut temp = vec![0u8; len as usize];
                // Extract any bytes from the buffer that overlap with the requested range into a
                // temporary contiguous buffer.
                let remaining = buffer.extract(&mut temp, offset);

                // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
                // of the range.
                if remaining > 0 {
                    let blob_buf = IoBufMut::zeroed(remaining);
                    let blob_result = self.blob.read_at(offset, blob_buf).await?;
                    temp[..remaining].copy_from_slice(blob_result.coalesce().as_ref());
                }
                // Copy back to original chunks
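                // (the chunk lengths sum to `len`, so this copy consumes all of `temp`)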
                let mut pos = 0;
                for chunk in chunks.iter_mut() {
                    let chunk_len = chunk.len();
                    chunk.as_mut().copy_from_slice(&temp[pos..pos + chunk_len]);
                    pos += chunk_len;
                }
                Ok(IoBufsMut::Chunked(chunks))
            }
        }
    }

    async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
        let mut buf = buf.into();

        // Ensure the write doesn't overflow.
        offset
            .checked_add(buf.remaining() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Process each chunk of the input buffer, attempting to merge into the tip buffer
        // or writing directly to the underlying blob.
        let mut current_offset = offset;
        while buf.has_remaining() {
            let chunk = buf.chunk();
            let chunk_len = chunk.len();

            // Chunk falls entirely within the buffer's current range and can be merged.
            if buffer.merge(chunk, current_offset) {
                buf.advance(chunk_len);
                current_offset += chunk_len as u64;
                continue;
            }

            // Chunk cannot be merged, so flush the buffer if the ranges overlap, then check
            // whether a merge is possible afterwards.
            let chunk_end = current_offset + chunk_len as u64;
            if buffer.offset < chunk_end {
                if let Some((old_buf, old_offset)) = buffer.take() {
                    self.blob.write_at(old_offset, old_buf).await?;
                    if buffer.merge(chunk, current_offset) {
                        buf.advance(chunk_len);
                        current_offset += chunk_len as u64;
                        continue;
                    }
                }
            }

            // Chunk could not be merged (exceeds buffer capacity or outside its range), so
            // write directly. Note that we may end up writing an intersecting range twice:
            // once when the buffer is flushed above, then again when we write the chunk
            // below. Removing this inefficiency may not be worth the additional complexity.
            self.blob.write_at(current_offset, chunk.to_vec()).await?;
            buf.advance(chunk_len);
            current_offset += chunk_len as u64;

            // Maintain the "buffer at tip" invariant by advancing offset to the end of this
            // write if it extended the underlying blob.
            buffer.offset = buffer.offset.max(current_offset);
        }

        Ok(())
    }

    async fn resize(&self, len: u64) -> Result<(), Error> {
        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Flush buffered data to the underlying blob.
        //
        // This can only happen if the new size is greater than the current size.
        if let Some((buf, offset)) = buffer.resize(len) {
            self.blob.write_at(offset, buf).await?;
        }

        // Resize the underlying blob.
        self.blob.resize(len).await?;

        Ok(())
    }

    async fn sync(&self) -> Result<(), Error> {
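        // Flush any buffered data to the underlying blob before syncing it.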
        let mut buffer = self.buffer.write().await;
        if let Some((buf, offset)) = buffer.take() {
            self.blob.write_at(offset, buf).await?;
        }
        self.blob.sync().await
    }
}
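
#[cfg(test)]
mod tests {
    // A minimal sketch of a round-trip test mirroring the doc example above. It assumes the
    // deterministic runtime and the `commonware_utils::NZUsize` macro are available to tests,
    // as they are to the doc example.
    use super::*;
    use crate::{buffer::Read, deterministic, Blob, Runner, Storage};
    use commonware_utils::NZUsize;

    #[test]
    fn test_buffered_writes_flush_on_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Open an empty blob.
            let (blob, size) = context
                .open("test_partition", b"test_blob")
                .await
                .expect("unable to open blob");
            assert_eq!(size, 0);

            // Buffer several writes; the logical size grows before any sync.
            let blob = Write::new(blob, size, NZUsize!(16));
            blob.write_at(0, b"hello").await.expect("write failed");
            blob.write_at(5, b" world").await.expect("write failed");
            blob.write_at(11, b"!").await.expect("write failed");
            assert_eq!(blob.size().await, 12);

            // Flush the buffered data to the underlying blob.
            blob.sync().await.expect("sync failed");

            // Reopen the blob and read the data back.
            let (blob, size) = context
                .open("test_partition", b"test_blob")
                .await
                .expect("unable to reopen blob");
            assert_eq!(size, 12);
            let mut reader = Read::new(blob, size, NZUsize!(8));
            let mut buf = vec![0u8; size as usize];
            reader
                .read_exact(&mut buf, size as usize)
                .await
                .expect("read failed");
            assert_eq!(&buf, b"hello world!");
        });
    }
}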