libsql_wal/storage/
mod.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::{fmt, future::Future};
8
9use chrono::{DateTime, Utc};
10use fst::Map;
11use hashbrown::HashMap;
12use libsql_sys::name::NamespaceName;
13use libsql_sys::wal::either::Either;
14use tempfile::{tempdir, TempDir};
15use tokio_stream::Stream;
16
17use crate::io::{FileExt, Io, StdIO};
18use crate::segment::compacted::CompactedSegment;
19use crate::segment::{sealed::SealedSegment, Segment};
20
21use self::backend::{FindSegmentReq, SegmentMeta};
22pub use self::error::Error;
23
24pub mod async_storage;
25pub mod backend;
26pub mod compaction;
27pub(crate) mod error;
28mod job;
29mod scheduler;
30
/// Result alias used by the storage layer; the error type defaults to this module's
/// [`Error`](self::error::Error).
pub type Result<T, E = self::error::Error> = std::result::Result<T, E>;
32
/// Selects which state [`Storage::restore`] should restore.
pub enum RestoreOptions {
    /// NOTE(review): presumably restores the most recent stored state; confirm against backends.
    Latest,
    /// NOTE(review): presumably restores the state as of the given UTC timestamp; confirm
    /// against backends.
    Timestamp(DateTime<Utc>),
}
37
/// SegmentKey is used to index segment data, where keys are lexicographically ordered.
///
/// The string form of a key (see the `Display` and `FromStr` impls) is
/// `{u64::MAX - end_frame_no}-{start_frame_no}-{timestamp}`, every field being zero-padded to
/// 20 digits. Because the first field is the *reversed* end frame number, string keys sort by
/// descending `end_frame_no`: a range search starting at `format!("{:020}", u64::MAX - n)`
/// yields first the key with the largest `end_frame_no` that is `<= n`, if there is one.
/// Beware that such a key is not guaranteed to contain `n`; use `includes` to check.
///
/// Note that the in-memory ordering (`Ord`/`PartialOrd`) is different from the string ordering:
/// it compares `start_frame_no`, then `end_frame_no`, and ignores `timestamp`, whereas the
/// derived `PartialEq` compares all three fields.
///
/// e.g (illustrative only, `SegmentMeta` has more fields than shown):
/// ```ignore
/// let mut map = BTreeMap::new();
///
/// let meta = SegmentMeta { start_frame_no: 1, end_frame_no: 100, .. };
/// map.insert(SegmentKey::from(&meta).to_string(), meta);
///
/// let meta = SegmentMeta { start_frame_no: 101, end_frame_no: 500, .. };
/// map.insert(SegmentKey::from(&meta).to_string(), meta);
///
/// let meta = SegmentMeta { start_frame_no: 101, end_frame_no: 1000, .. };
/// map.insert(SegmentKey::from(&meta).to_string(), meta);
///
/// map.range(format!("{:020}", u64::MAX - 50)..).next();
/// map.range(format!("{:020}", u64::MAX - 0)..).next();
/// map.range(format!("{:020}", u64::MAX - 1)..).next();
/// map.range(format!("{:020}", u64::MAX - 100)..).next();
/// map.range(format!("{:020}", u64::MAX - 101)..).next();
/// map.range(format!("{:020}", u64::MAX - 5000)..).next();
/// ```
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SegmentKey {
    /// First frame number covered by the segment (inclusive).
    pub start_frame_no: u64,
    /// Last frame number covered by the segment (inclusive).
    pub end_frame_no: u64,
    /// Segment timestamp, in milliseconds since the Unix epoch.
    pub timestamp: u64,
}
70
71impl Debug for SegmentKey {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("SegmentKey")
74            .field("start_frame_no", &self.start_frame_no)
75            .field("end_frame_no", &self.end_frame_no)
76            .field("timestamp", &self.timestamp())
77            .finish()
78    }
79}
80
81impl PartialOrd for SegmentKey {
82    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83        match self.start_frame_no.partial_cmp(&other.start_frame_no) {
84            Some(core::cmp::Ordering::Equal) => {}
85            ord => return ord,
86        }
87        self.end_frame_no.partial_cmp(&other.end_frame_no)
88    }
89}
90
91impl Ord for SegmentKey {
92    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
93        self.partial_cmp(other).unwrap()
94    }
95}
96
97impl SegmentKey {
98    pub(crate) fn includes(&self, frame_no: u64) -> bool {
99        (self.start_frame_no..=self.end_frame_no).contains(&frame_no)
100    }
101
102    #[tracing::instrument]
103    fn validate_from_path(mut path: &Path, ns: &NamespaceName) -> Option<Self> {
104        // path in the form "v2/clusters/{cluster-id}/namespaces/{namespace}/indexes/{index-key}"
105        let key: Self = path.file_name()?.to_str()?.parse().ok()?;
106
107        path = path.parent()?;
108
109        if path.file_name()? != "indexes" {
110            tracing::debug!("invalid key, ignoring");
111            return None;
112        }
113
114        path = path.parent()?;
115
116        if path.file_name()? != ns.as_str() {
117            tracing::debug!("invalid namespace for key");
118            return None;
119        }
120
121        Some(key)
122    }
123
124    fn timestamp(&self) -> DateTime<Utc> {
125        DateTime::from_timestamp_millis(self.timestamp as _)
126            .unwrap()
127            .to_utc()
128    }
129}
130
131impl From<&SegmentMeta> for SegmentKey {
132    fn from(value: &SegmentMeta) -> Self {
133        Self {
134            start_frame_no: value.start_frame_no,
135            end_frame_no: value.end_frame_no,
136            timestamp: value.segment_timestamp.timestamp_millis() as _,
137        }
138    }
139}
140
141impl FromStr for SegmentKey {
142    type Err = ();
143
144    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
145        let (rev_end_fno, s) = s.split_at(20);
146        let end_frame_no = u64::MAX - rev_end_fno.parse::<u64>().map_err(|_| ())?;
147        let (start_fno, timestamp) = s[1..].split_at(20);
148        let start_frame_no = start_fno.parse::<u64>().map_err(|_| ())?;
149        let timestamp = timestamp[1..].parse().map_err(|_| ())?;
150        Ok(Self {
151            start_frame_no,
152            end_frame_no,
153            timestamp,
154        })
155    }
156}
157
158impl fmt::Display for SegmentKey {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        write!(
161            f,
162            "{:020}-{:020}-{:020}",
163            u64::MAX - self.end_frame_no,
164            self.start_frame_no,
165            self.timestamp,
166        )
167    }
168}
169
/// Callback handed to [`Storage::store`]: takes the new durable frame_no and returns a future.
///
/// The callback is called at most once (`FnOnce`); both the callback and the future it returns
/// must be `Send + Sync + 'static`.
pub type OnStoreCallback = Box<
    dyn FnOnce(u64) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>
        + Send
        + Sync
        + 'static,
