commonware_runtime/utils/buffer/append.rs

use crate::{
    buffer::{tip::Buffer, PoolRef},
    Blob, Error, RwLock,
};
use commonware_utils::{NZUsize, StableBuf};
use std::{num::NonZeroUsize, sync::Arc};
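/// An append-only wrapper around a [Blob] that buffers appended data at the
/// tip and caches flushed pages in a shared page pool.
///
/// A minimal usage sketch (`raw_blob` and `size`, as returned by a storage
/// `open` call, are placeholders; the constants mirror the tests below):
///
/// ```ignore
/// let pool_ref = PoolRef::new(NZUsize!(1024), NZUsize!(10));
/// let blob = Append::new(raw_blob, size, NZUsize!(2048), pool_ref).await?;
/// blob.append(vec![7u8; 100]).await?;
/// blob.sync().await?; // appended bytes are durable only after sync
/// ```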
#[derive(Clone)]
pub struct Append<B: Blob> {
    /// The underlying blob.
    blob: B,

    /// Identifies this blob's pages within the shared page pool.
    id: u64,

    /// The shared pool used to cache pages flushed from the tip buffer.
    pool_ref: PoolRef,

    /// The tip buffer, paired with the size of the data written to the
    /// underlying blob (its physical size).
    buffer: Arc<RwLock<(Buffer, u64)>>,
}

impl<B: Blob> Append<B> {
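    /// Wraps `blob`, whose current size is `size`, for appending. The tip
    /// buffer holds at least two pages (growing `buffer_size` if necessary),
    /// and any trailing partial page is read back from the blob so appends
    /// can resume mid-page.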
    pub async fn new(
        blob: B,
        size: u64,
        buffer_size: NonZeroUsize,
        pool_ref: PoolRef,
    ) -> Result<Self, Error> {
        // The tip buffer must be able to hold at least two full pages.
        let mut buffer_size = buffer_size.get();
        buffer_size = buffer_size.max(pool_ref.page_size * 2);

        // Start the buffer at the last page boundary and rehydrate any
        // trailing partial page from the blob.
        let leftover_size = size % pool_ref.page_size as u64;
        let page_aligned_size = size - leftover_size;
        let mut buffer = Buffer::new(page_aligned_size, NZUsize!(buffer_size));
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = blob.read_at(page_buf, page_aligned_size).await?;
            // A partial page always fits in the (at least two page) buffer.
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(Self {
            blob,
            id: pool_ref.next_id().await,
            pool_ref,
            buffer: Arc::new(RwLock::new((buffer, size))),
        })
    }
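    /// Appends `buf` to the end of the blob. Data is buffered at the tip and
    /// guaranteed durable only after [Blob::sync]; a full buffer is flushed
    /// to the page pool and underlying blob automatically.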
    pub async fn append(&self, buf: impl Into<StableBuf> + Send) -> Result<(), Error> {
        let buf = buf.into();

        let (buffer, blob_size) = &mut *self.buffer.write().await;

        // Guard against the logical size overflowing a u64.
        buffer
            .size()
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // `Buffer::append` returns true when the buffer is full and must be
        // flushed.
        if buffer.append(buf.as_ref()) {
            return self.flush(buffer, blob_size).await;
        }

        Ok(())
    }
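    /// Returns the logical size of the blob, including buffered bytes that
    /// have not yet been written to the underlying blob.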
    #[allow(clippy::len_without_is_empty)]
    pub async fn size(&self) -> u64 {
        self.buffer.read().await.0.size()
    }
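    /// Flushes the tip buffer: full pages are cached in the pool, any
    /// trailing partial page is retained in the buffer, and all bytes not yet
    /// on the underlying blob are written to it (advancing `blob_size`).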
    async fn flush(&self, buffer: &mut Buffer, blob_size: &mut u64) -> Result<(), Error> {
        // Nothing to do if the buffer is empty.
        let Some((mut buf, offset)) = buffer.take() else {
            return Ok(());
        };

        // Cache full pages in the pool. `remaining` is the length of the
        // trailing partial page that could not be cached.
        let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

        // Return the partial page to the tip buffer so a later flush can
        // complete (and cache) it.
        if remaining != 0 {
            buffer.offset -= remaining as u64;
            buffer.data.extend_from_slice(&buf[buf.len() - remaining..]);
        }

        // Skip any prefix of `buf` that is already on the blob.
        let new_data_start = blob_size.saturating_sub(offset) as usize;
        if new_data_start >= buf.len() {
            return Ok(());
        }
        if new_data_start > 0 {
            buf.drain(0..new_data_start);
        }

        // Write the new bytes at the end of the blob and extend its size.
        let new_data_len = buf.len() as u64;
        self.blob.write_at(buf, *blob_size).await?;
        *blob_size += new_data_len;

        Ok(())
    }
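    /// Returns a clone of the underlying blob.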
    pub fn clone_blob(&self) -> B {
        self.blob.clone()
    }
}

impl<B: Blob> Blob for Append<B> {
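    /// Reads `buf.len()` bytes at `offset`, serving from the tip buffer where
    /// possible and delegating the rest to the page pool (which is handed the
    /// underlying blob to read uncached pages from).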
    async fn read_at(
        &self,
        buf: impl Into<StableBuf> + Send,
        offset: u64,
    ) -> Result<StableBuf, Error> {
        let mut buf = buf.into();

        let end_offset = offset
            .checked_add(buf.len() as u64)
            .ok_or(Error::OffsetOverflow)?;

        // Reject reads extending past the logical end of the blob.
        let (buffer, _) = &*self.buffer.read().await;
        if end_offset > buffer.size() {
            return Err(Error::BlobInsufficientLength);
        }

        // Serve whatever overlaps the tip buffer. `remaining` is the number
        // of leading bytes that fall before the buffer and must be read from
        // the pool or blob.
        let remaining = buffer.extract(buf.as_mut(), offset);
        if remaining == 0 {
            return Ok(buf);
        }

        self.pool_ref
            .read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
            .await?;

        Ok(buf)
    }
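    /// Always panics: random-access writes are not supported by this
    /// append-only wrapper.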
    async fn write_at(&self, _buf: impl Into<StableBuf> + Send, _offset: u64) -> Result<(), Error> {
        unimplemented!("append-only blob type does not support write_at")
    }
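    /// Flushes all buffered data, then syncs the underlying blob.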
    async fn sync(&self) -> Result<(), Error> {
        // Flush buffered data while holding the write lock, releasing it
        // before syncing the underlying blob.
        {
            let (buffer, blob_size) = &mut *self.buffer.write().await;
            self.flush(buffer, blob_size).await?;
        }
        self.blob.sync().await
    }
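    /// Resizes the blob to `size` bytes after flushing buffered data, then
    /// resets the tip buffer to the new last page boundary.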
    async fn resize(&self, size: u64) -> Result<(), Error> {
        let (buffer, blob_size) = &mut *self.buffer.write().await;

        // Make all buffered data durable before resizing the underlying blob.
        self.flush(buffer, blob_size).await?;
        self.blob.resize(size).await?;
        *blob_size = size;

        // Reset the tip buffer to the new last page boundary and rehydrate
        // any trailing partial page.
        let leftover_size = size % self.pool_ref.page_size as u64;
        buffer.offset = size - leftover_size;
        buffer.data.clear();
        if leftover_size != 0 {
            let page_buf = vec![0; leftover_size as usize];
            let buf = self.blob.read_at(page_buf, buffer.offset).await?;
            assert!(!buffer.append(buf.as_ref()));
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, Runner, Storage as _};
    use commonware_macros::test_traced;
    use commonware_utils::NZUsize;

    const PAGE_SIZE: usize = 1024;
    const BUFFER_SIZE: usize = PAGE_SIZE * 2;

    #[test_traced]
    #[should_panic(expected = "not implemented")]
    fn test_append_blob_write_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.size().await, 0);
            // Random-access writes are unsupported and must panic.
            blob.write_at(vec![0], 0).await.unwrap();
        });
    }

    #[test_traced]
    fn test_append_blob_append() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }
            assert_eq!(blob.size().await, 11 * PAGE_SIZE as u64);

            blob.sync().await.expect("Failed to sync blob");

            // Reopen the blob and confirm the appended data was persisted.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 11 * PAGE_SIZE as u64);
            blob.sync().await.expect("Failed to sync blob");
        });
    }

    #[test_traced]
    fn test_append_blob_read() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(size, 0);

            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();

            // Append a single byte so every later page is offset by one.
            blob.append(vec![42]).await.unwrap();
            blob.sync().await.unwrap();

            // Append 11 pages, each filled with its index.
            for i in 0..11 {
                let buf = vec![i as u8; PAGE_SIZE];
                blob.append(buf).await.unwrap();
            }

            // Read across the boundary of the first two appended pages.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + PAGE_SIZE as u64 - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![0; 50];
            expected.extend_from_slice(&[1; 50]);
            assert_eq!(buf, expected);

            // Read across the boundary of the last two pages.
            let mut buf = vec![0; 100];
            buf = blob
                .read_at(buf, 1 + (PAGE_SIZE as u64 * 10) - 50)
                .await
                .unwrap()
                .into();
            let mut expected = vec![9; 50];
            expected.extend_from_slice(&[10; 50]);
            assert_eq!(buf, expected);

            // Read the last four pages in a single call.
            let buf_size = PAGE_SIZE * 4;
            let mut buf = vec![0; buf_size];
            buf = blob
                .read_at(buf, blob.size().await - buf_size as u64)
                .await
                .unwrap()
                .into();
            let mut expected = vec![7; PAGE_SIZE];
            expected.extend_from_slice(&[8; PAGE_SIZE]);
            expected.extend_from_slice(&[9; PAGE_SIZE]);
            expected.extend_from_slice(&[10; PAGE_SIZE]);
            assert_eq!(buf, expected);

            // Read every adjacent pair of bytes, covering all page boundaries.
            for i in 0..blob.size().await - 1 {
                let mut buf = vec![0; 2];
                buf = blob.read_at(buf, i).await.unwrap().into();
                let page_num = (i / PAGE_SIZE as u64) as u8;
                if i == 0 {
                    assert_eq!(buf, &[42, 0]);
                } else if i % PAGE_SIZE as u64 == 0 {
                    assert_eq!(buf, &[page_num - 1, page_num], "i = {i}");
                } else {
                    assert_eq!(buf, &[page_num; 2], "i = {i}");
                }
            }

            // Reads must behave identically after a sync.
            blob.sync().await.unwrap();
            buf = blob.read_at(vec![0], 0).await.unwrap().into();
            assert_eq!(buf, &[42]);

            for i in 0..11 {
                let mut buf = vec![0; PAGE_SIZE];
                buf = blob
                    .read_at(buf, 1 + i * PAGE_SIZE as u64)
                    .await
                    .unwrap()
                    .into();
                assert_eq!(buf, &[i as u8; PAGE_SIZE]);
            }

            blob.sync().await.expect("Failed to sync blob");
        });
    }

    #[test_traced]
    fn test_append_blob_tracks_physical_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to open blob");

            let pool_ref = PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(10));
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();

            // Physical size starts at zero.
            assert_eq!(blob.buffer.read().await.1, 0);

            // A synced append is written to the underlying blob.
            blob.append(vec![1u8; 100]).await.unwrap();
            blob.sync().await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100);

            // An unsynced append stays in the tip buffer.
            blob.append(vec![2u8; 200]).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100);

            // Overfilling the tip buffer forces a flush.
            blob.append(vec![3u8; BUFFER_SIZE]).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 100 + 200 + BUFFER_SIZE as u64);

            // Resizing updates the physical size in both directions.
            blob.resize(50).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 50);

            blob.resize(150).await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 150);

            blob.append(vec![4u8; 100]).await.unwrap();
            blob.sync().await.unwrap();
            assert_eq!(blob.buffer.read().await.1, 250);

            // Reopen and confirm the physical size was persisted.
            let (blob, size) = context
                .open("test", "blob".as_bytes())
                .await
                .expect("Failed to reopen blob");
            let blob = Append::new(blob, size, NZUsize!(BUFFER_SIZE), pool_ref.clone())
                .await
                .unwrap();
            assert_eq!(blob.buffer.read().await.1, 250);

            // Contents: 50 bytes of 1s, 100 zero bytes from the grow, then
            // 100 bytes of 4s.
            let mut buf = vec![0u8; 250];
            buf = blob.read_at(buf, 0).await.unwrap().into();
            assert_eq!(&buf[0..50], &vec![1u8; 50][..]);
            assert_eq!(&buf[50..150], &vec![0u8; 100][..]);
            assert_eq!(&buf[150..250], &vec![4u8; 100][..]);
        });
    }
}