1use std::ops::Deref;
2use std::path::Path;
3use std::path::PathBuf;
4use std::sync::Arc;
5
6use chrono::DateTime;
7use fst::map::OpBuilder;
8use fst::Streamer;
9use libsql_sys::name::NamespaceName;
10use libsql_sys::rusqlite::OptionalExtension;
11use libsql_sys::rusqlite::{self, TransactionBehavior};
12use tempfile::tempdir;
13use tokio_stream::StreamExt;
14use uuid::Uuid;
15use zerocopy::AsBytes;
16
17use crate::io::buf::ZeroCopyBuf;
18use crate::io::FileExt;
19use crate::segment::compacted::CompactedSegment;
20use crate::segment::compacted::CompactedSegmentDataFooter;
21use crate::segment::compacted::CompactedSegmentDataHeader;
22use crate::segment::Frame;
23use crate::storage::backend::SegmentMeta;
24use crate::LIBSQL_MAGIC;
25use crate::LIBSQL_PAGE_SIZE;
26use crate::LIBSQL_WAL_VERSION;
27
28use super::backend::Backend;
29use super::{SegmentInfo, SegmentKey};
30
31pub mod strategy;
32
/// Module-local result alias: defaults the error type to this module's [`Error`].
type Result<T, E = Error> = std::result::Result<T, E>;
34
/// Errors returned by the compactor.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Failure reading from / writing to the local SQLite meta database.
    #[error("error reading from meta db: {0}")]
    Meta(#[from] rusqlite::Error),
    /// Local filesystem failure (segment files, temp dirs, ...).
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// Failure reported by the storage backend.
    #[error("storage error: {0}")]
    Storage(#[from] crate::storage::Error),
}
44
/// Compacts and restores stored WAL segments for a set of monitored namespaces.
///
/// Bookkeeping lives in a small SQLite "meta" database stored under `path`;
/// segment data itself is fetched from / stored to the `backend`.
pub struct Compactor<B> {
    // Storage backend used to list, fetch and store segments.
    backend: Arc<B>,
    // Connection to the local meta database (`meta.db` under `path`).
    meta: rusqlite::Connection,
    // Directory holding the compactor's local state.
    path: PathBuf,
}
50
51impl<B> Compactor<B> {
52 pub fn new(backend: Arc<B>, compactor_path: &Path) -> Result<Self> {
53 let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?;
54 meta.pragma_update(None, "journal_mode", "wal")?;
56 meta.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap();
57 meta.execute(
58 r#"CREATE TABLE IF NOT EXISTS segments (
59 start_frame_no INTEGER,
60 end_frame_no INTEGER,
61 timestamp DATE,
62 size INTEGER,
63 namespace_id INTEGER REFERENCES monitored_namespaces(id) ON DELETE CASCADE,
64 PRIMARY KEY (start_frame_no, end_frame_no))
65 "#,
66 (),
67 )?;
68
69 Ok(Self {
70 backend,
71 meta,
72 path: compactor_path.into(),
73 })
74 }
75
    /// Starts monitoring `namespace`: registers it in the meta db and runs an
    /// initial sync of its segment list from the backend.
    pub async fn monitor(&mut self, namespace: &NamespaceName) -> Result<()>
    where
        B: Backend,
    {
        let tx = self.meta.transaction()?;
        let id = {
            // With INSERT OR IGNORE, RETURNING yields no row when the
            // namespace is already registered, so `id` is None and the
            // initial sync below is skipped — presumably intentional, since
            // an already-monitored namespace has been synced before.
            let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id")?;
            stmt.query_row([namespace.as_str()], |r| r.get(0))
                .optional()?
        };

        if let Some(id) = id {
            sync_one(self.backend.as_ref(), namespace, id, &tx, true).await?;
        }

        tx.commit()?;

        Ok(())
    }
95
    /// Builds a graph of the segments recorded for `namespace`, from which a
    /// restore path can later be computed (see
    /// [`AnalyzedSegments::shortest_restore_path`]).
    ///
    /// Each segment becomes an edge `start_frame_no -> end_frame_no` weighted
    /// by its timestamp; a synthetic zero-weight edge `start-1 -> start`
    /// links the segment to whatever segment ends right before it.
    pub fn analyze(&self, namespace: &NamespaceName) -> Result<AnalyzedSegments> {
        let mut stmt = self.meta.prepare_cached(
            r#"
            SELECT start_frame_no, end_frame_no, timestamp
            FROM segments as s
            JOIN monitored_namespaces as m
            ON m.id = s.namespace_id
            WHERE m.namespace_name = ?"#,
        )?;
        let mut rows = stmt.query([namespace.as_str()])?;
        let mut graph = petgraph::graphmap::DiGraphMap::new();
        let mut last_frame_no = 0;
        while let Some(row) = rows.next()? {
            let start_frame_no: u64 = row.get(0)?;
            let end_frame_no: u64 = row.get(1)?;
            let timestamp: u64 = row.get(2)?;
            // Real segment edge, weighted by its timestamp.
            graph.add_edge(start_frame_no, end_frame_no, timestamp);
            if start_frame_no != 1 {
                // Synthetic linking edge (weight 0) so a path can hop from
                // the previous segment's end to this segment's start.
                graph.add_edge(start_frame_no - 1, start_frame_no, 0);
            }
            // Track the highest frame seen; this is the restore target.
            last_frame_no = last_frame_no.max(end_frame_no);
        }

        Ok(AnalyzedSegments {
            graph,
            last_frame_no,
            namespace: namespace.clone(),
        })
    }
125
126 pub fn get_segment_range(
127 &self,
128 namespace: &NamespaceName,
129 ) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
130 segments_range(&self.meta, namespace)
131 }
132
    /// Syncs only the most recent segments from the backend.
    ///
    /// NOTE(review): currently a stub — it performs no work and always
    /// returns `Ok(())`.
    #[tracing::instrument(skip(self))]
    async fn sync_latest(&self) -> Result<()>
    where
        B: Backend,
    {
        Ok(())
    }
