// libsql_wal/storage/compaction/mod.rs
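//! Compaction of stored WAL segments.
//!
//! A rough usage sketch (assuming an `Arc`-wrapped `Backend` implementation in
//! `backend`, a monitored `namespace`, and a `compactor_dir` path, all
//! hypothetical; error handling elided):
//!
//! ```ignore
//! let mut compactor = Compactor::new(backend.clone(), compactor_dir)?;
//! compactor.monitor(&namespace).await?;          // start tracking the namespace
//! compactor.sync_one(&namespace, false).await?;  // pull segment metadata from storage
//! let analysis = compactor.analyze(&namespace)?; // build the segment graph
//! let set = analysis.shortest_restore_path();    // smallest covering segment set
//! compactor.compact(set.clone()).await?;         // merge the set into one segment
//! compactor.restore(set, "restored.db").await?;  // or materialize a database file
//! ```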

use std::ops::Deref;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;

use chrono::DateTime;
use fst::map::OpBuilder;
use fst::Streamer;
use libsql_sys::name::NamespaceName;
use libsql_sys::rusqlite::OptionalExtension;
use libsql_sys::rusqlite::{self, TransactionBehavior};
use tempfile::tempdir;
use tokio_stream::StreamExt;
use uuid::Uuid;
use zerocopy::AsBytes;

use crate::io::buf::ZeroCopyBuf;
use crate::io::FileExt;
use crate::segment::compacted::CompactedSegment;
use crate::segment::compacted::CompactedSegmentDataFooter;
use crate::segment::compacted::CompactedSegmentDataHeader;
use crate::segment::Frame;
use crate::storage::backend::SegmentMeta;
use crate::LIBSQL_MAGIC;
use crate::LIBSQL_PAGE_SIZE;
use crate::LIBSQL_WAL_VERSION;

use super::backend::Backend;
use super::{SegmentInfo, SegmentKey};

pub mod strategy;

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("error reading from meta db: {0}")]
    Meta(#[from] rusqlite::Error),
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    #[error("storage error: {0}")]
    Storage(#[from] crate::storage::Error),
}

pub struct Compactor<B> {
    backend: Arc<B>,
    meta: rusqlite::Connection,
    path: PathBuf,
}

impl<B> Compactor<B> {
    pub fn new(backend: Arc<B>, compactor_path: &Path) -> Result<Self> {
        let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?;
        // todo! also enable foreign key checks
        meta.pragma_update(None, "journal_mode", "wal")?;
        meta.execute(
            r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        namespace_name BLOB NOT NULL,
                        UNIQUE(namespace_name))
                        "#,
            (),
        )?;
        meta.execute(
            r#"CREATE TABLE IF NOT EXISTS segments (
                        start_frame_no INTEGER,
                        end_frame_no INTEGER,
                        timestamp DATE,
                        size INTEGER,
                        namespace_id INTEGER REFERENCES monitored_namespaces(id) ON DELETE CASCADE,
                        PRIMARY KEY (start_frame_no, end_frame_no))
                        "#,
            (),
        )?;

        Ok(Self {
            backend,
            meta,
            path: compactor_path.into(),
        })
    }

    pub async fn monitor(&mut self, namespace: &NamespaceName) -> Result<()>
    where
        B: Backend,
    {
        let tx = self.meta.transaction()?;
        let id = {
            let mut stmt = tx.prepare_cached(
                "INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id",
            )?;
            // RETURNING only yields a row when the namespace was actually
            // inserted; an already-monitored namespace returns None.
            stmt.query_row([namespace.as_str()], |r| r.get(0))
                .optional()?
        };

        if let Some(id) = id {
            sync_one(self.backend.as_ref(), namespace, id, &tx, true).await?;
        }

        tx.commit()?;

        Ok(())
    }

    pub fn analyze(&self, namespace: &NamespaceName) -> Result<AnalyzedSegments> {
        let mut stmt = self.meta.prepare_cached(
            r#"
        SELECT start_frame_no, end_frame_no, timestamp
        FROM segments as s
        JOIN monitored_namespaces as m
        ON m.id = s.namespace_id
        WHERE m.namespace_name = ?"#,
        )?;
        let mut rows = stmt.query([namespace.as_str()])?;
        let mut graph = petgraph::graphmap::DiGraphMap::new();
        let mut last_frame_no = 0;
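        // Each segment contributes an edge `start -> end` weighted by its
        // timestamp, and a zero-weighted gap edge links `start - 1` to `start`
        // so that adjacent segments are connected. E.g. segments (1, 100) and
        // (101, 200) yield the edges 1 -> 100 (ts1), 100 -> 101 (0), and
        // 101 -> 200 (ts2).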
        while let Some(row) = rows.next()? {
            let start_frame_no: u64 = row.get(0)?;
            let end_frame_no: u64 = row.get(1)?;
            let timestamp: u64 = row.get(2)?;
            graph.add_edge(start_frame_no, end_frame_no, timestamp);
            if start_frame_no != 1 {
                graph.add_edge(start_frame_no - 1, start_frame_no, 0);
            }
            last_frame_no = last_frame_no.max(end_frame_no);
        }

        Ok(AnalyzedSegments {
            graph,
            last_frame_no,
            namespace: namespace.clone(),
        })
    }

    pub fn get_segment_range(
        &self,
        namespace: &NamespaceName,
    ) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
        segments_range(&self.meta, namespace)
    }

    /// Polls storage for new frames since the last sync
    #[tracing::instrument(skip(self))]
    async fn sync_latest(&self) -> Result<()>
    where
        B: Backend,
    {
        // let tx = self.meta.transaction()?;
        // let stream = self.storage.list_segments();

        Ok(())
    }

    /// Syncs all segments from storage into the local cache
    pub async fn sync_all(&mut self, full: bool) -> Result<()>
    where
        B: Backend,
    {
        let tx = self
            .meta
            .transaction_with_behavior(TransactionBehavior::Immediate)?;
        {
            let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?;
            let mut namespace_rows = stmt.query(())?;
            while let Some(row) = namespace_rows.next()? {
                let namespace = NamespaceName::from_string(row.get::<_, String>(0)?);
                let id = row.get::<_, u64>(1)?;
                sync_one(self.backend.as_ref(), &namespace, id, &tx, full).await?;
            }
        }

        tx.commit()?;

        Ok(())
    }

    pub async fn sync_one(&mut self, namespace: &NamespaceName, full: bool) -> Result<()>
    where
        B: Backend,
    {
        let tx = self
            .meta
            .transaction_with_behavior(TransactionBehavior::Immediate)?;
        {
            let mut stmt =
                tx.prepare_cached("SELECT id FROM monitored_namespaces WHERE namespace_name = ?")?;
            let id = stmt
                .query_row([namespace.as_str()], |row| row.get(0))
                .optional()?;
            if let Some(id) = id {
                sync_one(self.backend.as_ref(), namespace, id, &tx, full).await?;
            }
        }

        tx.commit()?;

        Ok(())
    }

    async fn fetch(
        &self,
        set: &SegmentSet,
        into: &Path,
    ) -> Result<(
        Vec<CompactedSegment<std::fs::File>>,
        Vec<fst::Map<Arc<[u8]>>>,
    )>
    where
        B: Backend,
    {
        let mut indexes = Vec::with_capacity(set.len());
        let mut segments = Vec::with_capacity(set.len());
        for key in set.iter() {
            let file = std::fs::File::options()
                .create_new(true)
                .write(true)
                .read(true)
                .open(into.join(format!("{key}.data")))?;
            let header = self
                .backend
                .fetch_segment_data_to_file(
                    &self.backend.default_config(),
                    &set.namespace,
                    key,
                    &file,
                )
                .await
                .unwrap();
            let index = self
                .backend
                .fetch_segment_index(&self.backend.default_config(), &set.namespace, key)
                .await
                .unwrap();
            indexes.push(index);
            segments.push(CompactedSegment::from_parts(file, header));
        }

        Ok((segments, indexes))
    }

    pub async fn compact(&self, set: SegmentSet) -> Result<()>
    where
        B: Backend,
    {
        assert!(!set.is_empty());
        let tmp = tempdir()?;
        // FIXME: brute force: we don't necessarily need to download all the segments to cover
        // all the changes. Iterating backward over the set items and filling the gaps in the
        // page ranges would, in theory, be sufficient.
        // Another alternative is to download all the indexes, and lazily download the segment data.
        // TODO: fetch concurrently
        let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
        let last_header = segments.last().expect("non-empty set").header();

        // It's unfortunate that we need to know the number of frames in the final segment ahead of
        // time, but it's necessary for computing the checksum. This seems like the least costly
        // method (versus recomputing the whole checksum).
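        // The union of all the indexes yields each distinct page number exactly
        // once, so the number of keys streamed here equals the frame count of
        // the compacted segment.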
        let mut union = OpBuilder::from_iter(indexes.iter()).union();
        let mut count = 0;
        while union.next().is_some() {
            count += 1;
        }

        let mut hasher = crc32fast::Hasher::new();

        let out_file = std::fs::File::options()
            .create_new(true)
            .write(true)
            .read(true)
            .open(tmp.path().join("out"))?;
        let header = CompactedSegmentDataHeader {
            frame_count: (count as u32).into(),
            segment_id: Uuid::new_v4().to_u128_le().into(),
            start_frame_no: set.range().expect("non-empty set").0.into(),
            end_frame_no: set.range().expect("non-empty set").1.into(),
            size_after: last_header.size_after,
            version: LIBSQL_WAL_VERSION.into(),
            magic: LIBSQL_MAGIC.into(),
            page_size: last_header.page_size,
            // the new compacted segment inherits the last segment's timestamp: it
            // contains the same logical data.
            timestamp: last_header.timestamp,
        };

        hasher.update(header.as_bytes());
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(header), 0)
            .await;
        ret?;

        let mut union = OpBuilder::from_iter(indexes.iter()).union();
        let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
        let mut out_index = fst::MapBuilder::memory();
        let mut current_offset = 0;

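        // Compacted segment layout: [header][frame 0]...[frame count - 1][footer].
        // The fst union streams keys in ascending byte order, i.e. ascending
        // page_no, so frames end up sorted by page number.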
        while let Some((page_no_bytes, indexed_offsets)) = union.next() {
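            // `set` is ordered oldest to newest, so the highest operand index
            // holds the most recent version of this page; that's the copy we keep.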
            let (index, offset) = indexed_offsets
                .iter()
                .max_by_key(|v| v.index)
                .map(|v| (v.index, v.value))
                .expect("union returned something, must be non-empty");
            let segment = &segments[index];
            let (frame, ret) = segment.read_frame(buffer, offset as u32).await;
            ret?;
            hasher.update(frame.get_ref().as_bytes());
            let dest_offset =
                size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
            let (mut frame, ret) = out_file.write_all_at_async(frame, dest_offset as u64).await;
            ret?;
            out_index
                .insert(page_no_bytes, current_offset as _)
                .unwrap();
            current_offset += 1;
            frame.deinit();
            buffer = frame;
        }

        let footer = CompactedSegmentDataFooter {
            checksum: hasher.finalize().into(),
        };

        let footer_offset =
            size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
        let (_, ret) = out_file
            .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
            .await;
        ret?;

        let (start, end) = set.range().expect("non-empty set");
        let timestamp = DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _)
            .unwrap()
            .to_utc();
        self.backend
            .store(
                &self.backend.default_config(),
                SegmentMeta {
                    namespace: set.namespace.clone(),
                    segment_id: Uuid::new_v4(),
                    start_frame_no: start,
                    end_frame_no: end,
                    segment_timestamp: timestamp,
                },
                out_file,
                out_index.into_inner().unwrap(),
            )
            .await?;

        Ok(())
    }

    /// Restores a database file from a segment set.
    /// The set must start at frame_no 1.
    pub async fn restore(&self, set: SegmentSet, to: impl AsRef<Path>) -> Result<()>
    where
        B: Backend,
    {
        if set.is_empty() {
            return Ok(());
        }
        assert_eq!(set.range().unwrap().0, 1);
        let tmp = tempdir()?;
        let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
        let mut union = OpBuilder::from_iter(indexes.iter()).union();
        let mut buffer = Vec::with_capacity(LIBSQL_PAGE_SIZE as usize);
        let out_file = std::fs::File::create(to)?;

        while let Some((page_no_bytes, indexed_offsets)) = union.next() {
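            // Pages land at their absolute position in the database file:
            // page_no N maps to byte offset (N - 1) * LIBSQL_PAGE_SIZE.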
            let page_no = u32::from_be_bytes(page_no_bytes.try_into().unwrap());
            let (index, offset) = indexed_offsets
                .iter()
                .max_by_key(|v| v.index)
                .map(|v| (v.index, v.value as u32))
                .expect("union returned something, must be non-empty");
            let segment = &segments[index];
            let (b, ret) = segment.read_page(buffer, offset).await;
            ret?;
            let offset = (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64;
            let (mut b, ret) = out_file.write_all_at_async(b, offset).await;
            ret?;
            b.clear();
            buffer = b;
        }

        Ok(())
    }

    pub fn list_all_segments(
        &self,
        namespace: &NamespaceName,
        f: impl FnMut(SegmentInfo),
    ) -> Result<()> {
        list_segments(&self.meta, namespace, f)
    }

    pub fn list_monitored_namespaces(&self, f: impl FnMut(NamespaceName)) -> Result<()> {
        list_namespace(&self.meta, f)
    }

    pub fn unmonitor(&self, ns: &NamespaceName) -> Result<()> {
        unmonitor(&self.meta, ns)
    }
}

pub struct AnalyzedSegments {
    graph: petgraph::graphmap::DiGraphMap<u64, u64>,
    last_frame_no: u64,
    namespace: NamespaceName,
}

impl AnalyzedSegments {
    /// Returns a set of segment keys covering frames 1 to `last_frame_no` with the fewest segments.
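    ///
    /// For example, given segments covering frames `1..=100`, `1..=50`, and
    /// `51..=100`, the path `1 -> 100` is picked over `1 -> 50 -> 51 -> 100`:
    /// every gap edge crossed costs 1, and each one stands for an extra segment
    /// to fetch.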
    pub fn shortest_restore_path(&self) -> SegmentSet {
        if self.graph.node_count() == 0 {
            return SegmentSet {
                namespace: self.namespace.clone(),
                segments: Vec::new(),
            };
        }

        let path = petgraph::algo::astar(
            &self.graph,
            1,
            |n| n == self.last_frame_no,
            // It's always free to traverse a segment from one end to the other,
            // while fetching a new segment costs us: segment edges carry the
            // segment timestamp as their weight, and the zero-weighted gap edges
            // between segments are the ones billed a cost of 1.
            |(_, _, &x)| if x == 0 { 1 } else { 0 },
            |n| self.last_frame_no - n,
        );
        let mut segments = Vec::new();
        if let Some((_len, nodes)) = path {
            for chunk in nodes.chunks(2) {
                let start_frame_no = chunk[0];
                let end_frame_no = chunk[1];
                let timestamp = *self
                    .graph
                    .edges(start_frame_no)
                    .find_map(|(_, to, ts)| (to == end_frame_no).then_some(ts))
                    .unwrap();
                let key = SegmentKey {
                    start_frame_no,
                    end_frame_no,
                    timestamp,
                };
                segments.push(key);
            }
        }
        SegmentSet {
            segments,
            namespace: self.namespace.clone(),
        }
    }

    pub fn last_frame_no(&self) -> u64 {
        self.last_frame_no
    }

    pub fn segment_count(&self) -> usize {
        self.graph.node_count() / 2
    }
}

/// A set of segments, with the guarantee that segments are non-overlapping and
/// increasing in frame number
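/// E.g. keys covering frames `1..=100` and `101..=250` form a valid set, with
/// `range() == Some((1, 250))`.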
#[derive(Clone)]
pub struct SegmentSet {
    namespace: NamespaceName,
    segments: Vec<SegmentKey>,
}

impl SegmentSet {
    pub fn range(&self) -> Option<(u64, u64)> {
        self.segments
            .first()
            .zip(self.segments.last())
            .map(|(f, l)| (f.start_frame_no, l.end_frame_no))
    }
}

impl Deref for SegmentSet {
    type Target = [SegmentKey];

    fn deref(&self) -> &Self::Target {
        &self.segments
    }
}

async fn sync_one<B: Backend>(
    backend: &B,
    namespace: &NamespaceName,
    id: u64,
    conn: &rusqlite::Connection,
    full: bool,
) -> Result<()> {
    // On a full sync we list everything; otherwise we stop as soon as we reach
    // segments that are already registered locally.
    let until = if full {
        None
    } else {
        get_last_frame_no(conn, id)?
    };

    let segs = backend.list_segments(backend.default_config(), namespace, 0);
    tokio::pin!(segs);

    while let Some(info) = segs.next().await {
        let info = info.unwrap();
        register_segment_info(conn, &info, id)?;
        if let Some(until) = until {
            if info.key.start_frame_no <= until {
                break;
            }
        }
    }

    Ok(())
}

fn list_segments(
    conn: &rusqlite::Connection,
    namespace: &NamespaceName,
    mut f: impl FnMut(SegmentInfo),
) -> Result<()> {
    let mut stmt = conn.prepare_cached(
        r#"
    SELECT timestamp, size, start_frame_no, end_frame_no
    FROM segments as s
    JOIN monitored_namespaces as m
    ON m.id = s.namespace_id
    WHERE m.namespace_name = ?
    ORDER BY end_frame_no, start_frame_no
    "#,
    )?;

    let iter = stmt.query_map([namespace.as_str()], |r| {
        Ok(SegmentInfo {
            key: SegmentKey {
                start_frame_no: r.get(2)?,
                end_frame_no: r.get(3)?,
                timestamp: r.get(0)?,
            },
            size: r.get(1)?,
        })
    })?;

    for info in iter {
        let info = info?;
        f(info);
    }

    Ok(())
}

fn list_namespace(
    conn: &rusqlite::Connection,
    mut f: impl FnMut(NamespaceName),
) -> Result<()> {
    let mut stmt = conn.prepare_cached(r#"SELECT namespace_name FROM monitored_namespaces"#)?;

    stmt.query_map((), |r| {
        let n = NamespaceName::from_string(r.get(0)?);
        f(n);
        Ok(())
    })?
    .try_for_each(|c| c)?;

    Ok(())
}

fn register_segment_info(
    conn: &rusqlite::Connection,
    info: &SegmentInfo,
    namespace_id: u64,
) -> Result<()> {
    let mut stmt = conn.prepare_cached(
        r#"
    INSERT OR IGNORE INTO segments (
        start_frame_no,
        end_frame_no,
        timestamp,
        size,
        namespace_id
    )
    VALUES (?, ?, ?, ?, ?)"#,
    )?;
    stmt.execute((
        info.key.start_frame_no,
        info.key.end_frame_no,
        info.key.timestamp,
        info.size,
        namespace_id,
    ))?;
    Ok(())
}

fn segments_range(
    conn: &rusqlite::Connection,
    namespace: &NamespaceName,
) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
    let mut stmt = conn.prepare_cached(
        r#"
    SELECT min(timestamp), size, start_frame_no, end_frame_no
    FROM segments as s
    JOIN monitored_namespaces as m
    ON m.id = s.namespace_id
    WHERE m.namespace_name = ?
    LIMIT 1
"#,
    )?;
    let first = stmt
        .query_row([namespace.as_str()], |r| {
            Ok(SegmentInfo {
                key: SegmentKey {
                    start_frame_no: r.get(2)?,
                    end_frame_no: r.get(3)?,
                    timestamp: r.get(0)?,
                },
                size: r.get(1)?,
            })
        })
        .optional()?;

    let mut stmt = conn.prepare_cached(
        r#"
    SELECT max(timestamp), size, start_frame_no, end_frame_no
    FROM segments as s
    JOIN monitored_namespaces as m
    ON m.id = s.namespace_id
    WHERE m.namespace_name = ?
    LIMIT 1
"#,
    )?;
    let last = stmt
        .query_row([namespace.as_str()], |r| {
            Ok(SegmentInfo {
                key: SegmentKey {
                    start_frame_no: r.get(2)?,
                    end_frame_no: r.get(3)?,
                    timestamp: r.get(0)?,
                },
                size: r.get(1)?,
            })
        })
        .optional()?;

    Ok(first.zip(last))
}

fn get_last_frame_no(conn: &rusqlite::Connection, namespace_id: u64) -> Result<Option<u64>> {
    let mut stmt =
        conn.prepare_cached("SELECT MAX(end_frame_no) FROM segments WHERE namespace_id = ?")?;
    Ok(stmt.query_row([namespace_id], |row| row.get(0))?)
}

fn unmonitor(conn: &rusqlite::Connection, namespace: &NamespaceName) -> Result<()> {
    conn.execute(
        "DELETE FROM monitored_namespaces WHERE namespace_name = ?",
        [namespace.as_str()],
    )?;
    Ok(())
}