1use std::collections::BTreeMap;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::pin::Pin;
5use std::str::FromStr;
6use std::sync::Arc;
7use std::{fmt, future::Future};
8
9use chrono::{DateTime, Utc};
10use fst::Map;
11use hashbrown::HashMap;
12use libsql_sys::name::NamespaceName;
13use libsql_sys::wal::either::Either;
14use tempfile::{tempdir, TempDir};
15use tokio_stream::Stream;
16
17use crate::io::{FileExt, Io, StdIO};
18use crate::segment::compacted::CompactedSegment;
19use crate::segment::{sealed::SealedSegment, Segment};
20
21use self::backend::{FindSegmentReq, SegmentMeta};
22pub use self::error::Error;
23
24pub mod async_storage;
25pub mod backend;
26pub mod compaction;
27pub(crate) mod error;
28mod job;
29mod scheduler;
30
31pub type Result<T, E = self::error::Error> = std::result::Result<T, E>;
32
33pub enum RestoreOptions {
34 Latest,
35 Timestamp(DateTime<Utc>),
36}
37
/// Identifies a stored segment by its frame range and timestamp.
///
/// Equality is derived and compares all three fields. `Display` renders the key as
/// text and `FromStr` parses that text back; `validate_from_path` parses keys from
/// file names this way.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SegmentKey {
    /// First frame number covered by the segment.
    pub start_frame_no: u64,
    /// Last frame number covered by the segment (inclusive, see `includes`).
    pub end_frame_no: u64,
    /// Unix timestamp in milliseconds (read with `from_timestamp_millis`, built from
    /// `timestamp_millis()` in `From<&SegmentMeta>`).
    pub timestamp: u64,
}
70
71impl Debug for SegmentKey {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("SegmentKey")
74 .field("start_frame_no", &self.start_frame_no)
75 .field("end_frame_no", &self.end_frame_no)
76 .field("timestamp", &self.timestamp())
77 .finish()
78 }
79}
80
81impl PartialOrd for SegmentKey {
82 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
83 match self.start_frame_no.partial_cmp(&other.start_frame_no) {
84 Some(core::cmp::Ordering::Equal) => {}
85 ord => return ord,
86 }
87 self.end_frame_no.partial_cmp(&other.end_frame_no)
88 }
89}
90
91impl Ord for SegmentKey {
92 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
93 self.partial_cmp(other).unwrap()
94 }
95}
96
97impl SegmentKey {
98 pub(crate) fn includes(&self, frame_no: u64) -> bool {
99 (self.start_frame_no..=self.end_frame_no).contains(&frame_no)
100 }
101
102 #[tracing::instrument]
103 fn validate_from_path(mut path: &Path, ns: &NamespaceName) -> Option<Self> {
104 let key: Self = path.file_name()?.to_str()?.parse().ok()?;
106
107 path = path.parent()?;
108
109 if path.file_name()? != "indexes" {
110 tracing::debug!("invalid key, ignoring");
111 return None;
112 }
113
114 path = path.parent()?;
115
116 if path.file_name()? != ns.as_str() {
117 tracing::debug!("invalid namespace for key");
118 return None;
119 }
120
121 Some(key)
122 }
123
124 fn timestamp(&self) -> DateTime<Utc> {
125 DateTime::from_timestamp_millis(self.timestamp as _)
126 .unwrap()
127 .to_utc()
128 }
129}
130
131impl From<&SegmentMeta> for SegmentKey {
132 fn from(value: &SegmentMeta) -> Self {
133 Self {
134 start_frame_no: value.start_frame_no,
135 end_frame_no: value.end_frame_no,
136 timestamp: value.segment_timestamp.timestamp_millis() as _,
137 }
138 }
139}
140
141impl FromStr for SegmentKey {
142 type Err = ();
143
144 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
145 let (rev_end_fno, s) = s.split_at(20);
146 let end_frame_no = u64::MAX - rev_end_fno.parse::<u64>().map_err(|_| ())?;
147 let (start_fno, timestamp) = s[1..].split_at(20);
148 let start_frame_no = start_fno.parse::<u64>().map_err(|_| ())?;
149 let timestamp = timestamp[1..].parse().map_err(|_| ())?;
150 Ok(Self {
151 start_frame_no,
152 end_frame_no,
153 timestamp,
154 })
155 }
156}
157
158impl fmt::Display for SegmentKey {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 write!(
161 f,
162 "{:020}-{:020}-{:020}",
163 u64::MAX - self.end_frame_no,
164 self.start_frame_no,
165 self.timestamp,
166 )
167 }
168}
169
/// One-shot callback passed to [`Storage::store`].
///
/// It takes a frame number and returns a boxed, pinned future that is
/// `Send + Sync`. In this file, `TestStorage` passes either the stored segment's
/// last committed frame number or `u64::MAX` when it is not storing. It then
/// drives the returned future with `Handle::block_on`.
pub type OnStoreCallback = Box<
    dyn FnOnce(u64) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>
        + Send
        + Sync
        + 'static,
