// commonware_runtime/utils/buffer/write.rs
use crate::{buffer::tip::Buffer, Blob, Error, RwLock};
use commonware_utils::StableBuf;
use std::{num::NonZeroUsize, sync::Arc};
4
/// A writer that buffers content to a [Blob] to optimize the performance
/// of appending or updating data.
///
/// Writes that land within the buffered region at the tip of the blob are merged
/// in memory and only flushed to the underlying [Blob] when the buffer cannot
/// absorb them, or on `sync`/`resize`.
///
/// # Example
///
/// ```
/// use commonware_runtime::{Runner, buffer::{Write, Read}, Blob, Error, Storage, deterministic};
/// use commonware_utils::NZUsize;
///
/// let executor = deterministic::Runner::default();
/// executor.start(|context| async move {
///     // Open a blob for writing
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to open blob");
///     assert_eq!(size, 0);
///
///     // Create a buffered writer with 16-byte buffer
///     let mut blob = Write::new(blob, 0, NZUsize!(16));
///     blob.write_at(b"hello".to_vec(), 0).await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Write more data in multiple flushes
///     blob.write_at(b" world".to_vec(), 5).await.expect("write failed");
///     blob.write_at(b"!".to_vec(), 11).await.expect("write failed");
///     blob.sync().await.expect("sync failed");
///
///     // Read back the data to verify
///     let (blob, size) = context.open("my_partition", b"my_data").await.expect("unable to reopen blob");
///     let mut reader = Read::new(blob, size, NZUsize!(8));
///     let mut buf = vec![0u8; size as usize];
///     reader.read_exact(&mut buf, size as usize).await.expect("read failed");
///     assert_eq!(&buf, b"hello world!");
/// });
/// ```
#[derive(Clone)]
pub struct Write<B: Blob> {
    /// The underlying blob to write to.
    blob: B,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob.
    ///
    /// Shared behind `Arc<RwLock<..>>` so that cloned handles (via `#[derive(Clone)]`)
    /// operate on the same buffer, and reads are serialized against flushes/writes.
    buffer: Arc<RwLock<Buffer>>,
}
46
47impl<B: Blob> Write<B> {
48 /// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
49 /// of `blob` with the provided `size`.
50 ///
51 /// # Panics
52 ///
53 /// Panics if `capacity` is zero.
54 pub fn new(blob: B, size: u64, capacity: NonZeroUsize) -> Self {
55 Self {
56 blob,
57 buffer: Arc::new(RwLock::new(Buffer::new(size, capacity))),
58 }
59 }
60
61 /// Returns the current logical size of the blob including any buffered data.
62 ///
63 /// This represents the total size of data that would be present after flushing.
64 #[allow(clippy::len_without_is_empty)]
65 pub async fn size(&self) -> u64 {
66 let buffer = self.buffer.read().await;
67 buffer.size()
68 }
69}
70
impl<B: Blob> Blob for Write<B> {
    /// Reads `buf.len()` bytes starting at `offset`, serving overlapping bytes from the
    /// in-memory tip buffer and the remainder directly from the underlying blob.
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> Result<StableBuf, Error> {
        // Prepare `buf` to capture the read data.
        let mut buf = buf.into();
        let buf_len = buf.len(); // number of bytes to read

        // Ensure the read doesn't overflow.
        let end_offset = offset
            .checked_add(buf_len as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer. The lock is held across the blob read
        // below so a concurrent flush cannot change what the buffer covers mid-read.
        let buffer = self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        // `buffer.size()` is the logical size including unflushed data.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range.
        // `remaining` is the count of bytes NOT satisfied by the buffer; since the
        // buffer only covers the tip, those bytes are at the start of the range.
        let remaining = buffer.extract(buf.as_mut(), offset);
        if remaining == 0 {
            return Ok(buf);
        }

        // If bytes remain, read directly from the blob. Any remaining bytes reside at the beginning
        // of the range.
        let blob_part = self.blob.read_at(vec![0u8; remaining], offset).await?;
        buf.as_mut()[..remaining].copy_from_slice(blob_part.as_ref());

        Ok(buf)
    }

    /// Writes `buf` at `offset`, merging into the tip buffer when possible and
    /// falling back to a direct blob write (after flushing any overlap) otherwise.
    async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
        // Prepare `buf` to be written.
        let buf = buf.into();
        let buf_len = buf.len(); // number of bytes to write

        // Ensure the write doesn't overflow.
        let end_offset = offset
            .checked_add(buf_len as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Write falls entirely within the buffer's current range and can be merged.
        if buffer.merge(buf.as_ref(), offset) {
            return Ok(());
        }

        // Write cannot be merged, so flush the buffer if the range overlaps, and check if merge is
        // possible after.
        //
        // NOTE(review): `buffer.offset` is assumed to be the blob offset at which
        // buffered data begins, so `buffer.offset < end_offset` detects a write that
        // overlaps or extends the buffered tip — confirm against `buffer::tip::Buffer`.
        if buffer.offset < end_offset {
            if let Some((old_buf, old_offset)) = buffer.take() {
                self.blob.write_at(old_buf, old_offset).await?;
                // After flushing, the (now empty) buffer may be able to absorb the write.
                if buffer.merge(buf.as_ref(), offset) {
                    return Ok(());
                }
            }
        }

        // Write could not be merged (exceeds buffer capacity or outside its range), so write
        // directly. Note that we end up writing an intersecting range twice: once when the buffer
        // is flushed above, then again when we write the `buf` below. Removing this inefficiency
        // may not be worth the additional complexity.
        self.blob.write_at(buf, offset).await?;

        // Maintain the "buffer at tip" invariant by advancing offset to the end of this write if it
        // extended the underlying blob.
        buffer.offset = buffer.offset.max(end_offset);

        Ok(())
    }

    /// Resizes the blob to `len` bytes, flushing any buffered data the resize
    /// requires to be materialized first.
    async fn resize(&self, len: u64) -> Result<(), Error> {
        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Flush buffered data to the underlying blob.
        //
        // This can only happen if the new size is greater than the current size.
        if let Some((buf, offset)) = buffer.resize(len) {
            self.blob.write_at(buf, offset).await?;
        }

        // Resize the underlying blob. Performed after the flush so the blob holds all
        // data the new size must cover.
        self.blob.resize(len).await?;

        Ok(())
    }

    /// Flushes any buffered data to the underlying blob, then syncs the blob so the
    /// data is durable.
    async fn sync(&self) -> Result<(), Error> {
        // Write lock: `take()` drains the buffer.
        let mut buffer = self.buffer.write().await;
        if let Some((buf, offset)) = buffer.take() {
            self.blob.write_at(buf, offset).await?;
        }
        self.blob.sync().await
    }
}