>;
177
/// Interface to a backing store for segments.
///
/// Every operation takes a namespace and an optional `config_override` of type
/// [`Storage::Config`].
/// NOTE(review): the per-method descriptions below are derived from the signatures and from the
/// implementations visible in this file; confirm details against the real backends.
pub trait Storage: Send + Sync + 'static {
    /// Type of the segments accepted by [`Storage::store`].
    type Segment: Segment;
    /// Type of the optional per-call configuration override.
    type Config: Clone + Send;
    /// store the passed segment for `namespace`. This function is called in a context where
    /// blocking is acceptable.
    /// This function returns nothing; completion is reported through `on_store`, which takes
    /// the new durable frame_no (see [`OnStoreCallback`]).
    fn store(
        &self,
        namespace: &NamespaceName,
        seg: Self::Segment,
        config_override: Option<Self::Config>,
        on_store: OnStoreCallback,
    );

    /// Resolves to a frame_no for `namespace`.
    /// NOTE(review): presumably the highest frame_no known to be durably stored; confirm.
    fn durable_frame_no(
        &self,
        namespace: &NamespaceName,
        config_override: Option<Self::Config>,
    ) -> impl Future<Output = Result<u64>> + Send;

    /// Restores `namespace` into `file` according to `restore_options`.
    async fn restore(
        &self,
        file: impl FileExt,
        namespace: &NamespaceName,
        restore_options: RestoreOptions,
        config_override: Option<Self::Config>,
    ) -> Result<()>;

    /// Looks up the key of the segment of `namespace` matching the request `frame_no`.
    fn find_segment(
        &self,
        namespace: &NamespaceName,
        frame_no: FindSegmentReq,
        config_override: Option<Self::Config>,
    ) -> impl Future<Output = Result<SegmentKey>> + Send;

    /// Fetches the index (an `fst` map) of the segment identified by `key`.
    fn fetch_segment_index(
        &self,
        namespace: &NamespaceName,
        key: &SegmentKey,
        config_override: Option<Self::Config>,
    ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send;

    /// Fetches the data of the segment identified by `key`, as a [`CompactedSegment`].
    fn fetch_segment_data(
        &self,
        namespace: &NamespaceName,
        key: &SegmentKey,
        config_override: Option<Self::Config>,
    ) -> impl Future<Output = Result<CompactedSegment<impl FileExt>>> + Send;

    /// Shuts the storage down. The default implementation does nothing.
    fn shutdown(&self) -> impl Future<Output = ()> + Send {
        async { () }
    }

    /// Streams information about the segments of `namespace`.
    /// NOTE(review): the exact meaning of `until` is not visible from this file; confirm.
    fn list_segments<'a>(
        &'a self,
        namespace: &'a NamespaceName,
        until: u64,
        config_override: Option<Self::Config>,
    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
}
239
/// Item yielded by [`Storage::list_segments`].
#[derive(Debug)]
pub struct SegmentInfo {
    /// Key identifying the segment.
    pub key: SegmentKey,
    /// Size of the segment. NOTE(review): presumably in bytes; confirm against backends.
    pub size: usize,
}
245
246/// special zip function for Either storage implementation
247fn zip<A, B, C, D>(
248    x: &Either<A, B>,
249    y: Option<Either<C, D>>,
250) -> Either<(&A, Option<C>), (&B, Option<D>)> {
251    match (x, y) {
252        (Either::A(a), Some(Either::A(c))) => Either::A((a, Some(c))),
253        (Either::B(b), Some(Either::B(d))) => Either::B((b, Some(d))),
254        (Either::A(a), None) => Either::A((a, None)),
255        (Either::B(b), None) => Either::B((b, None)),
256        _ => panic!("incompatible options"),
257    }
258}
259
260impl<A, B, S> Storage for Either<A, B>
261where
262    A: Storage<Segment = S>,
263    B: Storage<Segment = S>,
264    S: Segment,
265{
266    type Segment = S;
267    type Config = Either<A::Config, B::Config>;
268
269    fn store(
270        &self,
271        namespace: &NamespaceName,
272        seg: Self::Segment,
273        config_override: Option<Self::Config>,
274        on_store: OnStoreCallback,
275    ) {
276        match zip(self, config_override) {
277            Either::A((s, c)) => s.store(namespace, seg, c, on_store),
278            Either::B((s, c)) => s.store(namespace, seg, c, on_store),
279        }
280    }
281
282    async fn durable_frame_no(
283        &self,
284        namespace: &NamespaceName,
285        config_override: Option<Self::Config>,
286    ) -> Result<u64> {
287        match zip(self, config_override) {
288            Either::A((s, c)) => s.durable_frame_no(namespace, c).await,
289            Either::B((s, c)) => s.durable_frame_no(namespace, c).await,
290        }
291    }
292
293    async fn restore(
294        &self,
295        file: impl FileExt,
296        namespace: &NamespaceName,
297        restore_options: RestoreOptions,
298        config_override: Option<Self::Config>,
299    ) -> Result<()> {
300        match zip(self, config_override) {
301            Either::A((s, c)) => s.restore(file, namespace, restore_options, c).await,
302            Either::B((s, c)) => s.restore(file, namespace, restore_options, c).await,
303        }
304    }
305
306    fn find_segment(
307        &self,
308        namespace: &NamespaceName,
309        frame_no: FindSegmentReq,
310        config_override: Option<Self::Config>,
311    ) -> impl Future<Output = Result<SegmentKey>> + Send {
312        async move {
313            match zip(self, config_override) {
314                Either::A((s, c)) => s.find_segment(namespace, frame_no, c).await,
315                Either::B((s, c)) => s.find_segment(namespace, frame_no, c).await,
316            }
317        }
318    }
319
320    fn fetch_segment_index(
321        &self,
322        namespace: &NamespaceName,
323        key: &SegmentKey,
324        config_override: Option<Self::Config>,
325    ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send {
326        async move {
327            match zip(self, config_override) {
328                Either::A((s, c)) => s.fetch_segment_index(namespace, key, c).await,
329                Either::B((s, c)) => s.fetch_segment_index(namespace, key, c).await,
330            }
331        }
332    }
333
334    fn fetch_segment_data(
335        &self,
336        namespace: &NamespaceName,
337        key: &SegmentKey,
338        config_override: Option<Self::Config>,
339    ) -> impl Future<Output = Result<CompactedSegment<impl FileExt>>> + Send {
340        async move {
341            match zip(self, config_override) {
342                Either::A((s, c)) => {
343                    let seg = s.fetch_segment_data(namespace, key, c).await?;
344                    let seg = seg.remap_file_type(Either::A);
345                    Ok(seg)
346                }
347                Either::B((s, c)) => {
348                    let seg = s.fetch_segment_data(namespace, key, c).await?;
349                    let seg = seg.remap_file_type(Either::B);
350                    Ok(seg)
351                }
352            }
353        }
354    }
355
356    async fn shutdown(&self) {
357        match self {
358            Either::A(a) => a.shutdown().await,
359            Either::B(b) => b.shutdown().await,
360        }
361    }
362
363    fn list_segments<'a>(
364        &'a self,
365        namespace: &'a NamespaceName,
366        until: u64,
367        config_override: Option<Self::Config>,
368    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
369        match zip(self, config_override) {
370            Either::A((s, c)) => {
371                tokio_util::either::Either::Left(s.list_segments(namespace, until, c))
372            }
373            Either::B((s, c)) => {
374                tokio_util::either::Either::Right(s.list_segments(namespace, until, c))
375            }
376        }
377    }
378}
379
/// A placeholder storage that doesn't store segments: `store` is a no-op, and the lookup and
/// restore operations panic.
#[derive(Debug, Clone, Copy)]
pub struct NoStorage;
383
384impl Storage for NoStorage {
385    type Config = ();
386    type Segment = SealedSegment<std::fs::File>;
387
388    fn store(
389        &self,
390        _namespace: &NamespaceName,
391        _seg: Self::Segment,
392        _config: Option<Self::Config>,
393        _on_store: OnStoreCallback,
394    ) {
395    }
396
397    async fn durable_frame_no(
398        &self,
399        _namespace: &NamespaceName,
400        _config: Option<Self::Config>,
401    ) -> Result<u64> {
402        Ok(u64::MAX)
403    }
404
405    async fn restore(
406        &self,
407        _file: impl FileExt,
408        _namespace: &NamespaceName,
409        _restore_options: RestoreOptions,
410        _config_override: Option<Self::Config>,
411    ) -> Result<()> {
412        panic!("can restore from no storage")
413    }
414
415    async fn find_segment(
416        &self,
417        _namespace: &NamespaceName,
418        _frame_no: FindSegmentReq,
419        _config_override: Option<Self::Config>,
420    ) -> Result<SegmentKey> {
421        unimplemented!()
422    }
423
424    async fn fetch_segment_index(
425        &self,
426        _namespace: &NamespaceName,
427        _key: &SegmentKey,
428        _config_override: Option<Self::Config>,
429    ) -> Result<Map<Arc<[u8]>>> {
430        unimplemented!()
431    }
432
433    async fn fetch_segment_data(
434        &self,
435        _namespace: &NamespaceName,
436        _key: &SegmentKey,
437        _config_override: Option<Self::Config>,
438    ) -> Result<CompactedSegment<impl FileExt>> {
439        unimplemented!();
440        #[allow(unreachable_code)]
441        Result::<CompactedSegment<std::fs::File>>::Err(Error::InvalidIndex(""))
442    }
443
444    fn list_segments<'a>(
445        &'a self,
446        _namespace: &'a NamespaceName,
447        _until: u64,
448        _config_override: Option<Self::Config>,
449    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
450        unimplemented!("no storage!");
451        #[allow(unreachable_code)]
452        tokio_stream::empty()
453    }
454}
455
/// Storage used in tests. Cloning yields another handle to the same shared state.
///
/// Segments are only retained when the storage is created with storing enabled (see
/// `new_store` / `new_io`); otherwise `store` just invokes the callback.
#[doc(hidden)]
#[derive(Debug)]
pub struct TestStorage<IO = StdIO> {
    // State shared by all clones, behind an async-aware mutex.
    inner: Arc<async_lock::Mutex<TestStorageInner<IO>>>,
}
461
/// State shared by all clones of a `TestStorage`.
#[derive(Debug)]
struct TestStorageInner<IO> {
    /// Per namespace, the stored segments: key -> (path of the compacted file, segment index).
    stored: HashMap<NamespaceName, BTreeMap<SegmentKey, (PathBuf, Map<Arc<[u8]>>)>>,
    /// Temporary directory in which compacted segment files are written.
    dir: TempDir,
    /// IO implementation used to open segment files.
    io: IO,
    /// Whether segments passed to `store` are actually compacted and retained.
    store: bool,
}
469
470impl<F> Clone for TestStorage<F> {
471    fn clone(&self) -> Self {
472        Self {
473            inner: self.inner.clone(),
474        }
475    }
476}
477
478impl TestStorage<StdIO> {
479    pub fn new() -> Self {
480        Self::new_io(false, StdIO(()))
481    }
482
483    pub fn new_store() -> Self {
484        Self::new_io(true, StdIO(()))
485    }
486}
487
488impl<IO: Io> TestStorage<IO> {
489    pub fn new_io(store: bool, io: IO) -> Self {
490        let dir = tempdir().unwrap();
491        Self {
492            inner: Arc::new(
493                TestStorageInner {
494                    dir,
495                    stored: Default::default(),
496                    io,
497                    store,
498                }
499                .into(),
500            ),
501        }
502    }
503}
504
505impl<IO: Io> Storage for TestStorage<IO> {
506    type Segment = SealedSegment<IO::File>;
507    type Config = ();
508
509    fn store(
510        &self,
511        namespace: &NamespaceName,
512        seg: Self::Segment,
513        _config: Option<Self::Config>,
514        on_store: OnStoreCallback,
515    ) {
516        let mut inner = self.inner.lock_blocking();
517        if inner.store {
518            let id = uuid::Uuid::new_v4();
519            let out_path = inner.dir.path().join(id.to_string());
520            let out_file = inner.io.open(true, true, true, &out_path).unwrap();
521            let index = tokio::runtime::Handle::current()
522                .block_on(seg.compact(&out_file, id))
523                .unwrap();
524            let end_frame_no = seg.header().last_committed();
525            let key = SegmentKey {
526                start_frame_no: seg.header().start_frame_no.get(),
527                end_frame_no,
528                timestamp: seg.header().sealed_at_timestamp.get(),
529            };
530            let index = Map::new(index.into()).unwrap();
531            inner
532                .stored
533                .entry(namespace.clone())
534                .or_default()
535                .insert(key, (out_path, index));
536            tokio::runtime::Handle::current().block_on(on_store(end_frame_no));
537        } else {
538            // HACK: we need to spawn because many tests just call this method indirectly in
539            // async context. That makes tests easier to write.
540            tokio::task::spawn_blocking(move || {
541                tokio::runtime::Handle::current().block_on(on_store(u64::MAX));
542            });
543        }
544    }
545
546    async fn durable_frame_no(
547        &self,
548        _namespace: &NamespaceName,
549        _config: Option<Self::Config>,
550    ) -> Result<u64> {
551        Ok(u64::MAX)
552    }
553
554    async fn restore(
555        &self,
556        _file: impl FileExt,
557        _namespace: &NamespaceName,
558        _restore_options: RestoreOptions,
559        _config_override: Option<Self::Config>,
560    ) -> Result<()> {
561        todo!();
562    }
563
564    async fn find_segment(
565        &self,
566        namespace: &NamespaceName,
567        req: FindSegmentReq,
568        _config_override: Option<Self::Config>,
569    ) -> Result<SegmentKey> {
570        let inner = self.inner.lock().await;
571        if inner.store {
572            let FindSegmentReq::EndFrameNoLessThan(fno) = req else {
573                panic!("unsupported lookup by ts")
574            };
575            if let Some(segs) = inner.stored.get(namespace) {
576                let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(fno)) else {
577                    return Err(Error::SegmentNotFound(req));
578                };
579                return Ok(*key);
580            } else {
581                panic!("namespace not found");
582            }
583        } else {
584            panic!("store not enabled")
585        }
586    }
587
588    async fn fetch_segment_index(
589        &self,
590        namespace: &NamespaceName,
591        key: &SegmentKey,
592        _config_override: Option<Self::Config>,
593    ) -> Result<Map<Arc<[u8]>>> {
594        let inner = self.inner.lock().await;
595        if inner.store {
596            match inner.stored.get(namespace) {
597                Some(segs) => Ok(segs.get(&key).unwrap().1.clone()),
598                None => panic!("unknown namespace"),
599            }
600        } else {
601            panic!("not storing")
602        }
603    }
604
605    async fn fetch_segment_data(
606        &self,
607        namespace: &NamespaceName,
608        key: &SegmentKey,
609        _config_override: Option<Self::Config>,
610    ) -> Result<CompactedSegment<impl FileExt>> {
611        let inner = self.inner.lock().await;
612        if inner.store {
613            match inner.stored.get(namespace) {
614                Some(segs) => {
615                    let path = &segs.get(&key).unwrap().0;
616                    let file = inner.io.open(false, true, false, path).unwrap();
617                    Ok(CompactedSegment::open(file).await?)
618                }
619                None => panic!("unknown namespace"),
620            }
621        } else {
622            panic!("not storing")
623        }
624    }
625
626    fn list_segments<'a>(
627        &'a self,
628        _namespace: &'a NamespaceName,
629        _until: u64,
630        _config_override: Option<Self::Config>,
631    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
632        todo!();
633        #[allow(unreachable_code)]
634        tokio_stream::empty()
635    }
636}
637
/// A request to store a segment of type `S` for a namespace, optionally with a storage
/// configuration override of type `C`.
pub struct StoreSegmentRequest<S, C> {
    /// Namespace the segment belongs to.
    namespace: NamespaceName,
    /// Path to the segment. Read-only for bottomless
    segment: S,
    /// When this segment was created
    created_at: DateTime<Utc>,

    /// alternative configuration to use with the storage layer.
    /// e.g: S3 overrides
    storage_config_override: Option<C>,
    /// Called after the segment was stored, with the new durable index
    on_store_callback: OnStoreCallback,
}
651
652impl<S, C> Debug for StoreSegmentRequest<S, C>
653where
654    S: Debug,
655{
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        f.debug_struct("StoreSegmentRequest")
658            .field("namespace", &self.namespace)
659            .field("segment", &self.segment)
660            .field("created_at", &self.created_at)
661            .finish()
662    }
663}