commonware_runtime/utils/buffer/append.rs
use crate::{
    buffer::{tip::Buffer, PoolRef},
    Blob, Error, RwLock,
};
use commonware_utils::StableBuf;
use std::sync::Arc;

/// A [Blob] wrapper that supports appending new data, which is cached for both reads and writes,
/// and provides buffer-pool-managed read caching of older data.
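///
/// # Example
///
/// A minimal usage sketch (marked `ignore` because it assumes a storage context whose `open`
/// returns a raw [Blob] and its current size, as in the tests below):
///
/// ```ignore
/// let (raw, size) = context.open("partition", "blob".as_bytes()).await?;
/// let pool_ref = PoolRef::new(1024, 10); // page size and cache capacity, as in the tests below
/// let blob = Append::new(raw, size, 2 * 1024, pool_ref).await?;
/// blob.append(vec![1, 2, 3]).await?;
/// assert_eq!(blob.size().await, size + 3);
/// blob.sync().await?;
/// ```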
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped.
    blob: B,

    /// Unique id assigned by the buffer pool.
    id: u64,

    /// Buffer pool to consult for caching.
    pool_ref: PoolRef,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob, as
    /// well as up to the final `page_size - 1` bytes from the underlying blob (to ensure the
    /// buffer's offset is always at a page boundary).
    ///
    /// # Invariants
    ///
    /// - The buffer's `offset` into the blob is always page aligned.
    /// - The range of bytes in this buffer never overlaps with any page buffered by the buffer
    ///   pool. (See the warning in [Self::resize] for one uncommon exception.)
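    ///
    /// For example, with a 1024-byte page size, wrapping a blob of 2500 bytes places the
    /// buffer's `offset` at 2048 and pre-loads the trailing 452 bytes into it.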
    buffer: Arc<RwLock<Buffer>>,
}

impl<B: Blob> Append<B> {
    /// Create a new [Append] wrapping `blob` of the provided `size`, using the provided
    /// `pool_ref` for read caching and a write buffer with capacity `buffer_size`.
    pub async fn new(
        blob: B,
        size: u64,
        mut buffer_size: usize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        // Set a floor on the write buffer size to make sure we always write at least 1 page of
        // new data with each flush. We multiply page_size by two here since we could be storing
        // up to page_size - 1 bytes of already written data in the append buffer to maintain
        // page alignment.
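        // For example, with 1 KiB pages a requested `buffer_size` of 512 is raised to 2048.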
        buffer_size = buffer_size.max(pool_ref.page_size * 2);

        // Initialize the append buffer to contain the last non-full page of bytes from the blob
        // to ensure its offset into the blob is always page aligned.
        let leftover_size = size % pool_ref.page_size as u64;
        let page_aligned_size = size - leftover_size;
        let mut buffer = Buffer::new(page_aligned_size, buffer_size);
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = blob.read_at(page_buf, page_aligned_size).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(Self {
            blob,
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Append all bytes in `buf` to the tip of the blob.
    pub async fn append(&self, buf: impl Into<StableBuf> + Send) -> Result<(), Error> {
        // Prepare `buf` to be written.
        let buf = buf.into();

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Ensure the write doesn't overflow.
        buffer
            .size()
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        if buffer.append(buf.as_ref()) {
            // Buffer is over capacity, flush it to the underlying blob.
            return self.flush(&mut buffer).await;
        }

        Ok(())
    }

    /// Returns the current logical size of the blob including any buffered data.
    ///
    /// This represents the total size of data that would be present after flushing.
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }

    /// Flush the append buffer to the underlying blob, caching each page worth of written data in
    /// the buffer pool.
    async fn flush(&self, buffer: &mut Buffer) -> Result<(), Error> {
        // Take the buffered data, if any.
        let Some((buf, offset)) = buffer.take() else {
            return Ok(());
        };

        // Insert the flushed data into the buffer pool. This step isn't just to ensure recently
        // written data remains cached for future reads; it is in fact required to purge
        // potentially stale cache data which might result from the edge case of rewinding a blob
        // across a page boundary.
        let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

        // If there's any data left over that doesn't constitute an entire page, re-buffer it into
        // the append buffer to maintain its page-boundary alignment.
        if remaining != 0 {
            buffer.offset -= remaining as u64;
            buffer.data.extend_from_slice(&buf[buf.len() - remaining..]);
        }

        // Write the data buffer to the underlying blob.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1218): The implementation will
        // unnecessarily rewrite the last (blob_size % page_size) "trailing bytes" of the
        // underlying blob since the write's starting offset is always page aligned.
        self.blob.write_at(buf, offset).await?;

        Ok(())
    }

    /// Clones and returns the underlying blob.
    pub fn clone_blob(&self) -> B {
        self.blob.clone()
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> Result<StableBuf, Error> {
        // Prepare `buf` to capture the read data.
        let mut buf = buf.into();

        // Ensure the read doesn't overflow.
        let end_offset = offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the requested range extends beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range.
        let remaining = buffer.extract(buf.as_mut(), offset);
        if remaining == 0 {
            return Ok(buf);
        }

        // If there are bytes remaining to be read, use the buffer pool to get them.
        self.pool_ref
            .read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
            .await?;

        Ok(buf)
    }

    /// This [Blob] trait method is unimplemented by [Append] and unconditionally panics.
    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
        // TODO(<https://github.com/commonwarexyz/monorepo/issues/1207>): Extend the buffer pool to
        // support arbitrary writes.
        unimplemented!("append-only blob type does not support write_at")
    }

    async fn sync(&self) -> Result<(), Error> {
        {
            let mut buffer = self.buffer.write().await;
            self.flush(&mut buffer).await?;
        }
        self.blob.sync().await
    }

    /// Resize the blob to the provided `size`.
    async fn resize(&self, size: u64) -> Result<(), Error> {
        // Implementation note: rewinding the blob across a page boundary potentially leaves stale
        // data in the buffer pool's cache. We don't proactively purge that data within this
        // function since it would be inaccessible anyway. Instead, we ensure the affected pages
        // are always updated should the blob grow back to the point where we have new data for
        // them, if the old data hasn't expired naturally by then.

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
        // might avoid flushing any bytes that the resize below would truncate away.)
        self.flush(&mut buffer).await?;

        // Resize the underlying blob.
        self.blob.resize(size).await?;

        // Reset the append buffer to the new size, ensuring its page alignment.
        let leftover_size = size % self.pool_ref.page_size as u64;
        buffer.offset = size - leftover_size; // page aligned size
        buffer.data.clear();
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = self.blob.read_at(page_buf, buffer.offset).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(())
    }

    async fn close(self) -> Result<(), Error> {
        self.sync().await?;
        self.blob.close().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner, Storage as _};
    use commonware_macros::test_traced;

    const PAGE_SIZE: usize = 1024;
    const BUFFER_SIZE: usize = PAGE_SIZE * 2;

    #[test_traced]
    #[should_panic(expected = "not implemented")]
    fn test_append_blob_write_panics() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.size().await, 0);
            blob.write_at(vec![0], 0).await.unwrap();
        });
    }

    #[test_traced]
    fn test_append_blob_append() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            // Wrap the blob, then append 11 consecutive pages of data.
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }
            assert_eq!(blob.size().await, 11 * PAGE_SIZE as u64);

            blob.close().await.expect("Failed to close blob");

            // Make sure blob has expected size when reopened.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 11 * PAGE_SIZE as u64);
            blob.close().await.expect("Failed to close blob");
        });
    }

    #[test_traced]
    fn test_append_blob_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            // Append one byte & sync to ensure we have "trailing bytes".
            blob.append(vec![42]).await.unwrap();
            blob.sync().await.unwrap();

            // Append 11 consecutive pages of data.
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }

            // Read from the blob across a page boundary but well outside any write-buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + PAGE_SIZE as u64 - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![0; 50];
            expected.extend_from_slice(&[1; 50]);
            assert_eq!(buf, expected);

            // Read from the blob across a page boundary but within the write-buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + (PAGE_SIZE as u64 * 10) - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![9; 50];
            expected.extend_from_slice(&[10; 50]);
            assert_eq!(buf, expected);

            // Read across the read-only and write-buffered sections, all the way up to the very
            // last byte.
            let buf_size = PAGE_SIZE * 4;
            let mut buf = vec![0; buf_size];
            buf = blob
                .read_at(buf, blob.size().await - buf_size as u64)
                .await
                .unwrap()
                .into();
            let mut expected = vec![7; PAGE_SIZE];
            expected.extend_from_slice(&[8; PAGE_SIZE]);
            expected.extend_from_slice(&[9; PAGE_SIZE]);
            expected.extend_from_slice(&[10; PAGE_SIZE]);
            assert_eq!(buf, expected);

            // Exercise more boundary conditions by reading every possible 2-byte slice.
            for i in 0..blob.size().await - 1 {
                let mut buf = vec![0; 2];
                buf = blob.read_at(buf, i).await.unwrap().into();
                let page_num = (i / PAGE_SIZE as u64) as u8;
                if i == 0 {
                    assert_eq!(buf, &[42, 0]);
                } else if i % PAGE_SIZE as u64 == 0 {
                    assert_eq!(buf, &[page_num - 1, page_num], "i = {i}");
                } else {
                    assert_eq!(buf, &[page_num; 2], "i = {i}");
                }
            }

            // Confirm all bytes are as expected after syncing the blob.
            blob.sync().await.unwrap();
            buf = blob.read_at(vec![0], 0).await.unwrap().into();
            assert_eq!(buf, &[42]);

            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                buf = blob
                    .read_at(buf, 1 + i * PAGE_SIZE as u64)
                    .await
                    .unwrap()
                    .into();
                assert_eq!(buf, &[i as u8; PAGE_SIZE]);
            }

            blob.close().await.expect("Failed to close blob");
        });
    }
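
    // Illustrative sketch (assuming the same deterministic storage semantics exercised above):
    // shrink the blob across a page boundary with `resize`, then confirm its size and contents,
    // and that subsequent appends land at the new tip.
    #[test_traced]
    fn test_append_blob_resize() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            // Append 3 consecutive pages of data (page i is filled with the byte i).
            for i in 0..3 {
                blob.append(vec![i as u8; PAGE_SIZE]).await.unwrap();
            }
            assert_eq!(blob.size().await, 3 * PAGE_SIZE as u64);

            // Shrink to one full page plus 100 bytes, crossing a page boundary.
            let new_size = PAGE_SIZE as u64 + 100;
            blob.resize(new_size).await.unwrap();
            assert_eq!(blob.size().await, new_size);

            // The surviving bytes should be unchanged: page 0 followed by 100 bytes of page 1.
            let mut buf = vec![0; new_size as usize];
            buf = blob.read_at(buf, 0).await.unwrap().into();
            let mut expected = vec![0u8; PAGE_SIZE];
            expected.extend_from_slice(&[1u8; 100]);
            assert_eq!(buf, expected);

            // New appends should land at the new tip.
            blob.append(vec![9u8; 100]).await.unwrap();
            assert_eq!(blob.size().await, new_size + 100);
            let mut buf = vec![0; 100];
            buf = blob.read_at(buf, new_size).await.unwrap().into();
            assert_eq!(buf, &[9u8; 100]);

            blob.close().await.expect("Failed to close blob");

            // Make sure the blob has the expected size when reopened.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, new_size + 100);
            blob.close().await.expect("Failed to close blob");
        });
    }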
}