commonware_runtime/utils/buffer/
write.rs

1use crate::{buffer::tip::Buffer, Blob, Error, RwLock};
2use commonware_utils::StableBuf;
3use std::sync::Arc;
4
/// A writer that buffers content to a [Blob] to optimize the performance
/// of appending or updating data.
///
/// Writes that land at the tip of the blob are collected in an in-memory buffer and handed to
/// the underlying blob later (for example on `sync`), rather than being issued one by one.
///
/// Cloning a [Write] produces a handle that shares the same buffer as the original, because
/// the buffer is held behind a reference-counted lock.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob for writing
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     assert_eq!(size, 0);
///
///     // Create a buffered writer with a 16-byte buffer
///     let mut blob = Write::new(blob, 0, 16);
///     blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Write more data across several calls, then sync once
///     blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
///     blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Read back the data to verify
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
///     let mut reader = Read::new(blob, size, 8);
///     let mut buf = vec![0u8; size as usize];
///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
///     assert_eq!(&buf, b"hello world!");
/// });
/// ```
#[derive(Clone)]
pub struct Write<B: Blob> {
    /// The underlying blob that buffered data is eventually written to.
    blob: B,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
    ///
    /// Wrapped in `Arc<RwLock<..>>` so that every clone of this writer observes and mutates the
    /// same buffered state.
    buffer: Arc<RwLock<Buffer>>,
}
45
46impl<B: Blob> Write<B> {
47    /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
48    /// of `blob` with the provided `size`.
49    ///
50    /// # Panics
51    ///
52    /// Panics if `capacity` is zero.
53    pub fn new(blob: B, size: u64, capacity: usize) -> Self {
54        assert!(capacity > 0, "buffer capacity must be greater than zero");
55
56        Self {
57            blob,
58            buffer: Arc::new(RwLock::new(Buffer::new(size, capacity))),
59        }
60    }
61
62    /// Returns the current logical size of the blob including any buffered data.
63    ///
64    /// This represents the total size of data that would be present after flushing.
65    #[allow(clippy::len_without_is_empty)]
66    pub async fn size(&self) -> u64 {
67        let buffer = self.buffer.read().await;
68        buffer.size()
69    }
70}
71
72impl<B: Blob> Blob for Write<B> {
73    async fn read_at(
74        &self,
75        buf: impl Into<StableBuf> + Send,
76        offset: u64,
77    ) -> Result<StableBuf, Error> {
78        // Prepare `buf` to capture the read data.
79        let mut buf = buf.into();
80        let buf_len = buf.len(); // number of bytes to read
81
82        // Ensure the read doesn't overflow.
83        let end_offset = offset
84            .checked_add(buf_len as u64)
85            .ok_or(Error::OffsetOverflow)?;
86
87        // Acquire a read lock on the buffer.
88        let buffer = self.buffer.read().await;
89
90        // If the data required is beyond the size of the blob, return an error.
91        if end_offset > buffer.size() {
92            return Err(Error::BlobInsufficientLength);
93        }
94
95        // Extract any bytes from the buffer that overlap with the requested range.
96        let remaining = buffer.extract(buf.as_mut(), offset);
97        if remaining == 0 {
98            return Ok(buf);
99        }
100
101        // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
102        // of the range.
103        let blob_part = self.blob.read_at(vec![0u8; remaining], offset).await?;
104        buf.as_mut()[..remaining].copy_from_slice(blob_part.as_ref());
105
106        Ok(buf)
107    }
108
109    async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
110        // Prepare `buf` to be written.
111        let buf = buf.into();
112        let buf_len = buf.len(); // number of bytes to write
113
114        // Ensure the write doesn't overflow.
115        let end_offset = offset
116            .checked_add(buf_len as u64)
117            .ok_or(Error::OffsetOverflow)?;
118
119        // Acquire a write lock on the buffer.
120        let mut buffer = self.buffer.write().await;
121
122        // Write falls entirely within the buffer's current range and can be merged.
123        if buffer.merge(buf.as_ref(), offset) {
124            return Ok(());
125        }
126
127        // Write cannot be merged, so flush the buffer if the range overlaps, and check if merge is
128        // possible after.
129        if buffer.offset < end_offset {
130            if let Some((old_buf, old_offset)) = buffer.take() {
131                self.blob.write_at(old_buf, old_offset).await?;
132                if buffer.merge(buf.as_ref(), offset) {
133                    return Ok(());
134                }
135            }
136        }
137
138        // Write could not be merged (exceeds buffer capacity or outside its range), so write
139        // directly. Note that we end up writing an intersecting range twice: once when the buffer
140        // is flushed above, then again when we write the `buf` below. Removing this inefficiency
141        // may not be worth the additional complexity.
142        self.blob.write_at(buf, offset).await?;
143
144        // Maintain the "buffer at tip" invariant by advancing offset to the end of this write if it
145        // extended the underlying blob.
146        buffer.offset = buffer.offset.max(end_offset);
147
148        Ok(())
149    }
150
151    async fn resize(&self, len: u64) -> Result<(), Error> {
152        // Acquire a write lock on the buffer.
153        let mut buffer = self.buffer.write().await;
154
155        // Flush buffered data to the underlying blob.
156        //
157        // This can only happen if the new size is greater than the current size.
158        if let Some((buf, offset)) = buffer.resize(len) {
159            self.blob.write_at(buf, offset).await?;
160        }
161
162        // Resize the underlying blob.
163        self.blob.resize(len).await?;
164
165        Ok(())
166    }
167
168    async fn sync(&self) -> Result<(), Error> {
169        let mut buffer = self.buffer.write().await;
170        if let Some((buf, offset)) = buffer.take() {
171            self.blob.write_at(buf, offset).await?;
172        }
173        self.blob.sync().await
174    }
175
176    async fn close(self) -> Result<(), Error> {
177        self.sync().await?;
178        self.blob.close().await
179    }
180}