libsql_wal/storage/backend/
mod.rs

1#![allow(dead_code)]
2use std::future::Future;
3use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use fst::Map;
7use tokio_stream::Stream;
8use uuid::Uuid;
9
10use super::{RestoreOptions, Result, SegmentInfo, SegmentKey};
11use crate::io::file::FileExt;
12use crate::segment::compacted::CompactedSegmentDataHeader;
13use libsql_sys::name::NamespaceName;
14
15// pub mod fs;
16#[cfg(feature = "s3")]
17pub mod s3;
18
19#[derive(Debug)]
20pub struct SegmentMeta {
21    pub namespace: NamespaceName,
22    pub segment_id: Uuid,
23    pub start_frame_no: u64,
24    pub end_frame_no: u64,
25    pub segment_timestamp: DateTime<Utc>,
26}
27
/// Request describing a restore operation.
///
/// Currently carries no fields.
#[derive(Debug)]
pub struct RestoreRequest {}
29
/// Metadata about a database, as returned by [`Backend::meta`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DbMeta {
    /// Highest frame number known for the database.
    pub max_frame_no: u64,
}
33
34#[derive(Debug, Clone, Copy)]
35pub enum FindSegmentReq {
36    /// returns a segment containing this frame
37    EndFrameNoLessThan(u64),
38    /// Returns the segment with closest timestamp less than or equal to the requested timestamp
39    Timestamp(DateTime<Utc>),
40}
41
42pub trait Backend: Send + Sync + 'static {
43    /// Config type associated with the Storage
44    type Config: Clone + Send + Sync + 'static;
45
46    /// Store `segment_data` with its associated `meta`
47    fn store(
48        &self,
49        config: &Self::Config,
50        meta: SegmentMeta,
51        segment_data: impl FileExt,
52        segment_index: Vec<u8>,
53    ) -> impl Future<Output = Result<()>> + Send;
54
55    fn find_segment(
56        &self,
57        config: &Self::Config,
58        namespace: &NamespaceName,
59        req: FindSegmentReq,
60    ) -> impl Future<Output = Result<SegmentKey>> + Send;
61
62    fn fetch_segment_index(
63        &self,
64        config: &Self::Config,
65        namespace: &NamespaceName,
66        key: &SegmentKey,
67    ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send;
68
69    /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`.
70    async fn fetch_segment_data_to_file(
71        &self,
72        config: &Self::Config,
73        namespace: &NamespaceName,
74        key: &SegmentKey,
75        file: &impl FileExt,
76    ) -> Result<CompactedSegmentDataHeader>;
77
78    // this method taking self: Arc<Self> is an infortunate consequence of rust type system making
79    // impl FileExt variant with all the arguments, with no escape hatch...
80    fn fetch_segment_data(
81        self: Arc<Self>,
82        config: Self::Config,
83        namespace: NamespaceName,
84        key: SegmentKey,
85    ) -> impl Future<Output = Result<impl FileExt>> + Send;
86
87    /// Fetch meta for `namespace`
88    fn meta(
89        &self,
90        config: &Self::Config,
91        namespace: &NamespaceName,
92    ) -> impl Future<Output = Result<DbMeta>> + Send;
93
94    async fn restore(
95        &self,
96        config: &Self::Config,
97        namespace: &NamespaceName,
98        restore_options: RestoreOptions,
99        dest: impl FileExt,
100    ) -> Result<()>;
101
102    fn list_segments<'a>(
103        &'a self,
104        config: Self::Config,
105        namespace: &'a NamespaceName,
106        until: u64,
107    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
108
109    /// Returns the default configuration for this storage
110    fn default_config(&self) -> Self::Config;
111}
112
113impl<T: Backend> Backend for Arc<T> {
114    type Config = T::Config;
115
116    fn store(
117        &self,
118        config: &Self::Config,
119        meta: SegmentMeta,
120        segment_data: impl FileExt,
121        segment_index: Vec<u8>,
122    ) -> impl Future<Output = Result<()>> + Send {
123        self.as_ref()
124            .store(config, meta, segment_data, segment_index)
125    }
126
127    async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result<DbMeta> {
128        self.as_ref().meta(config, namespace).await
129    }
130
131    fn default_config(&self) -> Self::Config {
132        self.as_ref().default_config()
133    }
134
135    async fn restore(
136        &self,
137        config: &Self::Config,
138        namespace: &NamespaceName,
139        restore_options: RestoreOptions,
140        dest: impl FileExt,
141    ) -> Result<()> {
142        self.as_ref()
143            .restore(config, namespace, restore_options, dest)
144            .await
145    }
146
147    async fn find_segment(
148        &self,
149        config: &Self::Config,
150        namespace: &NamespaceName,
151        req: FindSegmentReq,
152    ) -> Result<SegmentKey> {
153        self.as_ref().find_segment(config, namespace, req).await
154    }
155
156    async fn fetch_segment_index(
157        &self,
158        config: &Self::Config,
159        namespace: &NamespaceName,
160        key: &SegmentKey,
161    ) -> Result<Map<Arc<[u8]>>> {
162        self.as_ref()
163            .fetch_segment_index(config, namespace, key)
164            .await
165    }
166
167    async fn fetch_segment_data_to_file(
168        &self,
169        config: &Self::Config,
170        namespace: &NamespaceName,
171        key: &SegmentKey,
172        file: &impl FileExt,
173    ) -> Result<CompactedSegmentDataHeader> {
174        self.as_ref()
175            .fetch_segment_data_to_file(config, namespace, key, file)
176            .await
177    }
178
179    async fn fetch_segment_data(
180        self: Arc<Self>,
181        config: Self::Config,
182        namespace: NamespaceName,
183        key: SegmentKey,
184    ) -> Result<impl FileExt> {
185        // this implementation makes no sense (Arc<Arc<T>>)
186        self.as_ref()
187            .clone()
188            .fetch_segment_data(config, namespace, key)
189            .await
190    }
191
192    fn list_segments<'a>(
193        &'a self,
194        config: Self::Config,
195        namespace: &'a NamespaceName,
196        until: u64,
197    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
198        self.as_ref().list_segments(config, namespace, until)
199    }
200}