commonware_runtime/utils/buffer/
write.rs

1use crate::{buffer::tip::Buffer, Blob, Error, RwLock};
2use commonware_utils::StableBuf;
3use std::{num::NonZeroUsize, sync::Arc};
4
5/// A writer that buffers content to a [Blob] to optimize the performance
6/// of appending or updating data.
7///
8/// # Example
9///
10/// ```
11/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
12/// use commonware_utils::NZUsize;
13///
14/// let executor = deterministic::Runner::default();
15/// executor.start(|context| async move {
16///     // Open a blob for writing
17///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
18///     assert_eq!(size, 0);
19///
20///     // Create a buffered writer with 16-byte buffer
21///     let mut blob = Write::new(blob, 0, NZUsize!(16));
22///     blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
23///     blob.sync().await.expect("sync failed");
24///
25///     // Write more data in multiple flushes
26///     blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
27///     blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
28///     blob.sync().await.expect("sync failed");
29///
30///     // Read back the data to verify
31///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
32///     let mut reader = Read::new(blob, size, NZUsize!(8));
33///     let mut buf = vec![0u8; size as usize];
34///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
35///     assert_eq!(&buf, b"hello world!");
36/// });
37/// ```
38#[derive(Clone)]
39pub struct Write<B: Blob> {
40    /// The underlying blob to write to.
41    blob: B,
42
43    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
44    buffer: Arc<RwLock<Buffer>>,
45}
46
47impl<B: Blob> Write<B> {
48    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
49    /// of `blob` with the provided `size`.
50    ///
51    /// # Panics
52    ///
53    /// Panics if `capacity` is zero.
54    pub fn new(blob: B, size: u64, capacity: NonZeroUsize) -> Self {
55        Self {
56            blob,
57            buffer: Arc::new(RwLock::new(Buffer::new(size, capacity))),
58        }
59    }
60
61    /// Returns the current logical size of the blob including any buffered data.
62    ///
63    /// This represents the total size of data that would be present after flushing.
64    #[allow(clippy::len_without_is_empty)]
65    pub async fn size(&self) -> u64 {
66        let buffer = self.buffer.read().await;
67        buffer.size()
68    }
69}
70
71impl<B: Blob> Blob for Write<B> {
72    async fn read_at(
73        &self,
74        buf: impl Into<StableBuf> + Send,
75        offset: u64,
76    ) -> Result<StableBuf, Error> {
77        // Prepare `buf` to capture the read data.
78        let mut buf = buf.into();
79        let buf_len = buf.len(); // number of bytes to read
80
81        // Ensure the read doesn't overflow.
82        let end_offset = offset
83            .checked_add(buf_len as u64)
84            .ok_or(Error::OffsetOverflow)?;
85
86        // Acquire a read lock on the buffer.
87        let buffer = self.buffer.read().await;
88
89        // If the data required is beyond the size of the blob, return an error.
90        if end_offset > buffer.size() {
91            return Err(Error::BlobInsufficientLength);
92        }
93
94        // Extract any bytes from the buffer that overlap with the requested range.
95        let remaining = buffer.extract(buf.as_mut(), offset);
96        if remaining == 0 {
97            return Ok(buf);
98        }
99
100        // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
101        // of the range.
102        let blob_part = self.blob.read_at(vec![0u8; remaining], offset).await?;
103        buf.as_mut()[..remaining].copy_from_slice(blob_part.as_ref());
104
105        Ok(buf)
106    }
107
108    async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
109        // Prepare `buf` to be written.
110        let buf = buf.into();
111        let buf_len = buf.len(); // number of bytes to write
112
113        // Ensure the write doesn't overflow.
114        let end_offset = offset
115            .checked_add(buf_len as u64)
116            .ok_or(Error::OffsetOverflow)?;
117
118        // Acquire a write lock on the buffer.
119        let mut buffer = self.buffer.write().await;
120
121        // Write falls entirely within the buffer's current range and can be merged.
122        if buffer.merge(buf.as_ref(), offset) {
123            return Ok(());
124        }
125
126        // Write cannot be merged, so flush the buffer if the range overlaps, and check if merge is
127        // possible after.
128        if buffer.offset < end_offset {
129            if let Some((old_buf, old_offset)) = buffer.take() {
130                self.blob.write_at(old_buf, old_offset).await?;
131                if buffer.merge(buf.as_ref(), offset) {
132                    return Ok(());
133                }
134            }
135        }
136
137        // Write could not be merged (exceeds buffer capacity or outside its range), so write
138        // directly. Note that we end up writing an intersecting range twice: once when the buffer
139        // is flushed above, then again when we write the `buf` below. Removing this inefficiency
140        // may not be worth the additional complexity.
141        self.blob.write_at(buf, offset).await?;
142
143        // Maintain the "buffer at tip" invariant by advancing offset to the end of this write if it
144        // extended the underlying blob.
145        buffer.offset = buffer.offset.max(end_offset);
146
147        Ok(())
148    }
149
150    async fn resize(&self, len: u64) -> Result<(), Error> {
151        // Acquire a write lock on the buffer.
152        let mut buffer = self.buffer.write().await;
153
154        // Flush buffered data to the underlying blob.
155        //
156        // This can only happen if the new size is greater than the current size.
157        if let Some((buf, offset)) = buffer.resize(len) {
158            self.blob.write_at(buf, offset).await?;
159        }
160
161        // Resize the underlying blob.
162        self.blob.resize(len).await?;
163
164        Ok(())
165    }
166
167    async fn sync(&self) -> Result<(), Error> {
168        let mut buffer = self.buffer.write().await;
169        if let Some((buf, offset)) = buffer.take() {
170            self.blob.write_at(buf, offset).await?;
171        }
172        self.blob.sync().await
173    }
174}