libsql_wal/replication/
storage.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use fst::{IntoStreamer, Streamer};
5use libsql_sys::name::NamespaceName;
6use roaring::RoaringBitmap;
7use tokio_stream::Stream;
8use zerocopy::FromZeroes;
9
10use crate::io::buf::ZeroCopyBoxIoBuf;
11use crate::segment::Frame;
12use crate::storage::backend::FindSegmentReq;
13use crate::storage::Storage;
14
15use super::Result;
16
17pub trait ReplicateFromStorage: Sync + Send + 'static {
18 fn stream<'a, 'b>(
19 &'b self,
20 seen: &'a mut RoaringBitmap,
21 current: u64,
22 until: u64,
23 ) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + 'a + Send>>;
24}
25
26pub struct StorageReplicator<S> {
27 storage: Arc<S>,
28 namespace: NamespaceName,
29}
30
31impl<S> StorageReplicator<S> {
32 pub fn new(storage: Arc<S>, namespace: NamespaceName) -> Self {
33 Self { storage, namespace }
34 }
35}
36
37impl<S> ReplicateFromStorage for StorageReplicator<S>
38where
39 S: Storage,
40{
41 fn stream<'a, 'b>(
42 &'b self,
43 seen: &'a mut roaring::RoaringBitmap,
44 mut current: u64,
45 until: u64,
46 ) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + Send + 'a>> {
47 let storage = self.storage.clone();
48 let namespace = self.namespace.clone();
49 Box::pin(async_stream::try_stream! {
50 loop {
51 let key = storage.find_segment(&namespace, FindSegmentReq::EndFrameNoLessThan(current), None).await?;
52 let index = storage.fetch_segment_index(&namespace, &key, None).await?;
53 let mut pages = index.into_stream();
54 let mut maybe_seg = None;
55 while let Some((page, offset)) = pages.next() {
56 let page = u32::from_be_bytes(page.try_into().unwrap());
57 if !seen.contains(page) {
59 seen.insert(page);
60 let segment = match maybe_seg {
61 Some(ref seg) => seg,
62 None => {
63 maybe_seg = Some(storage.fetch_segment_data(&namespace, &key, None).await?);
64 maybe_seg.as_ref().unwrap()
65 },
66 };
67
68 let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
69 ret?;
70 let frame = frame.into_inner();
71 debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
72 if frame.header().frame_no() >= until {
73 yield frame;
74 }
75 };
76 }
77
78 if key.start_frame_no <= until {
79 break
80 }
81 current = key.start_frame_no - 1;
82 }
83 })
84 }
85}