libsql_wal/storage/backend/
mod.rs1#![allow(dead_code)]
2use std::future::Future;
3use std::sync::Arc;
4
5use chrono::{DateTime, Utc};
6use fst::Map;
7use tokio_stream::Stream;
8use uuid::Uuid;
9
10use super::{RestoreOptions, Result, SegmentInfo, SegmentKey};
11use crate::io::file::FileExt;
12use crate::segment::compacted::CompactedSegmentDataHeader;
13use libsql_sys::name::NamespaceName;
14
15#[cfg(feature = "s3")]
17pub mod s3;
18
19#[derive(Debug)]
20pub struct SegmentMeta {
21 pub namespace: NamespaceName,
22 pub segment_id: Uuid,
23 pub start_frame_no: u64,
24 pub end_frame_no: u64,
25 pub segment_timestamp: DateTime<Utc>,
26}
27
/// Placeholder request type; it currently carries no fields.
///
/// Derives `Debug`, `Clone` and `Default` so that it can be logged, copied and
/// constructed generically like the other public types in this module.
#[derive(Debug, Clone, Default)]
pub struct RestoreRequest {}
29
/// Database-level metadata returned by [`Backend::meta`].
///
/// `Debug` is derived so that a `Result<DbMeta>` supports `unwrap_err` /
/// `expect_err` and can be logged; `Clone`, `PartialEq` and `Eq` make the
/// value easy to compare in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbMeta {
    /// Highest frame number reported by the backend.
    pub max_frame_no: u64,
}
33
34#[derive(Debug, Clone, Copy)]
35pub enum FindSegmentReq {
36 EndFrameNoLessThan(u64),
38 Timestamp(DateTime<Utc>),
40}
41
42pub trait Backend: Send + Sync + 'static {
43 type Config: Clone + Send + Sync + 'static;
45
46 fn store(
48 &self,
49 config: &Self::Config,
50 meta: SegmentMeta,
51 segment_data: impl FileExt,
52 segment_index: Vec<u8>,
53 ) -> impl Future<Output = Result<()>> + Send;
54
55 fn find_segment(
56 &self,
57 config: &Self::Config,
58 namespace: &NamespaceName,
59 req: FindSegmentReq,
60 ) -> impl Future<Output = Result<SegmentKey>> + Send;
61
62 fn fetch_segment_index(
63 &self,
64 config: &Self::Config,
65 namespace: &NamespaceName,
66 key: &SegmentKey,
67 ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send;
68
69 async fn fetch_segment_data_to_file(
71 &self,
72 config: &Self::Config,
73 namespace: &NamespaceName,
74 key: &SegmentKey,
75 file: &impl FileExt,
76 ) -> Result<CompactedSegmentDataHeader>;
77
78 fn fetch_segment_data(
81 self: Arc<Self>,
82 config: Self::Config,
83 namespace: NamespaceName,
84 key: SegmentKey,
85 ) -> impl Future<Output = Result<impl FileExt>> + Send;
86
87 fn meta(
89 &self,
90 config: &Self::Config,
91 namespace: &NamespaceName,
92 ) -> impl Future<Output = Result<DbMeta>> + Send;
93
94 async fn restore(
95 &self,
96 config: &Self::Config,
97 namespace: &NamespaceName,
98 restore_options: RestoreOptions,
99 dest: impl FileExt,
100 ) -> Result<()>;
101
102 fn list_segments<'a>(
103 &'a self,
104 config: Self::Config,
105 namespace: &'a NamespaceName,
106 until: u64,
107 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
108
109 fn default_config(&self) -> Self::Config;
111}
112
113impl<T: Backend> Backend for Arc<T> {
114 type Config = T::Config;
115
116 fn store(
117 &self,
118 config: &Self::Config,
119 meta: SegmentMeta,
120 segment_data: impl FileExt,
121 segment_index: Vec<u8>,
122 ) -> impl Future<Output = Result<()>> + Send {
123 self.as_ref()
124 .store(config, meta, segment_data, segment_index)
125 }
126
127 async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result<DbMeta> {
128 self.as_ref().meta(config, namespace).await
129 }
130
131 fn default_config(&self) -> Self::Config {
132 self.as_ref().default_config()
133 }
134
135 async fn restore(
136 &self,
137 config: &Self::Config,
138 namespace: &NamespaceName,
139 restore_options: RestoreOptions,
140 dest: impl FileExt,
141 ) -> Result<()> {
142 self.as_ref()
143 .restore(config, namespace, restore_options, dest)
144 .await
145 }
146
147 async fn find_segment(
148 &self,
149 config: &Self::Config,
150 namespace: &NamespaceName,
151 req: FindSegmentReq,
152 ) -> Result<SegmentKey> {
153 self.as_ref().find_segment(config, namespace, req).await
154 }
155
156 async fn fetch_segment_index(
157 &self,
158 config: &Self::Config,
159 namespace: &NamespaceName,
160 key: &SegmentKey,
161 ) -> Result<Map<Arc<[u8]>>> {
162 self.as_ref()
163 .fetch_segment_index(config, namespace, key)
164 .await
165 }
166
167 async fn fetch_segment_data_to_file(
168 &self,
169 config: &Self::Config,
170 namespace: &NamespaceName,
171 key: &SegmentKey,
172 file: &impl FileExt,
173 ) -> Result<CompactedSegmentDataHeader> {
174 self.as_ref()
175 .fetch_segment_data_to_file(config, namespace, key, file)
176 .await
177 }
178
179 async fn fetch_segment_data(
180 self: Arc<Self>,
181 config: Self::Config,
182 namespace: NamespaceName,
183 key: SegmentKey,
184 ) -> Result<impl FileExt> {
185 self.as_ref()
187 .clone()
188 .fetch_segment_data(config, namespace, key)
189 .await
190 }
191
192 fn list_segments<'a>(
193 &'a self,
194 config: Self::Config,
195 namespace: &'a NamespaceName,
196 until: u64,
197 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
198 self.as_ref().list_segments(config, namespace, until)
199 }
200}