144
    /// Syncs the segment list of every monitored namespace from the backend
    /// into the meta db.
    ///
    /// `full` is forwarded to [`sync_one`] for each namespace.
    pub async fn sync_all(&mut self, full: bool) -> Result<()>
    where
        B: Backend,
    {
        // Immediate transaction: acquire the write lock up front, since
        // segment rows are inserted while iterating below.
        let tx = self
            .meta
            .transaction_with_behavior(TransactionBehavior::Immediate)?;
        {
            let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?;
            let mut namespace_rows = stmt.query(())?;
            while let Some(row) = namespace_rows.next()? {
                let namespace = NamespaceName::from_string(row.get::<_, String>(0)?);
                let id = row.get::<_, u64>(1)?;
                sync_one(self.backend.as_ref(), &namespace, id, &tx, full).await?;
            }
        }

        tx.commit()?;

        Ok(())
    }
167
168 pub async fn sync_one(&mut self, namespace: &NamespaceName, full: bool) -> Result<()>
169 where
170 B: Backend,
171 {
172 let tx = self
173 .meta
174 .transaction_with_behavior(TransactionBehavior::Immediate)?;
175 {
176 let mut stmt =
177 tx.prepare_cached("SELECT id FROM monitored_namespaces WHERE namespace_name = ?")?;
178 let id = stmt
179 .query_row([namespace.as_str()], |row| row.get(0))
180 .optional()?;
181 if let Some(id) = id {
182 sync_one(self.backend.as_ref(), &namespace, id, &tx, full).await?;
183 }
184 }
185
186 tx.commit()?;
187
188 Ok(())
189 }
190
191 async fn fetch(
192 &self,
193 set: &SegmentSet,
194 into: &Path,
195 ) -> Result<(
196 Vec<CompactedSegment<std::fs::File>>,
197 Vec<fst::Map<Arc<[u8]>>>,
198 )>
199 where
200 B: Backend,
201 {
202 let mut indexes = Vec::with_capacity(set.len());
203 let mut segments = Vec::with_capacity(set.len());
204 for key in set.iter() {
205 let file = std::fs::File::options()
206 .create_new(true)
207 .write(true)
208 .read(true)
209 .open(into.join(&format!("{key}.data")))
210 .unwrap();
211 let header = self
212 .backend
213 .fetch_segment_data_to_file(
214 &self.backend.default_config(),
215 &set.namespace,
216 key,
217 &file,
218 )
219 .await
220 .unwrap();
221 let index = self
222 .backend
223 .fetch_segment_index(&self.backend.default_config(), &set.namespace, key)
224 .await
225 .unwrap();
226 indexes.push(index);
227 segments.push(CompactedSegment::from_parts(file, header));
228 }
229
230 Ok((segments, indexes))
231 }
232
233 pub async fn compact(&self, set: SegmentSet) -> Result<()>
234 where
235 B: Backend,
236 {
237 assert!(!set.is_empty());
238 let tmp = tempdir().unwrap();
239 let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
245 let last_header = segments.last().expect("non-empty set").header();
246
247 let mut union = OpBuilder::from_iter(indexes.iter()).union();
251 let mut count = 0;
252 while let Some(_) = union.next() {
253 count += 1;
254 }
255
256 let mut hasher = crc32fast::Hasher::new();
257
258 let out_file = std::fs::File::options()
259 .create_new(true)
260 .write(true)
261 .read(true)
262 .open(tmp.path().join("out"))
263 .unwrap();
264 let header = CompactedSegmentDataHeader {
265 frame_count: (count as u32).into(),
266 segment_id: Uuid::new_v4().to_u128_le().into(),
267 start_frame_no: set.range().expect("non-empty set").0.into(),
268 end_frame_no: set.range().expect("non-empty set").1.into(),
269 size_after: last_header.size_after,
270 version: LIBSQL_WAL_VERSION.into(),
271 magic: LIBSQL_MAGIC.into(),
272 page_size: last_header.page_size,
273 timestamp: last_header.timestamp,
276 };
277
278 hasher.update(header.as_bytes());
279 let (_, ret) = out_file
280 .write_all_at_async(ZeroCopyBuf::new_init(header), 0)
281 .await;
282 ret?;
283
284 let mut union = OpBuilder::from_iter(indexes.iter()).union();
285 let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
286 let mut out_index = fst::MapBuilder::memory();
287 let mut current_offset = 0;
288
289 while let Some((page_no_bytes, indexed_offsets)) = union.next() {
290 let (index, offset) = indexed_offsets
291 .iter()
292 .max_by_key(|v| v.index)
293 .map(|v| (v.index, v.value))
294 .expect("union returned something, must be non-empty");
295 let segment = &segments[index];
296 let (frame, ret) = segment.read_frame(buffer, offset as u32).await;
297 ret?;
298 hasher.update(&frame.get_ref().as_bytes());
299 let dest_offset =
300 size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
301 let (mut frame, ret) = out_file.write_all_at_async(frame, dest_offset as u64).await;
302 ret?;
303 out_index
304 .insert(page_no_bytes, current_offset as _)
305 .unwrap();
306 current_offset += 1;
307 frame.deinit();
308 buffer = frame;
309 }
310
311 let footer = CompactedSegmentDataFooter {
312 checksum: hasher.finalize().into(),
313 };
314
315 let footer_offset =
316 size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
317 let (_, ret) = out_file
318 .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
319 .await;
320 ret?;
321
322 let (start, end) = set.range().expect("non-empty set");
323 let timestamp = DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _)
324 .unwrap()
325 .to_utc();
326 self.backend
327 .store(
328 &self.backend.default_config(),
329 SegmentMeta {
330 namespace: set.namespace.clone(),
331 segment_id: Uuid::new_v4(),
332 start_frame_no: start,
333 end_frame_no: end,
334 segment_timestamp: timestamp,
335 },
336 out_file,
337 out_index.into_inner().unwrap(),
338 )
339 .await?;
340
341 Ok(())
342 }
343
    /// Reconstructs a database file at `to` from the segments in `set`.
    ///
    /// For each page, the most recent version found across the set (highest
    /// segment index) is written at the page's absolute offset in the output
    /// file. An empty set is a no-op.
    ///
    /// # Panics
    /// Panics if the (non-empty) set does not start at frame no 1, i.e. is
    /// not a complete restore path.
    pub async fn restore(&self, set: SegmentSet, to: impl AsRef<Path>) -> Result<()>
    where
        B: Backend,
    {
        if set.is_empty() {
            return Ok(());
        }
        assert_eq!(set.range().unwrap().0, 1);
        let tmp = tempdir()?;
        let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
        let mut union = OpBuilder::from_iter(indexes.iter()).union();
        let mut buffer = Vec::with_capacity(LIBSQL_PAGE_SIZE as usize);
        let out_file = std::fs::File::create(to)?;

        while let Some((page_no_bytes, indexed_offsets)) = union.next() {
            // Index keys are big-endian page numbers.
            let page_no = u32::from_be_bytes(page_no_bytes.try_into().unwrap());
            // Most recent version of the page wins.
            let (index, offset) = indexed_offsets
                .iter()
                .max_by_key(|v| v.index)
                .map(|v| (v.index, v.value as u32))
                .expect("union returned something, must be non-empty");
            let segment = &segments[index];
            let (b, ret) = segment.read_page(buffer, offset).await;
            ret?;
            // Pages are 1-indexed; write at the page's absolute file offset.
            let offset = (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64;
            let (mut b, ret) = out_file.write_all_at_async(b, offset).await;
            ret?;
            // Recycle the page buffer for the next iteration.
            b.clear();
            buffer = b;
        }

        Ok(())
    }
379
380 pub fn list_all_segments(
381 &self,
382 namespace: &NamespaceName,
383 f: impl FnMut(SegmentInfo),
384 ) -> Result<()> {
385 list_segments(&self.meta, namespace, f)
386 }
387
388 pub fn list_monitored_namespaces(&self, f: impl FnMut(NamespaceName)) -> Result<()> {
389 list_namespace(&self.meta, f)
390 }
391
392 pub fn unmonitor(&self, ns: &NamespaceName) -> Result<()> {
393 unmonitor(&self.meta, ns)
394 }
395}
396
/// Segment graph for one namespace, produced by [`Compactor::analyze`].
pub struct AnalyzedSegments {
    // Edges start_frame_no -> end_frame_no weighted by timestamp; weight-0
    // edges are synthetic links between adjacent segments.
    graph: petgraph::graphmap::DiGraphMap<u64, u64>,
    // Highest end_frame_no seen across all segments.
    last_frame_no: u64,
    namespace: NamespaceName,
}
402
impl AnalyzedSegments {
    /// Computes a set of segments that restores the database up to
    /// `last_frame_no`, using A* over the segment graph.
    ///
    /// Zero-weight (synthetic link) edges cost 1 and segment edges cost 0,
    /// so the search favors paths crossing as few segment boundaries as
    /// possible. Assumes a real segment's timestamp weight is never 0.
    pub fn shortest_restore_path(&self) -> SegmentSet {
        // No segments recorded: empty restore path.
        if self.graph.node_count() == 0 {
            return SegmentSet {
                namespace: self.namespace.clone(),
                segments: Vec::new(),
            };
        }

        let path = petgraph::algo::astar(
            &self.graph,
            // Restore paths always start at frame 1.
            1,
            |n| n == self.last_frame_no,
            |(_, _, &x)| if x == 0 { 1 } else { 0 },
            // NOTE(review): this heuristic can exceed the true remaining cost
            // (which only counts link edges), making it inadmissible — A*
            // may then return a non-minimal path. Confirm whether strict
            // optimality matters here.
            |n| self.last_frame_no - n,
        );
        let mut segments = Vec::new();
        match path {
            Some((_len, nodes)) => {
                // Path nodes alternate [start1, end1, start2, end2, ...]:
                // each chunk of 2 is one segment's frame range.
                for chunk in nodes.chunks(2) {
                    let start_frame_no = chunk[0];
                    let end_frame_no = chunk[1];
                    // Recover the segment's timestamp from its edge weight.
                    let timestamp = *self
                        .graph
                        .edges(start_frame_no)
                        .find_map(|(_, to, ts)| (to == end_frame_no).then_some(ts))
                        .unwrap();
                    let key = SegmentKey {
                        start_frame_no,
                        end_frame_no,
                        timestamp,
                    };
                    segments.push(key);
                }
            }
            // Unreachable target: return an empty set.
            None => (),
        }
        SegmentSet {
            segments,
            namespace: self.namespace.clone(),
        }
    }

