use super::Error;
use bytes::BufMut;
use commonware_cryptography::Array;
use commonware_runtime::{Blob, Error as RError, Storage};
use commonware_utils::{hex, SizedSerialize};
use futures::stream::{self, Stream, StreamExt};
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use prometheus_client::registry::Registry;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use tracing::{debug, trace, warn};

/// Configuration for a `Journal` of fixed-length items.
#[derive(Clone)]
pub struct Config {
    /// Registry for this journal's metrics.
    pub registry: Arc<Mutex<Registry>>,

    /// The storage partition to use for the journal's blobs.
    pub partition: String,

    /// The maximum number of items to store in each blob.
    pub items_per_blob: u64,
}

/// A journal that stores fixed-length items in a sequence of blobs, each
/// holding at most `items_per_blob` items.
pub struct Journal<B: Blob, E: Storage<B>, A: Array> {
    runtime: E,
    cfg: Config,

    /// The blobs that make up the journal, keyed by blob index.
    blobs: BTreeMap<u64, B>,

    // Metrics.
    tracked: Gauge,
    synced: Counter,
    pruned: Counter,

    _array: PhantomData<A>,
}

impl<B: Blob, E: Storage<B>, A: Array> Journal<B, E, A> {
    /// The size of each journal entry on disk: the serialized item followed
    /// by its 4-byte CRC32 checksum.
    const CHUNK_SIZE: usize = u32::SERIALIZED_LEN + A::SERIALIZED_LEN;

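    /// Initialize a new journal in the given partition, opening any existing
    /// blobs found there. Blob indices must be contiguous; if the newest blob
    /// ends in a partial write, it is truncated to the last complete item.
    ///
    /// A minimal usage sketch (not from the original source; it assumes the
    /// deterministic runtime and `Digest` item type used by the tests below,
    /// and the partition name and capacity are hypothetical):
    ///
    /// ```ignore
    /// let (executor, context, _) = Executor::default();
    /// executor.start(async move {
    ///     let cfg = Config {
    ///         registry: Arc::new(Mutex::new(Registry::default())),
    ///         partition: "my_partition".into(), // hypothetical partition name
    ///         items_per_blob: 1024,             // hypothetical capacity
    ///     };
    ///     let mut journal = Journal::<_, _, Digest>::init(context, cfg)
    ///         .await
    ///         .expect("failed to initialize journal");
    ///     // ... append, read, replay ...
    ///     journal.close().await.expect("failed to close journal");
    /// });
    /// ```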
    pub async fn init(runtime: E, cfg: Config) -> Result<Self, Error> {
        // Open all blobs in the partition, keyed by their index. A missing
        // partition just means the journal is new.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match runtime.scan(&cfg.partition).await {
            Ok(blobs) => blobs,
            Err(RError::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };
        for name in stored_blobs {
            let blob = runtime
                .open(&cfg.partition, &name)
                .await
                .map_err(Error::Runtime)?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => return Err(Error::InvalidBlobName(hex(&nm))),
            };
            debug!(blob = index, "loaded blob");
            blobs.insert(index, blob);
        }
        if !blobs.is_empty() {
            // Ensure the blob indices are contiguous.
            let mut it = blobs.keys();
            let mut previous_index = *it.next().unwrap();
            for index in it {
                if *index != previous_index + 1 {
                    return Err(Error::MissingBlob(previous_index + 1));
                }
                previous_index = *index;
            }
        } else {
            debug!("no blobs found");
            let blob = runtime.open(&cfg.partition, &0u64.to_be_bytes()).await?;
            blobs.insert(0, blob);
        }

        // Initialize metrics.
        let tracked = Gauge::default();
        let synced = Counter::default();
        let pruned = Counter::default();
        {
            let mut registry = cfg.registry.lock().unwrap();
            registry.register("tracked", "Number of blobs", tracked.clone());
            registry.register("synced", "Number of syncs", synced.clone());
            registry.register("pruned", "Number of blobs pruned", pruned.clone());
        }
        tracked.set(blobs.len() as i64);

        // If the newest blob's length isn't a multiple of the chunk size, a
        // write must have been interrupted: truncate any partial item.
        let newest_blob = blobs.last_key_value().unwrap().1;
        let blob_len: u64 = newest_blob.len().await?;
        if blob_len % Self::CHUNK_SIZE as u64 != 0 {
            warn!(
                "last blob len ({}) is not a multiple of item size, truncating",
                blob_len
            );
            newest_blob
                .truncate(blob_len - blob_len % Self::CHUNK_SIZE as u64)
                .await?;
            newest_blob.sync().await?;
        }

        Ok(Self {
            runtime,
            cfg,
            blobs,
            tracked,
            synced,
            pruned,

            _array: PhantomData,
        })
    }

    /// Sync the newest blob to disk. Older blobs are synced when they are
    /// filled, so only the newest can have unsynced writes.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.synced.inc();
        let newest_blob = self.newest_blob();
        debug!("syncing blob {}", newest_blob.0);
        newest_blob.1.sync().await.map_err(Error::Runtime)
    }

    /// Return the total number of items in the journal, including any that
    /// have been pruned.
    pub async fn size(&self) -> Result<u64, Error> {
        let newest_blob = self.newest_blob();
        let blob_len = newest_blob.1.len().await?;
        assert_eq!(blob_len % Self::CHUNK_SIZE as u64, 0);
        let items_in_blob = blob_len / Self::CHUNK_SIZE as u64;
        Ok(items_in_blob + self.cfg.items_per_blob * newest_blob.0)
    }

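    /// Append an item to the journal, returning its position. When the newest
    /// blob is full, the item spills into a newly created blob.
    ///
    /// A sketch of the returned positions (hypothetical items, assuming
    /// `items_per_blob = 2`):
    ///
    /// ```ignore
    /// let p0 = journal.append(item_a).await?; // 0 (blob 0, slot 0)
    /// let p1 = journal.append(item_b).await?; // 1 (blob 0, slot 1)
    /// let p2 = journal.append(item_c).await?; // 2 (blob 1 created, slot 0)
    /// ```
    ///
    /// Note that an appended item is not guaranteed durable until the next
    /// call to [Self::sync].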
    pub async fn append(&mut self, item: A) -> Result<u64, Error> {
        let mut newest_blob = self.newest_blob();

        let mut blob_len = newest_blob.1.len().await?;
        assert_eq!(blob_len % Self::CHUNK_SIZE as u64, 0);
        let items_in_blob = blob_len / Self::CHUNK_SIZE as u64;

        if items_in_blob >= self.cfg.items_per_blob {
            // The newest blob is full: sync it, then create its successor.
            let next_blob_index = newest_blob.0 + 1;
            debug!("creating next blob {}", next_blob_index);
            assert_eq!(items_in_blob, self.cfg.items_per_blob);
            newest_blob.1.sync().await?;
            let next_blob = self
                .runtime
                .open(&self.cfg.partition, &next_blob_index.to_be_bytes())
                .await?;
            assert!(self.blobs.insert(next_blob_index, next_blob).is_none());
            newest_blob = self.newest_blob();
            self.tracked.inc();
            blob_len = 0;
        }

        // Write the item followed by its checksum.
        let mut buf: Vec<u8> = Vec::with_capacity(Self::CHUNK_SIZE);
        let checksum = crc32fast::hash(&item);
        buf.extend_from_slice(&item);
        buf.put_u32(checksum);

        let item_position = blob_len / Self::CHUNK_SIZE as u64;
        newest_blob.1.write_at(&buf, blob_len).await?;
        trace!(
            blob = newest_blob.0,
            position = item_position,
            "appended item"
        );
        Ok(item_position + self.cfg.items_per_blob * newest_blob.0)
    }

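    /// Rewind the journal to `journal_size` items, discarding the items at
    /// positions `journal_size` and above. Rewinding past the current size
    /// fails with [Error::InvalidRewind]; rewinding into a pruned region
    /// fails with [Error::ItemPruned].
    ///
    /// For example (a sketch, not from the original source):
    ///
    /// ```ignore
    /// // Discard the most recently appended item.
    /// let size = journal.size().await?;
    /// journal.rewind(size - 1).await?;
    /// ```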
    pub async fn rewind(&mut self, journal_size: u64) -> Result<(), Error> {
        // Check that the rewind point is within the journal's bounds.
        let size = self.size().await?;
        match journal_size.cmp(&size) {
            std::cmp::Ordering::Greater => return Err(Error::InvalidRewind(journal_size)),
            std::cmp::Ordering::Equal => return Ok(()),
            std::cmp::Ordering::Less => {}
        }
        let rewind_to_size = journal_size;
        let rewind_to_blob_index = rewind_to_size / self.cfg.items_per_blob;
        if rewind_to_blob_index < self.oldest_blob().0 {
            return Err(Error::ItemPruned(rewind_to_size));
        }

        let mut current_blob_index = self.newest_blob().0;

        // Remove all blobs newer than the one containing the rewind point.
        while current_blob_index > rewind_to_blob_index {
            let blob = match self.blobs.remove(&current_blob_index) {
                Some(blob) => blob,
                None => return Err(Error::MissingBlob(current_blob_index)),
            };
            blob.close().await?;
            self.runtime
                .remove(&self.cfg.partition, Some(&current_blob_index.to_be_bytes()))
                .await?;
            debug!(blob = current_blob_index, "unwound over blob");
            self.tracked.dec();
            current_blob_index -= 1;
        }

        // Truncate the remaining newest blob to the rewind point.
        let rewind_blob = match self.blobs.get_mut(&rewind_to_blob_index) {
            Some(blob) => blob,
            None => return Err(Error::MissingBlob(rewind_to_blob_index)),
        };
        let rewind_to_offset = (rewind_to_size % self.cfg.items_per_blob) * Self::CHUNK_SIZE as u64;
        rewind_blob.truncate(rewind_to_offset).await?;
        Ok(())
    }

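    /// Read the item at the given position.
    ///
    /// The position is mapped to a blob index and a byte offset within that
    /// blob. A worked example with `items_per_blob = 2`:
    ///
    /// ```text
    /// position 5 => blob_index  = 5 / 2 = 2
    ///               item_index  = 5 % 2 = 1
    ///               byte_offset = 1 * CHUNK_SIZE
    /// ```
    ///
    /// Returns [Error::ItemPruned] if the item has been pruned and
    /// [Error::InvalidItem] if its blob is beyond the end of the journal.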
    pub async fn read(&self, item_position: u64) -> Result<A, Error> {
        let blob_index = item_position / self.cfg.items_per_blob;

        let blob = match self.blobs.get(&blob_index) {
            Some(blob) => blob,
            None => {
                let newest_blob = self.newest_blob();
                if blob_index > newest_blob.0 {
                    return Err(Error::InvalidItem(item_position));
                }
                assert!(blob_index < self.oldest_blob().0);
                return Err(Error::ItemPruned(item_position));
            }
        };

        let item_index = item_position % self.cfg.items_per_blob;
        let offset = item_index * Self::CHUNK_SIZE as u64;
        let mut buf = vec![0u8; Self::CHUNK_SIZE];
        blob.read_at(&mut buf, offset).await?;

        // Verify the item's checksum before returning it.
        Self::verify_integrity(&buf)
    }

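    /// Verify the checksum trailing the item's bytes within a chunk,
    /// returning the deserialized item on success.
    ///
    /// Each on-disk chunk is laid out as:
    ///
    /// ```text
    /// +--------------------------------+------------------------------+
    /// | item bytes (A::SERIALIZED_LEN) | big-endian CRC32 of the item |
    /// +--------------------------------+------------------------------+
    /// ```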
    fn verify_integrity(buf: &[u8]) -> Result<A, Error> {
        let stored_checksum = u32::from_be_bytes(buf[A::SERIALIZED_LEN..].try_into().unwrap());
        let checksum = crc32fast::hash(&buf[..A::SERIALIZED_LEN]);
        if checksum != stored_checksum {
            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
        }
        Ok(buf[..A::SERIALIZED_LEN].try_into().unwrap())
    }

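    /// Return a stream of all items in the journal, each paired with its
    /// position, reading up to `concurrency` blobs in parallel. Because blobs
    /// are read concurrently, items are not guaranteed to arrive in position
    /// order.
    ///
    /// A usage sketch (mirroring the tests below):
    ///
    /// ```ignore
    /// let stream = journal.replay(10).await?;
    /// pin_mut!(stream);
    /// while let Some(result) = stream.next().await {
    ///     let (position, item) = result?;
    ///     // ... process `item` ...
    /// }
    /// ```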
    pub async fn replay(
        &mut self,
        concurrency: usize,
    ) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
        // Collect the length of each blob. Every blob except the newest must
        // be full, so only the newest needs its length fetched.
        let newest_index = self.newest_blob().0;
        let mut blobs = Vec::with_capacity(self.blobs.len());
        for (index, blob) in self.blobs.iter() {
            let blob_len = if *index == newest_index {
                blob.len().await?
            } else {
                self.cfg.items_per_blob * Self::CHUNK_SIZE as u64
            };
            blobs.push((index, blob, blob_len));
        }

        // Unfold each blob into a stream of its items, reading up to
        // `concurrency` blobs at a time.
        let items_per_blob = self.cfg.items_per_blob;
        Ok(stream::iter(blobs)
            .map(move |(index, blob, blob_len)| async move {
                stream::unfold(
                    (index, blob, 0u64),
                    move |(index, blob, offset)| async move {
                        // Stop when the blob is exhausted.
                        if offset == blob_len {
                            return None;
                        }
                        // Read the next chunk and verify its checksum.
                        let mut buf = vec![0u8; Self::CHUNK_SIZE];
                        let item = blob.read_at(&mut buf, offset).await.map_err(Error::Runtime);
                        let next_offset = offset + Self::CHUNK_SIZE as u64;
                        match item {
                            Ok(_) => match Self::verify_integrity(&buf) {
                                Ok(item) => Some((
                                    Ok((
                                        items_per_blob * *index + offset / Self::CHUNK_SIZE as u64,
                                        item,
                                    )),
                                    (index, blob, next_offset),
                                )),
                                Err(err) => Some((Err(err), (index, blob, next_offset))),
                            },
                            // A read error terminates replay of this blob.
                            Err(err) => Some((Err(err), (index, blob, blob_len))),
                        }
                    },
                )
            })
            .buffer_unordered(concurrency)
            .flatten())
    }

    /// Return the newest (highest-indexed) blob and its index. The journal
    /// always contains at least one blob.
    fn newest_blob(&self) -> (u64, &B) {
        let (index, blob) = self.blobs.last_key_value().expect("no blobs found");
        (*index, blob)
    }

    /// Return the oldest (lowest-indexed) blob and its index. The journal
    /// always contains at least one blob.
    fn oldest_blob(&self) -> (u64, &B) {
        let (index, blob) = self.blobs.first_key_value().expect("no blobs found");
        (*index, blob)
    }

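    /// Prune all blobs that contain only items with positions strictly below
    /// `min_item_position`. The newest blob is never pruned.
    ///
    /// For example (a sketch, assuming `items_per_blob = 2`):
    ///
    /// ```ignore
    /// // Items 0 and 1 live in blob 0, which holds no item at or above
    /// // position 2, so blob 0 is removed.
    /// journal.prune(2).await?;
    /// ```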
    pub async fn prune(&mut self, min_item_position: u64) -> Result<(), Error> {
        // Compute the new oldest blob, clamped so the newest blob survives.
        let oldest_blob = self.oldest_blob().0;
        let mut new_oldest_blob = min_item_position / self.cfg.items_per_blob;
        if new_oldest_blob <= oldest_blob {
            return Ok(());
        }
        let newest_blob = self.newest_blob();
        if new_oldest_blob >= newest_blob.0 {
            new_oldest_blob = newest_blob.0;
        }

        // Remove the pruned blobs from the map and the underlying storage.
        for index in oldest_blob..new_oldest_blob {
            let blob = self.blobs.remove(&index).unwrap();
            blob.close().await?;
            self.runtime
                .remove(&self.cfg.partition, Some(&index.to_be_bytes()))
                .await?;
            debug!(blob = index, "pruned blob");
            self.pruned.inc();
            self.tracked.dec();
        }
        Ok(())
    }

    /// Close the journal, closing all underlying blobs.
    pub async fn close(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            blob.close().await?;
            debug!(blob = i, "closed blob");
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{hash, sha256::Digest};
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic::Executor, Blob, Runner, Storage};
    use futures::{pin_mut, StreamExt};
    use prometheus_client::encoding::text::encode;

    /// Generate a SHA-256 digest for the given value.
    fn test_digest(value: u64) -> Digest {
        hash(&value.to_be_bytes())
    }

    #[test_traced]
    fn test_fixed_journal_append_and_prune() {
        // Initialize the deterministic runtime.
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            // Initialize the journal.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));

            // Append an item, then close and re-open the journal.
            let mut position = journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(position, 0);

            journal.close().await.expect("Failed to close journal");

            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context, cfg.clone())
                .await
                .expect("failed to re-initialize journal");

            // Append two more items; the third item spills into a new blob.
            position = journal
                .append(test_digest(1))
                .await
                .expect("failed to append data 1");
            assert_eq!(position, 1);
            position = journal
                .append(test_digest(2))
                .await
                .expect("failed to append data 2");
            assert_eq!(position, 2);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));

            // Read the items back.
            let item0 = journal.read(0).await.expect("failed to read data 0");
            assert_eq!(item0, test_digest(0));
            let item1 = journal.read(1).await.expect("failed to read data 1");
            assert_eq!(item1, test_digest(1));
            let item2 = journal.read(2).await.expect("failed to read data 2");
            assert_eq!(item2, test_digest(2));
            let err = journal.read(3).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::Runtime(_)));
            let err = journal.read(400).await.expect_err("expected read to fail");
            assert!(matches!(err, Error::InvalidItem(x) if x == 400));

            journal.sync().await.expect("failed to sync journal");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("synced_total 1"));

            // Pruning to item 1 should not remove any blobs...
            journal.prune(1).await.expect("failed to prune journal 1");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));

            // ...but pruning to item 2 should remove the first blob.
            journal.prune(2).await.expect("failed to prune journal 2");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 1"));

            // Pruned items can no longer be read.
            let result0 = journal.read(0).await;
            assert!(matches!(result0, Err(Error::ItemPruned(0))));
            let result1 = journal.read(1).await;
            assert!(matches!(result1, Err(Error::ItemPruned(1))));

            // The retained item is still readable.
            let result2 = journal.read(2).await.unwrap();
            assert_eq!(result2, test_digest(2));

            // Fill the journal up to 5 blobs.
            for i in 3..10 {
                let position = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(position, i);
            }

            // Pruning to an already-pruned position is a no-op.
            journal
                .prune(0)
                .await
                .expect("failed to no-op prune the journal");
            assert_eq!(journal.oldest_blob().0, 1);
            assert_eq!(journal.newest_blob().0, 4);

            journal
                .prune(3 * cfg.items_per_blob)
                .await
                .expect("failed to prune journal 2");
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert_eq!(journal.oldest_blob().0, 3);
            assert_eq!(journal.newest_blob().0, 4);
            assert!(buffer.contains("tracked 2"));
            assert!(buffer.contains("pruned_total 3"));

            // Pruning beyond the end only prunes up to the newest blob.
            journal
                .prune(10000)
                .await
                .expect("failed to max-prune journal");
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert_eq!(journal.size().await.unwrap(), 10);
            assert_eq!(journal.oldest_blob().0, 4);
            assert_eq!(journal.newest_blob().0, 4);
            assert!(buffer.contains("tracked 1"));
            assert!(buffer.contains("pruned_total 4"));

            // Replay should yield only the unpruned items.
            let stream = journal.replay(1).await.expect("failed to replay journal");
            pin_mut!(stream);
            let mut items = Vec::new();
            while let Some(result) = stream.next().await {
                match result {
                    Ok((position, item)) => {
                        assert_eq!(test_digest(position), item);
                        items.push(position);
                    }
                    Err(err) => panic!("Failed to read item: {}", err),
                }
            }
            assert_eq!(items, vec![8u64, 9u64]);
        });
    }

    #[test_traced]
    fn test_fixed_journal_replay() {
        const ITEMS_PER_BLOB: u64 = 7;
        // Initialize the deterministic runtime.
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            // Initialize the journal.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: ITEMS_PER_BLOB,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");

            // Append 100.5 blobs worth of items.
            for i in 0u64..(ITEMS_PER_BLOB * 100 + ITEMS_PER_BLOB / 2) {
                let position = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(position, i);
            }

            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 101"));

            // Replay should yield every item exactly once.
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => panic!("Failed to read item: {}", err),
                    }
                }

                // Items may arrive out of order; sort before checking.
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2
                );
                items.sort();
                for (i, position) in items.iter().enumerate() {
                    assert_eq!(i as u64, *position);
                }
            }
            journal.close().await.expect("Failed to close journal");

            // Corrupt the checksum of the middle item of blob 40.
            let checksum_offset = Digest::SERIALIZED_LEN as u64
                + (ITEMS_PER_BLOB / 2) * (Digest::SERIALIZED_LEN + u32::SERIALIZED_LEN) as u64;
            let blob = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let bad_checksum = 123456789u32;
            blob.write_at(&bad_checksum.to_be_bytes(), checksum_offset)
                .await
                .expect("Failed to write incorrect checksum");
            let corrupted_item_position = 40 * ITEMS_PER_BLOB + ITEMS_PER_BLOB / 2;
            blob.close().await.expect("Failed to close blob");

            // Reading the corrupted item should fail with a checksum mismatch.
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            let err = journal.read(corrupted_item_position).await.unwrap_err();
            assert!(matches!(err, Error::ChecksumMismatch(x, _) if x == bad_checksum));

            // Replay should yield every item except the corrupted one.
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::ChecksumMismatch(_, _)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2 - 1
                );
            }
            journal.close().await.expect("Failed to close journal");

            // Truncate blob 40 at the middle item's checksum, simulating a
            // partial write in a non-final blob.
            let blob = context
                .open(&cfg.partition, &40u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            blob.truncate(checksum_offset)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            // Reading the corrupted item should now fail at the runtime level.
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            let err = journal.read(corrupted_item_position).await.unwrap_err();
            assert!(matches!(err, Error::Runtime(_)));

            // Replay of blob 40 stops at the failed read, so the truncated
            // blob only yields the items before the truncation point.
            {
                let stream = journal.replay(10).await.expect("failed to replay journal");
                let mut items = Vec::new();
                pin_mut!(stream);
                let mut error_count = 0;
                while let Some(result) = stream.next().await {
                    match result {
                        Ok((position, item)) => {
                            assert_eq!(test_digest(position), item);
                            items.push(position);
                        }
                        Err(err) => {
                            error_count += 1;
                            assert!(matches!(err, Error::Runtime(_)));
                        }
                    }
                }
                assert_eq!(error_count, 1);
                assert_eq!(
                    items.len(),
                    ITEMS_PER_BLOB as usize * 100 + ITEMS_PER_BLOB as usize / 2 - 4
                );
            }
            journal.close().await.expect("Failed to close journal");

            // Removing a non-final blob entirely should cause initialization
            // to fail with a missing-blob error.
            context
                .remove(&cfg.partition, Some(&40u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let result = Journal::<_, _, Digest>::init(context.clone(), cfg.clone()).await;
            assert!(matches!(result.err().unwrap(), Error::MissingBlob(n) if n == 40));
        });
    }

    #[test_traced]
    fn test_fixed_journal_recover_from_partial_write() {
        let (executor, context, _) = Executor::default();

        executor.start(async move {
            // Initialize a journal and fill two blobs.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for i in 0..4 {
                journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
            }
            assert_eq!(journal.size().await.unwrap(), 4);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));
            journal.close().await.expect("Failed to close journal");

            // Simulate a partial write by truncating the last byte of the
            // newest blob.
            let blob = context
                .open(&cfg.partition, &1u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            let blob_len = blob.len().await.expect("Failed to get blob length");
            blob.truncate(blob_len - 1)
                .await
                .expect("Failed to corrupt blob");
            blob.close().await.expect("Failed to close blob");

            // Re-initialization should truncate the incomplete item.
            let journal = Journal::<_, _, Digest>::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), 3);
            let mut buffer = String::new();
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 2"));
        });
    }

    #[test_traced]
    fn test_fixed_journal_rewinding() {
        let (executor, context, _) = Executor::default();
        executor.start(async move {
            // Initialize an empty journal.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition".into(),
                items_per_blob: 2,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert!(matches!(
                journal.rewind(1).await,
                Err(Error::InvalidRewind(1))
            ));
            let mut buffer = String::new();

            // Rewinding to the current size is a no-op; rewinding to 0
            // empties the journal.
            journal
                .append(test_digest(0))
                .await
                .expect("failed to append data 0");
            assert_eq!(journal.size().await.unwrap(), 1);
            assert!(matches!(journal.rewind(1).await, Ok(())));
            assert!(matches!(journal.rewind(0).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Fill four blobs (7 items at 2 items per blob).
            for i in 0..7 {
                let pos = journal
                    .append(test_digest(i))
                    .await
                    .expect("failed to append data");
                assert_eq!(pos, i);
            }
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 4"));
            assert_eq!(journal.size().await.unwrap(), 7);

            // Rewinding to size 4 drops the newest blob and empties blob 2.
            assert!(matches!(journal.rewind(4).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 4);
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 3"));

            // Rewinding to 0 leaves only (an empty) blob 0.
            assert!(matches!(journal.rewind(0).await, Ok(())));
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert_eq!(journal.size().await.unwrap(), 0);

            // Repeatedly append 100 items and rewind back over 49 of them.
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            const ITEMS_REMAINING: u64 = 10 * (100 - 49);
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Repeat with an items_per_blob that doesn't evenly divide the
            // append/rewind counts.
            let cfg = Config {
                registry: Arc::new(Mutex::new(Registry::default())),
                partition: "test_partition_2".into(),
                items_per_blob: 3,
            };
            let mut journal = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to initialize journal");
            for _ in 0..10 {
                for i in 0..100 {
                    journal
                        .append(test_digest(i))
                        .await
                        .expect("failed to append data");
                }
                journal
                    .rewind(journal.size().await.unwrap() - 49)
                    .await
                    .unwrap();
            }
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            journal.close().await.expect("Failed to close journal");

            // Confirm the rewound state survives a restart.
            let mut journal: Journal<_, _, Digest> = Journal::init(context.clone(), cfg.clone())
                .await
                .expect("failed to re-initialize journal");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);

            // Rewinding into a pruned region must fail.
            journal.prune(300).await.expect("failed to prune journal");
            assert_eq!(journal.size().await.unwrap(), ITEMS_REMAINING);
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::ItemPruned(299))
            ));
            assert!(matches!(journal.rewind(301).await, Ok(())));
            assert_eq!(journal.size().await.unwrap(), 301);
            encode(&mut buffer, &cfg.registry.lock().unwrap()).unwrap();
            assert!(buffer.contains("tracked 1"));
            assert!(matches!(journal.rewind(300).await, Ok(())));
            assert!(matches!(
                journal.rewind(299).await,
                Err(Error::ItemPruned(299))
            ));
        });
    }
}