commonware_runtime/utils/buffer/
append.rs

use crate::{
    buffer::{tip::Buffer, PoolRef},
    Blob, Error, RwLock,
};
use commonware_utils::StableBuf;
use std::sync::Arc;

/// A [Blob] wrapper that supports appending new data, caching it for both reads and writes, and
/// provides buffer-pool managed read caching of older data.
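///
/// # Example
///
/// A minimal usage sketch (the surrounding runtime and blob setup are assumed here; see the
/// tests in this module for a runnable version using the deterministic runtime):
///
/// ```ignore
/// let pool_ref = PoolRef::new(1024, 10);
/// let blob = Append::new(blob, size, 4096, pool_ref).await?;
/// blob.append(vec![1, 2, 3]).await?;
/// blob.sync().await?; // flush buffered data to the underlying blob
/// ```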
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob being wrapped.
    blob: B,

    /// Unique id assigned by the buffer pool.
    id: u64,

    /// Buffer pool to consult for caching.
    pool_ref: PoolRef,

    /// The buffer containing the data yet to be appended to the tip of the underlying blob, as
    /// well as up to the final page_size-1 bytes from the underlying blob (to ensure the buffer's
    /// offset is always at a page boundary).
    ///
    /// # Invariants
    ///
    /// - The buffer's `offset` into the blob is always page aligned.
    /// - The range of bytes in this buffer never overlaps with any page buffered by `pool`. (See
    ///   the warning in [Self::resize] for one uncommon exception.)
    buffer: Arc<RwLock<Buffer>>,
}

impl<B: Blob> Append<B> {
    /// Create a new [Append] for a blob of the provided `size`, using `pool_ref` for read
    /// caching and a write buffer with capacity `buffer_size`.
    pub async fn new(
        blob: B,
        size: u64,
        mut buffer_size: usize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        // Set a floor on the write buffer size to make sure we always write at least 1 page of new
        // data with each flush. We multiply page_size by two here since we could be storing up to
        // page_size-1 bytes of already written data in the append buffer to maintain page
        // alignment.
        buffer_size = buffer_size.max(pool_ref.page_size * 2);
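        // E.g. with page_size = 1024 the floor is 2048: even if 1023 bytes of already-written
        // data are re-buffered for alignment, at least one full page of new data still fits.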

        // Initialize the append buffer to contain the last non-full page of bytes from the blob to
        // ensure its offset into the blob is always page aligned.
        let leftover_size = size % pool_ref.page_size as u64;
        let page_aligned_size = size - leftover_size;
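        // E.g. with page_size = 1024 and size = 2500: leftover_size = 452 and page_aligned_size =
        // 2048, so the 452 trailing bytes are read back into the buffer below.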
        let mut buffer = Buffer::new(page_aligned_size, buffer_size);
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = blob.read_at(page_buf, page_aligned_size).await?;
            // The leftover bytes are always fewer than one page, so they must fit without
            // `append` reporting the buffer as over capacity.
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(Self {
            blob,
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new(buffer)),
        })
    }

    /// Append all bytes in `buf` to the tip of the blob.
    pub async fn append(&self, buf: impl Into<StableBuf> + Send) -> Result<(), Error> {
        // Prepare `buf` to be written.
        let buf = buf.into();

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Ensure the write doesn't overflow the maximum u64 blob offset.
        buffer
            .size()
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        if buffer.append(buf.as_ref()) {
            // Buffer is over capacity; flush it to the underlying blob.
            return self.flush(&mut buffer).await;
        }

        Ok(())
    }

    /// Returns the current logical size of the blob including any buffered data.
    ///
    /// This represents the total size of data that would be present after flushing.
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        let buffer = self.buffer.read().await;
        buffer.size()
    }

    /// Flush the append buffer to the underlying blob, caching each page worth of written data in
    /// the buffer pool.
    async fn flush(&self, buffer: &mut Buffer) -> Result<(), Error> {
        // Take the buffered data, if any.
        let Some((buf, offset)) = buffer.take() else {
            return Ok(());
        };

        // Insert the flushed data into the buffer pool. This step isn't just to ensure recently
        // written data remains cached for future reads: it is also required to purge potentially
        // stale cache data which might result from the edge case of rewinding a blob across a
        // page boundary.
        let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

        // If there's any data left over that doesn't constitute an entire page, re-buffer it into
        // the append buffer to maintain its page-boundary alignment.
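        // E.g. if `buf` spans 2 full pages plus 300 bytes, `cache` consumes the 2 full pages and
        // returns 300, and those trailing 300 bytes are re-buffered at a page-aligned offset.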
        if remaining != 0 {
            buffer.offset -= remaining as u64;
            buffer.data.extend_from_slice(&buf[buf.len() - remaining..]);
        }

        // Write the data buffer to the underlying blob.
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1218): The implementation will
        // unnecessarily rewrite the last (blob_size % page_size) "trailing bytes" of the underlying
        // blob since the write's starting offset is always page aligned.
        self.blob.write_at(buf, offset).await?;

        Ok(())
    }

    /// Clones and returns the underlying blob.
    pub fn clone_blob(&self) -> B {
        self.blob.clone()
    }
}

impl<B: Blob> Blob for Append<B> {
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> Result<StableBuf, Error> {
        // Prepare `buf` to capture the read data.
        let mut buf = buf.into();

        // Ensure the read doesn't overflow.
        let end_offset = offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Acquire a read lock on the buffer.
        let buffer = self.buffer.read().await;

        // If the requested range extends beyond the size of the blob, return an error.
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Extract any bytes from the buffer that overlap with the requested range. Since the
        // buffer holds the tip of the blob, any overlap covers the tail of the range, leaving
        // `remaining` bytes at the front still to be read.
        let remaining = buffer.extract(buf.as_mut(), offset);
        if remaining == 0 {
            return Ok(buf);
        }

        // If there are bytes remaining to be read, use the buffer pool to get them.
        self.pool_ref
            .read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
            .await?;

        Ok(buf)
    }

    /// This [Blob] trait method is unimplemented by [Append] and unconditionally panics.
    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
        // TODO(<https://github.com/commonwarexyz/monorepo/issues/1207>): Extend the buffer pool to
        // support arbitrary writes.
        unimplemented!("append-only blob type does not support write_at")
    }

    async fn sync(&self) -> Result<(), Error> {
        // Flush the append buffer (releasing the write lock) before syncing the underlying blob.
        {
            let mut buffer = self.buffer.write().await;
            self.flush(&mut buffer).await?;
        }
        self.blob.sync().await
    }

    /// Resize the blob to the provided `size`.
    async fn resize(&self, size: u64) -> Result<(), Error> {
        // Implementation note: rewinding the blob across a page boundary can leave stale data in
        // the buffer pool's cache. We don't proactively purge that data within this function since
        // it would be inaccessible anyway. Instead, the caching performed in [Self::flush] ensures
        // a stale page is overwritten should the blob grow back to the point where we have new
        // data for the same page (assuming the old data hasn't already expired from the cache by
        // then).
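        //
        // E.g. with page_size = 1024, rewinding from size 2100 to 1000 may leave a stale cached
        // copy of the page covering bytes 1024..2048. That data is beyond the new tip and thus
        // unreadable; if the blob later grows back over that page, the flush path re-caches it.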

        // Acquire a write lock on the buffer.
        let mut buffer = self.buffer.write().await;

        // Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
        // might avoid flushing any bytes that the resize below will immediately truncate away.)
        self.flush(&mut buffer).await?;

        // Resize the underlying blob.
        self.blob.resize(size).await?;

        // Reset the append buffer to the new size, ensuring its page alignment.
        let leftover_size = size % self.pool_ref.page_size as u64;
        buffer.offset = size - leftover_size; // page-aligned size
        buffer.data.clear();
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = self.blob.read_at(page_buf, buffer.offset).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(())
    }

    async fn close(self) -> Result<(), Error> {
        self.sync().await?;
        self.blob.close().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner, Storage as _};
    use commonware_macros::test_traced;

    const PAGE_SIZE: usize = 1024;
    const BUFFER_SIZE: usize = PAGE_SIZE * 2;

    #[test_traced]
    #[should_panic(expected = "not implemented")]
    fn test_append_blob_write_panics() {
        // Initialize the deterministic context.
        let executor = deterministic::Runner::default();
        // Start the test within the executor.
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.size().await, 0);
            blob.write_at(vec![0], 0).await.unwrap();
        });
    }

    #[test_traced]
    fn test_append_blob_append() {
        // Initialize the deterministic context.
        let executor = deterministic::Runner::default();
        // Start the test within the executor.
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            // Wrap the blob, then append 11 consecutive pages of data.
            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }
            assert_eq!(blob.size().await, 11 * PAGE_SIZE as u64);

            blob.close().await.expect("Failed to close blob");

            // Make sure the blob has the expected size when reopened.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 11 * PAGE_SIZE as u64);
            blob.close().await.expect("Failed to close blob");
        });
    }

    #[test_traced]
    fn test_append_blob_read() {
        // Initialize the deterministic context.
        let executor = deterministic::Runner::default();
        // Start the test within the executor.
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(PAGE_SIZE, 10);
            let blob = Append::new(blob, size, BUFFER_SIZE, pool_ref.clone())
                .await
                .unwrap();

            // Append one byte & sync to ensure we have "trailing bytes".
            blob.append(vec![42]).await.unwrap();
            blob.sync().await.unwrap();

            // Append 11 consecutive pages of data.
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }

            // Read from the blob across a page boundary but well outside any write-buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + PAGE_SIZE as u64 - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![0; 50];
            expected.extend_from_slice(&[1; 50]);
            assert_eq!(buf, expected);

            // Read from the blob across a page boundary but within the write-buffered data.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + (PAGE_SIZE as u64 * 10) - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![9; 50];
            expected.extend_from_slice(&[10; 50]);
            assert_eq!(buf, expected);

            // Read across the read-only and write-buffered sections, all the way up to the very
            // last byte.
            let buf_size = PAGE_SIZE * 4;
            let mut buf = vec![0; buf_size];
            buf = blob
                .read_at(buf, blob.size().await - buf_size as u64)
                .await
                .unwrap()
                .into();
            let mut expected = vec![7; PAGE_SIZE];
            expected.extend_from_slice(&[8; PAGE_SIZE]);
            expected.extend_from_slice(&[9; PAGE_SIZE]);
            expected.extend_from_slice(&[10; PAGE_SIZE]);
            assert_eq!(buf, expected);

            // Exercise more boundary conditions by reading every possible 2-byte slice.
            for i in 0..blob.size().await - 1 {
                let mut buf = vec![0; 2];
                buf = blob.read_at(buf, i).await.unwrap().into();
                let page_num = (i / PAGE_SIZE as u64) as u8;
                if i == 0 {
                    assert_eq!(buf, &[42, 0]);
                } else if i % PAGE_SIZE as u64 == 0 {
                    assert_eq!(buf, &[page_num - 1, page_num], "i = {i}");
                } else {
                    assert_eq!(buf, &[page_num; 2], "i = {i}");
                }
            }

            // Confirm all bytes are as expected after syncing the blob.
            blob.sync().await.unwrap();
            buf = blob.read_at(vec![0], 0).await.unwrap().into();
            assert_eq!(buf, &[42]);

            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                buf = blob
                    .read_at(buf, 1 + i * PAGE_SIZE as u64)
                    .await
                    .unwrap()
                    .into();
                assert_eq!(buf, &[i as u8; PAGE_SIZE]);
            }

            blob.close().await.expect("Failed to close blob");
        });
    }
}