>;
177
178pub trait Storage: Send + Sync + 'static {
179 type Segment: Segment;
180 type Config: Clone + Send;
181 fn store(
186 &self,
187 namespace: &NamespaceName,
188 seg: Self::Segment,
189 config_override: Option<Self::Config>,
190 on_store: OnStoreCallback,
191 );
192
193 fn durable_frame_no(
194 &self,
195 namespace: &NamespaceName,
196 config_override: Option<Self::Config>,
197 ) -> impl Future<Output = Result<u64>> + Send;
198
199 async fn restore(
200 &self,
201 file: impl FileExt,
202 namespace: &NamespaceName,
203 restore_options: RestoreOptions,
204 config_override: Option<Self::Config>,
205 ) -> Result<()>;
206
207 fn find_segment(
208 &self,
209 namespace: &NamespaceName,
210 frame_no: FindSegmentReq,
211 config_override: Option<Self::Config>,
212 ) -> impl Future<Output = Result<SegmentKey>> + Send;
213
214 fn fetch_segment_index(
215 &self,
216 namespace: &NamespaceName,
217 key: &SegmentKey,
218 config_override: Option<Self::Config>,
219 ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send;
220
221 fn fetch_segment_data(
222 &self,
223 namespace: &NamespaceName,
224 key: &SegmentKey,
225 config_override: Option<Self::Config>,
226 ) -> impl Future<Output = Result<CompactedSegment<impl FileExt>>> + Send;
227
228 fn shutdown(&self) -> impl Future<Output = ()> + Send {
229 async { () }
230 }
231
232 fn list_segments<'a>(
233 &'a self,
234 namespace: &'a NamespaceName,
235 until: u64,
236 config_override: Option<Self::Config>,
237 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
238}
239
/// Item yielded by [`Storage::list_segments`].
#[derive(Debug)]
pub struct SegmentInfo {
    /// Key identifying the listed segment.
    pub key: SegmentKey,
    /// Size of the segment; unit not established here (presumably bytes — TODO confirm).
    pub size: usize,
}
245
246fn zip<A, B, C, D>(
248 x: &Either<A, B>,
249 y: Option<Either<C, D>>,
250) -> Either<(&A, Option<C>), (&B, Option<D>)> {
251 match (x, y) {
252 (Either::A(a), Some(Either::A(c))) => Either::A((a, Some(c))),
253 (Either::B(b), Some(Either::B(d))) => Either::B((b, Some(d))),
254 (Either::A(a), None) => Either::A((a, None)),
255 (Either::B(b), None) => Either::B((b, None)),
256 _ => panic!("incompatible options"),
257 }
258}
259
260impl<A, B, S> Storage for Either<A, B>
261where
262 A: Storage<Segment = S>,
263 B: Storage<Segment = S>,
264 S: Segment,
265{
266 type Segment = S;
267 type Config = Either<A::Config, B::Config>;
268
269 fn store(
270 &self,
271 namespace: &NamespaceName,
272 seg: Self::Segment,
273 config_override: Option<Self::Config>,
274 on_store: OnStoreCallback,
275 ) {
276 match zip(self, config_override) {
277 Either::A((s, c)) => s.store(namespace, seg, c, on_store),
278 Either::B((s, c)) => s.store(namespace, seg, c, on_store),
279 }
280 }
281
282 async fn durable_frame_no(
283 &self,
284 namespace: &NamespaceName,
285 config_override: Option<Self::Config>,
286 ) -> Result<u64> {
287 match zip(self, config_override) {
288 Either::A((s, c)) => s.durable_frame_no(namespace, c).await,
289 Either::B((s, c)) => s.durable_frame_no(namespace, c).await,
290 }
291 }
292
293 async fn restore(
294 &self,
295 file: impl FileExt,
296 namespace: &NamespaceName,
297 restore_options: RestoreOptions,
298 config_override: Option<Self::Config>,
299 ) -> Result<()> {
300 match zip(self, config_override) {
301 Either::A((s, c)) => s.restore(file, namespace, restore_options, c).await,
302 Either::B((s, c)) => s.restore(file, namespace, restore_options, c).await,
303 }
304 }
305
306 fn find_segment(
307 &self,
308 namespace: &NamespaceName,
309 frame_no: FindSegmentReq,
310 config_override: Option<Self::Config>,
311 ) -> impl Future<Output = Result<SegmentKey>> + Send {
312 async move {
313 match zip(self, config_override) {
314 Either::A((s, c)) => s.find_segment(namespace, frame_no, c).await,
315 Either::B((s, c)) => s.find_segment(namespace, frame_no, c).await,
316 }
317 }
318 }
319
320 fn fetch_segment_index(
321 &self,
322 namespace: &NamespaceName,
323 key: &SegmentKey,
324 config_override: Option<Self::Config>,
325 ) -> impl Future<Output = Result<Map<Arc<[u8]>>>> + Send {
326 async move {
327 match zip(self, config_override) {
328 Either::A((s, c)) => s.fetch_segment_index(namespace, key, c).await,
329 Either::B((s, c)) => s.fetch_segment_index(namespace, key, c).await,
330 }
331 }
332 }
333
334 fn fetch_segment_data(
335 &self,
336 namespace: &NamespaceName,
337 key: &SegmentKey,
338 config_override: Option<Self::Config>,
339 ) -> impl Future<Output = Result<CompactedSegment<impl FileExt>>> + Send {
340 async move {
341 match zip(self, config_override) {
342 Either::A((s, c)) => {
343 let seg = s.fetch_segment_data(namespace, key, c).await?;
344 let seg = seg.remap_file_type(Either::A);
345 Ok(seg)
346 }
347 Either::B((s, c)) => {
348 let seg = s.fetch_segment_data(namespace, key, c).await?;
349 let seg = seg.remap_file_type(Either::B);
350 Ok(seg)
351 }
352 }
353 }
354 }
355
356 async fn shutdown(&self) {
357 match self {
358 Either::A(a) => a.shutdown().await,
359 Either::B(b) => b.shutdown().await,
360 }
361 }
362
363 fn list_segments<'a>(
364 &'a self,
365 namespace: &'a NamespaceName,
366 until: u64,
367 config_override: Option<Self::Config>,
368 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
369 match zip(self, config_override) {
370 Either::A((s, c)) => {
371 tokio_util::either::Either::Left(s.list_segments(namespace, until, c))
372 }
373 Either::B((s, c)) => {
374 tokio_util::either::Either::Right(s.list_segments(namespace, until, c))
375 }
376 }
377 }
378}
379
/// Storage that persists nothing.
///
/// Stored segments are dropped and `durable_frame_no` returns `u64::MAX`. The
/// lookup/fetch/list operations are unimplemented and `restore` panics.
#[derive(Debug, Clone, Copy)]
pub struct NoStorage;
383
384impl Storage for NoStorage {
385 type Config = ();
386 type Segment = SealedSegment<std::fs::File>;
387
388 fn store(
389 &self,
390 _namespace: &NamespaceName,
391 _seg: Self::Segment,
392 _config: Option<Self::Config>,
393 _on_store: OnStoreCallback,
394 ) {
395 }
396
397 async fn durable_frame_no(
398 &self,
399 _namespace: &NamespaceName,
400 _config: Option<Self::Config>,
401 ) -> Result<u64> {
402 Ok(u64::MAX)
403 }
404
405 async fn restore(
406 &self,
407 _file: impl FileExt,
408 _namespace: &NamespaceName,
409 _restore_options: RestoreOptions,
410 _config_override: Option<Self::Config>,
411 ) -> Result<()> {
412 panic!("can restore from no storage")
413 }
414
415 async fn find_segment(
416 &self,
417 _namespace: &NamespaceName,
418 _frame_no: FindSegmentReq,
419 _config_override: Option<Self::Config>,
420 ) -> Result<SegmentKey> {
421 unimplemented!()
422 }
423
424 async fn fetch_segment_index(
425 &self,
426 _namespace: &NamespaceName,
427 _key: &SegmentKey,
428 _config_override: Option<Self::Config>,
429 ) -> Result<Map<Arc<[u8]>>> {
430 unimplemented!()
431 }
432
433 async fn fetch_segment_data(
434 &self,
435 _namespace: &NamespaceName,
436 _key: &SegmentKey,
437 _config_override: Option<Self::Config>,
438 ) -> Result<CompactedSegment<impl FileExt>> {
439 unimplemented!();
440 #[allow(unreachable_code)]
441 Result::<CompactedSegment<std::fs::File>>::Err(Error::InvalidIndex(""))
442 }
443
444 fn list_segments<'a>(
445 &'a self,
446 _namespace: &'a NamespaceName,
447 _until: u64,
448 _config_override: Option<Self::Config>,
449 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
450 unimplemented!("no storage!");
451 #[allow(unreachable_code)]
452 tokio_stream::empty()
453 }
454}
455
/// `Storage` test double (hidden from docs). It compacts segments into a temporary
/// directory and indexes them in memory.
///
/// Clones share the same state through the inner `Arc`.
#[doc(hidden)]
#[derive(Debug)]
pub struct TestStorage<IO = StdIO> {
    /// Shared state. `async_lock::Mutex` is used because it can be locked from
    /// async code (`lock().await`) and from blocking code (`lock_blocking()`).
    inner: Arc<async_lock::Mutex<TestStorageInner<IO>>>,
}
461
/// Mutable state behind `TestStorage`.
#[derive(Debug)]
struct TestStorageInner<IO> {
    /// Stored segments per namespace: key -> (compacted segment file path, its index).
    stored: HashMap<NamespaceName, BTreeMap<SegmentKey, (PathBuf, Map<Arc<[u8]>>)>>,
    /// Temporary directory holding the compacted segment files (deleted when dropped).
    dir: TempDir,
    /// I/O backend used to create and reopen segment files.
    io: IO,
    /// Whether `store` actually compacts and records segments.
    store: bool,
}
469
470impl<F> Clone for TestStorage<F> {
471 fn clone(&self) -> Self {
472 Self {
473 inner: self.inner.clone(),
474 }
475 }
476}
477
478impl TestStorage<StdIO> {
479 pub fn new() -> Self {
480 Self::new_io(false, StdIO(()))
481 }
482
483 pub fn new_store() -> Self {
484 Self::new_io(true, StdIO(()))
485 }
486}
487
488impl<IO: Io> TestStorage<IO> {
489 pub fn new_io(store: bool, io: IO) -> Self {
490 let dir = tempdir().unwrap();
491 Self {
492 inner: Arc::new(
493 TestStorageInner {
494 dir,
495 stored: Default::default(),
496 io,
497 store,
498 }
499 .into(),
500 ),
501 }
502 }
503}
504
505impl<IO: Io> Storage for TestStorage<IO> {
506 type Segment = SealedSegment<IO::File>;
507 type Config = ();
508
509 fn store(
510 &self,
511 namespace: &NamespaceName,
512 seg: Self::Segment,
513 _config: Option<Self::Config>,
514 on_store: OnStoreCallback,
515 ) {
516 let mut inner = self.inner.lock_blocking();
517 if inner.store {
518 let id = uuid::Uuid::new_v4();
519 let out_path = inner.dir.path().join(id.to_string());
520 let out_file = inner.io.open(true, true, true, &out_path).unwrap();
521 let index = tokio::runtime::Handle::current()
522 .block_on(seg.compact(&out_file, id))
523 .unwrap();
524 let end_frame_no = seg.header().last_committed();
525 let key = SegmentKey {
526 start_frame_no: seg.header().start_frame_no.get(),
527 end_frame_no,
528 timestamp: seg.header().sealed_at_timestamp.get(),
529 };
530 let index = Map::new(index.into()).unwrap();
531 inner
532 .stored
533 .entry(namespace.clone())
534 .or_default()
535 .insert(key, (out_path, index));
536 tokio::runtime::Handle::current().block_on(on_store(end_frame_no));
537 } else {
538 tokio::task::spawn_blocking(move || {
541 tokio::runtime::Handle::current().block_on(on_store(u64::MAX));
542 });
543 }
544 }
545
546 async fn durable_frame_no(
547 &self,
548 _namespace: &NamespaceName,
549 _config: Option<Self::Config>,
550 ) -> Result<u64> {
551 Ok(u64::MAX)
552 }
553
554 async fn restore(
555 &self,
556 _file: impl FileExt,
557 _namespace: &NamespaceName,
558 _restore_options: RestoreOptions,
559 _config_override: Option<Self::Config>,
560 ) -> Result<()> {
561 todo!();
562 }
563
564 async fn find_segment(
565 &self,
566 namespace: &NamespaceName,
567 req: FindSegmentReq,
568 _config_override: Option<Self::Config>,
569 ) -> Result<SegmentKey> {
570 let inner = self.inner.lock().await;
571 if inner.store {
572 let FindSegmentReq::EndFrameNoLessThan(fno) = req else {
573 panic!("unsupported lookup by ts")
574 };
575 if let Some(segs) = inner.stored.get(namespace) {
576 let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(fno)) else {
577 return Err(Error::SegmentNotFound(req));
578 };
579 return Ok(*key);
580 } else {
581 panic!("namespace not found");
582 }
583 } else {
584 panic!("store not enabled")
585 }
586 }
587
588 async fn fetch_segment_index(
589 &self,
590 namespace: &NamespaceName,
591 key: &SegmentKey,
592 _config_override: Option<Self::Config>,
593 ) -> Result<Map<Arc<[u8]>>> {
594 let inner = self.inner.lock().await;
595 if inner.store {
596 match inner.stored.get(namespace) {
597 Some(segs) => Ok(segs.get(&key).unwrap().1.clone()),
598 None => panic!("unknown namespace"),
599 }
600 } else {
601 panic!("not storing")
602 }
603 }
604
605 async fn fetch_segment_data(
606 &self,
607 namespace: &NamespaceName,
608 key: &SegmentKey,
609 _config_override: Option<Self::Config>,
610 ) -> Result<CompactedSegment<impl FileExt>> {
611 let inner = self.inner.lock().await;
612 if inner.store {
613 match inner.stored.get(namespace) {
614 Some(segs) => {
615 let path = &segs.get(&key).unwrap().0;
616 let file = inner.io.open(false, true, false, path).unwrap();
617 Ok(CompactedSegment::open(file).await?)
618 }
619 None => panic!("unknown namespace"),
620 }
621 } else {
622 panic!("not storing")
623 }
624 }
625
626 fn list_segments<'a>(
627 &'a self,
628 _namespace: &'a NamespaceName,
629 _until: u64,
630 _config_override: Option<Self::Config>,
631 ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
632 todo!();
633 #[allow(unreachable_code)]
634 tokio_stream::empty()
635 }
636}
637
/// Request to store `segment` for `namespace`.
pub struct StoreSegmentRequest<S, C> {
    /// Namespace the segment belongs to.
    namespace: NamespaceName,
    /// Segment to store.
    segment: S,
    /// Creation time of the request (presumably when it was enqueued — TODO confirm).
    created_at: DateTime<Utc>,

    /// Optional storage configuration override carried with the request
    /// (assumed to be forwarded as `config_override` to `Storage::store` — TODO confirm).
    storage_config_override: Option<C>,
    /// Callback to invoke once the segment has been stored.
    on_store_callback: OnStoreCallback,
}
651
652impl<S, C> Debug for StoreSegmentRequest<S, C>
653where
654 S: Debug,
655{
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 f.debug_struct("StoreSegmentRequest")
658 .field("namespace", &self.namespace)
659 .field("segment", &self.segment)
660 .field("created_at", &self.created_at)
661 .finish()
662 }
663}