libsql_wal/replication/
storage.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use fst::{IntoStreamer, Streamer};
5use libsql_sys::name::NamespaceName;
6use roaring::RoaringBitmap;
7use tokio_stream::Stream;
8use zerocopy::FromZeroes;
9
10use crate::io::buf::ZeroCopyBoxIoBuf;
11use crate::segment::Frame;
12use crate::storage::backend::FindSegmentReq;
13use crate::storage::Storage;
14
15use super::Result;
16
17pub trait ReplicateFromStorage: Sync + Send + 'static {
18    fn stream<'a, 'b>(
19        &'b self,
20        seen: &'a mut RoaringBitmap,
21        current: u64,
22        until: u64,
23    ) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + 'a + Send>>;
24}
25
26pub struct StorageReplicator<S> {
27    storage: Arc<S>,
28    namespace: NamespaceName,
29}
30
31impl<S> StorageReplicator<S> {
32    pub fn new(storage: Arc<S>, namespace: NamespaceName) -> Self {
33        Self { storage, namespace }
34    }
35}
36
37impl<S> ReplicateFromStorage for StorageReplicator<S>
38where
39    S: Storage,
40{
41    fn stream<'a, 'b>(
42        &'b self,
43        seen: &'a mut roaring::RoaringBitmap,
44        mut current: u64,
45        until: u64,
46    ) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + Send + 'a>> {
47        let storage = self.storage.clone();
48        let namespace = self.namespace.clone();
49        Box::pin(async_stream::try_stream! {
50            loop {
51                let key = storage.find_segment(&namespace, FindSegmentReq::EndFrameNoLessThan(current), None).await?;
52                let index = storage.fetch_segment_index(&namespace, &key, None).await?;
53                let mut pages = index.into_stream();
54                let mut maybe_seg = None;
55                while let Some((page, offset)) = pages.next() {
56                    let page = u32::from_be_bytes(page.try_into().unwrap());
57                    // this segment contains data we are interested in, lazy dowload the segment
58                    if !seen.contains(page) {
59                        seen.insert(page);
60                        let segment = match maybe_seg {
61                            Some(ref seg) => seg,
62                            None => {
63                                maybe_seg = Some(storage.fetch_segment_data(&namespace, &key, None).await?);
64                                maybe_seg.as_ref().unwrap()
65                            },
66                        };
67
68                        let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
69                        ret?;
70                        let frame = frame.into_inner();
71                        debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
72                        if frame.header().frame_no() >= until {
73                            yield frame;
74                        }
75                    };
76                }
77
78                if key.start_frame_no <= until {
79                    break
80                }
81                current = key.start_frame_no - 1;
82            }
83        })
84    }
85}