    /// Highest frame number across all recorded segments.
    pub fn last_frame_no(&self) -> u64 {
        self.last_frame_no
    }

    /// Number of segments in the graph.
    ///
    /// NOTE(review): assumes each segment contributes exactly two distinct
    /// nodes; overlapping frame boundaries would make this an estimate.
    pub fn segment_count(&self) -> usize {
        self.graph.node_count() / 2
    }
}
458
/// An ordered list of segment keys for one namespace, typically forming a
/// restore path (see [`AnalyzedSegments::shortest_restore_path`]).
#[derive(Clone)]
pub struct SegmentSet {
    namespace: NamespaceName,
    segments: Vec<SegmentKey>,
}
466
467impl SegmentSet {
468 pub fn range(&self) -> Option<(u64, u64)> {
469 self.segments
470 .first()
471 .zip(self.segments.last())
472 .map(|(f, l)| (f.start_frame_no, l.end_frame_no))
473 }
474}
475
476impl Deref for SegmentSet {
477 type Target = [SegmentKey];
478
479 fn deref(&self) -> &Self::Target {
480 &self.segments
481 }
482}
483
484async fn sync_one<B: Backend>(
485 backend: &B,
486 namespace: &NamespaceName,
487 id: u64,
488 conn: &rusqlite::Connection,
489 full: bool,
490) -> Result<()> {
491 let until = if full {
492 get_last_frame_no(conn, id)?
493 } else {
494 None
495 };
496
497 let segs = backend.list_segments(backend.default_config(), &namespace, 0);
498 tokio::pin!(segs);
499
500 while let Some(info) = segs.next().await {
501 let info = info.unwrap();
502 register_segment_info(&conn, &info, id)?;
503 if let Some(until) = until {
504 if info.key.start_frame_no <= until {
505 break;
506 }
507 }
508 }
509
510 Ok(())
511}
512
513fn list_segments<'a>(
514 conn: &'a rusqlite::Connection,
515 namespace: &'a NamespaceName,
516 mut f: impl FnMut(SegmentInfo),
517) -> Result<()> {
518 let mut stmt = conn.prepare_cached(
519 r#"
520 SELECT timestamp, size, start_frame_no, end_frame_no
521 FROM segments as s
522 JOIN monitored_namespaces as m
523 ON m.id == s.namespace_id
524 WHERE m.namespace_name = ?
525 ORDER BY end_frame_no, start_frame_no
526 "#,
527 )?;
528
529 let iter = stmt.query_map([namespace.as_str()], |r| {
530 Ok(SegmentInfo {
531 key: SegmentKey {
532 start_frame_no: r.get(2)?,
533 end_frame_no: r.get(3)?,
534 timestamp: r.get(0)?,
535 },
536 size: r.get(1)?,
537 })
538 })?;
539
540 for info in iter {
541 let info = info?;
542 f(info);
543 }
544
545 Ok(())
546}
547
548fn list_namespace<'a>(
549 conn: &'a rusqlite::Connection,
550 mut f: impl FnMut(NamespaceName),
551) -> Result<()> {
552 let mut stmt = conn.prepare_cached(r#"SELECT namespace_name FROM monitored_namespaces"#)?;
553
554 stmt.query_map((), |r| {
555 let n = NamespaceName::from_string(r.get(0)?);
556 f(n);
557 Ok(())
558 })?
559 .try_for_each(|c| c)?;
560
561 Ok(())
562}
563
564fn register_segment_info(
565 conn: &rusqlite::Connection,
566 info: &SegmentInfo,
567 namespace_id: u64,
568) -> Result<()> {
569 let mut stmt = conn.prepare_cached(
570 r#"
571 INSERT OR IGNORE INTO segments (
572 start_frame_no,
573 end_frame_no,
574 timestamp,
575 size,
576 namespace_id
577 )
578 VALUES (?, ?, ?, ?, ?)"#,
579 )?;
580 stmt.execute((
581 info.key.start_frame_no,
582 info.key.end_frame_no,
583 info.key.timestamp,
584 info.size,
585 namespace_id,
586 ))?;
587 Ok(())
588}
589
590fn segments_range(
591 conn: &rusqlite::Connection,
592 namespace: &NamespaceName,
593) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
594 let mut stmt = conn.prepare_cached(
595 r#"
596 SELECT min(timestamp), size, start_frame_no, end_frame_no
597 FROM segments as s
598 JOIN monitored_namespaces as m
599 ON m.id == s.namespace_id
600 WHERE m.namespace_name = ?
601 LIMIT 1
602"#,
603 )?;
604 let first = stmt
605 .query_row([namespace.as_str()], |r| {
606 Ok(SegmentInfo {
607 key: SegmentKey {
608 start_frame_no: r.get(2)?,
609 end_frame_no: r.get(3)?,
610 timestamp: r.get(0)?,
611 },
612 size: r.get(1)?,
613 })
614 })
615 .optional()?;
616
617 let mut stmt = conn.prepare_cached(
618 r#"
619 SELECT max(timestamp), size, start_frame_no, end_frame_no
620 FROM segments as s
621 JOIN monitored_namespaces as m
622 ON m.id == s.namespace_id
623 WHERE m.namespace_name = ?
624 LIMIT 1
625"#,
626 )?;
627 let last = stmt
628 .query_row([namespace.as_str()], |r| {
629 Ok(SegmentInfo {
630 key: SegmentKey {
631 start_frame_no: r.get(2)?,
632 end_frame_no: r.get(3)?,
633 timestamp: r.get(0)?,
634 },
635 size: r.get(1)?,
636 })
637 })
638 .optional()?;
639
640 Ok(first.zip(last))
641}
642
643fn get_last_frame_no(conn: &rusqlite::Connection, namespace_id: u64) -> Result<Option<u64>> {
644 let mut stmt =
645 conn.prepare_cached("SELECT MAX(end_frame_no) FROM segments WHERE namespace_id = ?")?;
646 Ok(stmt.query_row([namespace_id], |row| row.get(0))?)
647}
648
649fn unmonitor(conn: &rusqlite::Connection, namespace: &NamespaceName) -> Result<()> {
650 conn.execute(
651 "DELETE FROM monitored_namespaces WHERE namespace_name = ?",
652 [namespace.as_str()],
653 )?;
654 Ok(())
655}