commonware_runtime/utils/buffer/append.rs

use crate::{
    buffer::{tip::Buffer, PoolRef},
    Blob, Error, RwLock,
};
use commonware_utils::{NZUsize, StableBuf};
use std::{num::NonZeroUsize, sync::Arc};

/// A [Blob] wrapper that supports appending new data that is cached for both reads and writes,
/// and provides buffer-pool managed read caching of older data.
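///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test), mirroring the setup used in the tests at
/// the bottom of this file; `context` is assumed to be a runtime handle implementing
/// [crate::Storage]:
///
/// ```ignore
/// let (inner, size) = context.open("partition", b"name").await?;
/// let pool_ref = PoolRef::new(NZUsize!(1024), NZUsize!(10));
/// let blob = Append::new(inner, size, NZUsize!(2048), pool_ref).await?;
/// blob.append(vec![1, 2, 3]).await?;
/// blob.sync().await?; // flush buffered data to the underlying blob
/// ```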
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped.
    blob: B,

    /// Unique id assigned by the buffer pool.
    id: u64,

    /// Buffer pool to consult for caching.
    pool_ref: PoolRef,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob, as well
    /// as up to the final page_size-1 bytes from the underlying blob (to ensure the buffer's offset
    /// is always at a page boundary), paired with the actual size of the underlying blob on disk.
    ///
    /// # Invariants
    ///
    /// - The buffer's `offset` into the blob is always page aligned.
    /// - The range of bytes in this buffer never overlaps with any page buffered by `pool`. (See
    ///   the warning in [Self::resize] for one uncommon exception.)
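    ///
    /// For example (illustrative numbers only): with a page size of 1024 and an underlying blob of
    /// 2500 bytes, the buffer's `offset` is 2048 and the trailing 452 bytes (2500 % 1024) are held
    /// in the buffer alongside any data not yet appended to the blob.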
    buffer: Arc<RwLock<(Buffer, u64)>>,
}

impl<B: Blob> Append<B> {
    /// Create a new [Append] of provided `size` using the provided `pool_ref` for read caching,
    /// and a write buffer with capacity `buffer_size`.
    pub async fn new(
        blob: B,
        size: u64,
        buffer_size: NonZeroUsize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        // Set a floor on the write buffer size to make sure we always write at least 1 page of new
        // data with each flush. We multiply page_size by two here since we could be storing up to
        // page_size-1 bytes of already written data in the append buffer to maintain page
        // alignment.
        let mut buffer_size = buffer_size.get();
        buffer_size = buffer_size.max(pool_ref.page_size * 2);
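        // For example (illustrative numbers only): with a 1024-byte page size the floor is 2048
        // bytes, so even if 1023 previously written bytes are retained for alignment, each flush
        // still writes at least one full page of new data.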

        // Initialize the append buffer to contain the last non-full page of bytes from the blob to
        // ensure its offset into the blob is always page aligned.
        let leftover_size = size % pool_ref.page_size as u64;
        let page_aligned_size = size - leftover_size;
        let mut buffer = Buffer::new(page_aligned_size, NZUsize!(buffer_size));
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = blob.read_at(page_buf, page_aligned_size).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(Self {
            blob,
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new((buffer, size))),
        })
    }

    /// Append all bytes in `buf` to the tip of the blob.
    pub async fn append(&self, buf: impl Into<StableBuf> + Send) -> Result<(), Error> {
        // Prepare `buf` to be written.
        let buf = buf.into();

        // Acquire a write lock on the buffer and blob_size.
        let (buffer, blob_size) = &mut *self.buffer.write().await;

        // Ensure the write doesn't overflow.
        buffer
            .size()
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        if buffer.append(buf.as_ref()) {
            // Buffer is over capacity, flush it to the underlying blob.
            return self.flush(buffer, blob_size).await;
        }

        Ok(())
    }

    /// Returns the current logical size of the blob including any buffered data.
    ///
    /// This represents the total size of data that would be present after flushing.
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        self.buffer.read().await.0.size()
    }

    /// Flush the append buffer to the underlying blob, caching each page worth of written data in
    /// the buffer pool.
    async fn flush(&self, buffer: &mut Buffer, blob_size: &mut u64) -> Result<(), Error> {
        // Take the buffered data, if any.
        let Some((mut buf, offset)) = buffer.take() else {
            return Ok(());
        };

        // Insert the flushed data into the buffer pool. This step isn't just to ensure recently
        // written data remains cached for future reads, but is in fact required to purge
        // potentially stale cache data which might result from the edge case of rewinding a blob
        // across a page boundary.
        let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

        // If there's any data left over that doesn't constitute an entire page, re-buffer it into
        // the append buffer to maintain its page-boundary alignment.
        if remaining != 0 {
            buffer.offset -= remaining as u64;
            buffer.data.extend_from_slice(&buf[buf.len() - remaining..])
        }

        // Calculate where new data starts in the buffer to skip already-written trailing bytes.
        let new_data_start = blob_size.saturating_sub(offset) as usize;
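        // For example (illustrative numbers only): if the physical blob holds 1500 bytes, the
        // buffer's page-aligned offset is 1024, and the buffer holds 900 bytes, then
        // `new_data_start` is 1500 - 1024 = 476, those first 476 bytes of `buf` are already on
        // disk, and only the remaining 424 bytes are written below.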

        // Early exit if there's no new data to write.
        if new_data_start >= buf.len() {
            return Ok(());
        }

        if new_data_start > 0 {
            buf.drain(0..new_data_start);
        }
        let new_data_len = buf.len() as u64;
        self.blob.write_at(buf, *blob_size).await?;
        *blob_size += new_data_len;

        Ok(())
    }

    /// Clones and returns the underlying blob.
    pub fn clone_blob(&self) -> B {
        self.blob.clone()
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> Result<StableBuf, Error> {
        // Prepare `buf` to capture the read data.
        let mut buf = buf.into();

        // Ensure the read doesn't overflow.
        let end_offset = offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let (buffer, _) = &*self.buffer.read().await;

        // If the data required is beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range.
        let remaining = buffer.extract(buf.as_mut(), offset);
        if remaining == 0 {
            return Ok(buf);
        }

        // If there are bytes remaining to be read, use the buffer pool to get them.
        self.pool_ref
            .read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
            .await?;

        Ok(buf)
    }

    /// This [Blob] trait method is unimplemented by [Append] and unconditionally panics.
    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
        // TODO(<https://github.com/commonwarexyz/monorepo/issues/1207>): Extend the buffer pool to
        // support arbitrary writes.
        unimplemented!("append-only blob type does not support write_at")
    }

    async fn sync(&self) -> Result<(), Error> {
        {
            let (buffer, blob_size) = &mut *self.buffer.write().await;
            self.flush(buffer, blob_size).await?;
        }
        self.blob.sync().await
    }

    /// Resize the blob to the provided `size`.
    async fn resize(&self, size: u64) -> Result<(), Error> {
        // Implementation note: rewinding the blob across a page boundary potentially leaves stale
        // data in the buffer pool's cache. We don't proactively purge that data within this
        // function since it would be inaccessible anyway. Instead, we ensure the cached page is
        // always overwritten with fresh data should the blob grow back to the point where new data
        // lands in the same page, if the stale data hasn't already expired naturally by then.
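        // For example (illustrative numbers only): with a 1024-byte page size, rewinding a
        // 3000-byte blob to 1000 bytes may leave pages covering offsets 1024..=2999 cached. If the
        // blob later grows past offset 1024, the `cache` call in `flush` replaces the stale page
        // with the newly written data.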

        // Acquire a write lock on the buffer.
        let (buffer, blob_size) = &mut *self.buffer.write().await;

        // Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
        // might avoid flushing bytes that are truncated away by the resize below, if any.)
        self.flush(buffer, blob_size).await?;

        // Resize the underlying blob.
        self.blob.resize(size).await?;

        // Update the physical blob size.
        *blob_size = size;

        // Reset the append buffer to the new size, ensuring its page alignment.
        let leftover_size = size % self.pool_ref.page_size as u64;
        buffer.offset = size - leftover_size; // page aligned size
        buffer.data.clear();
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = self.blob.read_at(page_buf, buffer.offset).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::NZUsize;

    const PAGE_SIZE: usize = 1024;
    const BUFFER_SIZE: usize = PAGE_SIZE * 2;

    #[test_traced]
    #[should_panic(expected = "not implemented")]
    fn test_append_blob_write_panics() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.size().await, 0);
            blob.write_at(vec![0], 0).await.unwrap();
        });
    }

    #[test_traced]
    fn test_append_blob_append() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            // Wrap the blob, then append 11 consecutive pages of data.
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }
            assert_eq!(blob.size().await, 11 * PAGE_SIZE as u64);

            blob.sync().await.expect("Failed to sync blob");

            // Make sure blob has expected size when reopened.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 11 * PAGE_SIZE as u64);
            blob.sync().await.expect("Failed to sync blob");
        });
    }

    #[test_traced]
    fn test_append_blob_read() {
        // Initialize the deterministic context
        let executor = deterministic::Runner::default();
        // Start the test within the executor
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();

            // Append one byte & sync to ensure we have "trailing bytes".
            blob.append(vec![42]).await.unwrap();
            blob.sync().await.unwrap();

            // Append 11 consecutive pages of data.
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }

            // Read from the blob across a page boundary but well outside any write buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + PAGE_SIZE as u64 - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![0; 50];
            expected.extend_from_slice(&[1; 50]);
            assert_eq!(buf, expected);

            // Read from the blob across a page boundary but within the write buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + (PAGE_SIZE as u64 * 10) - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![9; 50];
            expected.extend_from_slice(&[10; 50]);
            assert_eq!(buf, expected);

            // Read across read-only and write-buffered section, all the way up to the very last
            // byte.
            let buf_size = PAGE_SIZE * 4;
            let mut buf = vec![0; buf_size];
            buf = blob
                .read_at(buf, blob.size().await - buf_size as u64)
                .await
                .unwrap()
                .into();
            let mut expected = vec![7; PAGE_SIZE];
            expected.extend_from_slice(&[8; PAGE_SIZE]);
            expected.extend_from_slice(&[9; PAGE_SIZE]);
            expected.extend_from_slice(&[10; PAGE_SIZE]);
            assert_eq!(buf, expected);

            // Exercise more boundary conditions by reading every possible 2-byte slice.
            for i in 0..blob.size().await - 1 {
                let mut buf = vec![0; 2];
                buf = blob.read_at(buf, i).await.unwrap().into();
                let page_num = (i / PAGE_SIZE as u64) as u8;
                if i == 0 {
                    assert_eq!(buf, &[42, 0]);
                } else if i % PAGE_SIZE as u64 == 0 {
                    assert_eq!(buf, &[page_num - 1, page_num], "i = {i}");
                } else {
                    assert_eq!(buf, &[page_num; 2], "i = {i}");
                }
            }

            // Confirm all bytes are as expected after syncing the blob.
            blob.sync().await.unwrap();
            buf = blob.read_at(vec![0], 0).await.unwrap().into();
            assert_eq!(buf, &[42]);

            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                buf = blob
                    .read_at(buf, 1 + i * PAGE_SIZE as u64)
                    .await
                    .unwrap()
                    .into();
                assert_eq!(buf, &[i as u8; PAGE_SIZE]);
            }

            blob.sync().await.expect("Failed to sync blob");
        });
    }

    #[test_traced]
    fn test_append_blob_tracks_physical_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");

            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();

            // Initially blob_size should be 0.
            assert_eq!(blob.buffer.read().await.1, 0);

            // Append 100 bytes and sync.
            blob.append(vec![1u8; 100]).await.unwrap();
            blob.sync().await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100);

            // Append more data but don't sync yet; blob_size shouldn't change.
            blob.append(vec![2u8; 200]).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100);

            // Force a flush by exceeding the buffer's capacity.
            blob.append(vec![3u8; BUFFER_SIZE]).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100 + 200 + BUFFER_SIZE as u64);

            // Test resize down and up.
            blob.resize(50).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 50);

            blob.resize(150).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 150);

            // Append after resize and sync.
            blob.append(vec![4u8; 100]).await.unwrap();
            blob.sync().await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 250);

            // Close and reopen.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to reopen blob");

            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.buffer.read().await.1, 250);

            // Verify data integrity after all operations.
            let mut buf = vec![0u8; 250];
            buf = blob.read_at(buf, 0).await.unwrap().into();
            assert_eq!(&buf[0..50], &vec![1u8; 50][..]);
            assert_eq!(&buf[50..150], &vec![0u8; 100][..]); // Zeros from resize up to 150
            assert_eq!(&buf[150..250], &vec![4u8; 100][..]);
        });
    }
}