// commonware_runtime/utils/buffer/write.rs
use crate::{buffer::tip::Buffer, Blob, Error, RwLock};
use commonware_utils::StableBuf;
use std::sync::Arc;

5/// A writer that buffers content to a [Blob] to optimize the performance
6/// of appending or updating data.
7///
8/// # Example
9///
10/// ```
11/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
12///
13/// let executor = deterministic::Runner::default();
14/// executor.start(|context| async move {
15/// // Open a blob for writing
16/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
17/// assert_eq!(size, 0);
18///
19/// // Create a buffered writer with 16-byte buffer
20/// let mut blob = Write::new(blob, 0, 16);
21/// blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
22/// blob.sync().await.expect("sync failed");
23///
24/// // Write more data in multiple flushes
25/// blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
26/// blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
27/// blob.sync().await.expect("sync failed");
28///
29/// // Read back the data to verify
30/// let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
31/// let mut reader = Read::new(blob, size, 8);
32/// let mut buf = vec![0u8; size as usize];
33/// reader.read_exact(&mut buf, size as usize).await.expect("read failed");
34/// assert_eq!(&buf, b"hello world!");
35/// });
36/// ```
37#[derive(Clone)]
38pub struct Write<B: Blob> {
39 /// The underlying blob to write to.
40 blob: B,
41
42 /// The buffer containing the data yet to be appended to the tip of the underlying blob.
43 buffer: Arc<RwLock<Buffer>>,
44}
45
46impl<B: Blob> Write<B> {
47 /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
48 /// of `blob` with the provided `size`.
49 ///
50 /// # Panics
51 ///
52 /// Panics if `capacity` is zero.
53 pub fn new(blob: B, size: u64, capacity: usize) -> Self {
54 assert!(capacity > 0, "buffer capacity must be greater than zero");
55
56 Self {
57 blob,
58 buffer: Arc::new(RwLock::new(Buffer::new(size, capacity))),
59 }
60 }
61
62 /// Returns the current logical size of the blob including any buffered data.
63 ///
64 /// This represents the total size of data that would be present after flushing.
65 #[allow(clippy::len_without_is_empty)]
66 pub async fn size(&self) -> u64 {
67 let buffer = self.buffer.read().await;
68 buffer.size()
69 }
70}
71
72impl<B: Blob> Blob for Write<B> {
73 async fn read_at(
74 &self,
75 buf: impl Into<StableBuf> + Send,
76 offset: u64,
77 ) -> Result<StableBuf, Error> {
78 // Prepare `buf` to capture the read data.
79 let mut buf = buf.into();
80 let buf_len = buf.len(); // number of bytes to read
81
82 // Ensure the read doesn't overflow.
83 let end_offset = offset
84 .checked_add(buf_len as u64)
85 .ok_or(Error::OffsetOverflow)?;
86
87 // Acquire a read lock on the buffer.
88 let buffer = self.buffer.read().await;
89
90 // If the data required is beyond the size of the blob, return an error.
91 if end_offset > buffer.size() {
92 return Err(Error::BlobInsufficientLength);
93 }
94
95 // Extract any bytes from the buffer that overlap with the requested range.
96 let remaining = buffer.extract(buf.as_mut(), offset);
97 if remaining == 0 {
98 return Ok(buf);
99 }
100
101 // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
102 // of the range.
103 let blob_part = self.blob.read_at(vec![0u8; remaining], offset).await?;
104 buf.as_mut()[..remaining].copy_from_slice(blob_part.as_ref());
105
106 Ok(buf)
107 }
108
109 async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
110 // Prepare `buf` to be written.
111 let buf = buf.into();
112 let buf_len = buf.len(); // number of bytes to write
113
114 // Ensure the write doesn't overflow.
115 let end_offset = offset
116 .checked_add(buf_len as u64)
117 .ok_or(Error::OffsetOverflow)?;
118
119 // Acquire a write lock on the buffer.
120 let mut buffer = self.buffer.write().await;
121
122 // Write falls entirely within the buffer's current range and can be merged.
123 if buffer.merge(buf.as_ref(), offset) {
124 return Ok(());
125 }
126
127 // Write cannot be merged, so flush the buffer if the range overlaps, and check if merge is
128 // possible after.
129 if buffer.offset < end_offset {
130 if let Some((old_buf, old_offset)) = buffer.take() {
131 self.blob.write_at(old_buf, old_offset).await?;
132 if buffer.merge(buf.as_ref(), offset) {
133 return Ok(());
134 }
135 }
136 }
137
138 // Write could not be merged (exceeds buffer capacity or outside its range), so write
139 // directly. Note that we end up writing an intersecting range twice: once when the buffer
140 // is flushed above, then again when we write the `buf` below. Removing this inefficiency
141 // may not be worth the additional complexity.
142 self.blob.write_at(buf, offset).await?;
143
144 // Maintain the "buffer at tip" invariant by advancing offset to the end of this write if it
145 // extended the underlying blob.
146 buffer.offset = buffer.offset.max(end_offset);
147
148 Ok(())
149 }
150
151 async fn resize(&self, len: u64) -> Result<(), Error> {
152 // Acquire a write lock on the buffer.
153 let mut buffer = self.buffer.write().await;
154
155 // Flush buffered data to the underlying blob.
156 //
157 // This can only happen if the new size is greater than the current size.
158 if let Some((buf, offset)) = buffer.resize(len) {
159 self.blob.write_at(buf, offset).await?;
160 }
161
162 // Resize the underlying blob.
163 self.blob.resize(len).await?;
164
165 Ok(())
166 }
167
168 async fn sync(&self) -> Result<(), Error> {
169 let mut buffer = self.buffer.write().await;
170 if let Some((buf, offset)) = buffer.take() {
171 self.blob.write_at(buf, offset).await?;
172 }
173 self.blob.sync().await
174 }
175
176 async fn close(self) -> Result<(), Error> {
177 self.sync().await?;
178 self.blob.close().await
179 }
180}