Skip to main content

interstice_core/persistence/
table_store.rs

1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use parking_lot::Mutex;
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use interstice_abi::{IndexKey, PersistenceKind, Row, decode, encode};
10use serde::{Deserialize, Serialize};
11
12use crate::{error::IntersticeError, runtime::table::Table};
13
14const SNAPSHOT_VERSION: u16 = 1;
15const SNAPSHOT_INTERVAL: u64 = 256;
16
17#[derive(Clone, Debug)]
18pub struct SnapshotPlan {
19    pub module: String,
20    pub table: String,
21    pub seq: u64,
22}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub enum LogOperation {
26    Insert {
27        primary_key: IndexKey,
28        row: Option<Row>,
29    },
30    Update {
31        primary_key: IndexKey,
32        row: Option<Row>,
33    },
34    Delete {
35        primary_key: IndexKey,
36    },
37    Clear,
38}
39
40#[derive(Clone, Debug, Hash, PartialEq, Eq)]
41struct TableKey {
42    module: String,
43    table: String,
44}
45
46impl TableKey {
47    fn new(module: &str, table: &str) -> Self {
48        Self {
49            module: module.to_string(),
50            table: table.to_string(),
51        }
52    }
53}
54
55#[derive(Debug)]
56struct TableState {
57    persistence: PersistenceKind,
58    next_seq: u64,
59    last_snapshot_seq: u64,
60}
61
62impl TableState {
63    fn new(persistence: PersistenceKind) -> Self {
64        Self {
65            persistence,
66            next_seq: 0,
67            last_snapshot_seq: 0,
68        }
69    }
70}
71
72/// Persistent open file handle for async WAL writes.
73struct WalWriter {
74    file: File,
75    /// Absolute path to this log file (for re-opening a sync fd).
76    path: PathBuf,
77    /// True when entries have been written but not yet fsynced.
78    dirty: bool,
79}
80
81/// Key into the dirty-stateful map: (module, table, primary key).
82#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83struct StatefulRowKey {
84    module: String,
85    table: String,
86    pk: IndexKey,
87}
88
89pub struct TableStore {
90    modules_root: Option<PathBuf>,
91    tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
92    /// Open WAL log-file handles keyed by absolute path.
93    /// Hot path: write without fsync; background WAL thread fsyncs every 10ms.
94    wal_writers: Arc<Mutex<HashMap<PathBuf, WalWriter>>>,
95    /// Pending stateful row writes — flushed to disk by background thread every 10ms.
96    /// `None` value means the row was deleted.
97    /// Bounded by the number of unique rows touched (not by operation count).
98    dirty_stateful: Arc<Mutex<HashMap<StatefulRowKey, Option<Row>>>>,
99}
100
101impl TableStore {
102    pub fn new(root: Option<PathBuf>) -> Self {
103        Self {
104            modules_root: root,
105            tables: Mutex::new(HashMap::new()),
106            wal_writers: Arc::new(Mutex::new(HashMap::new())),
107            dirty_stateful: Arc::new(Mutex::new(HashMap::new())),
108        }
109    }
110
111    pub fn in_memory() -> Self {
112        Self::new(None)
113    }
114
115    pub fn record_logged_operation(
116        &self,
117        module: &str,
118        table: &str,
119        operation: LogOperation,
120    ) -> Result<Option<SnapshotPlan>, IntersticeError> {
121        let Some(root) = &self.modules_root else {
122            return Ok(None);
123        };
124        let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
125        let log_path = self.log_path(root, module, table);
126        let mut guard = state.lock();
127        guard.persistence = PersistenceKind::Logged;
128        let seq = guard.next_seq;
129        guard.next_seq += 1;
130
131        let entry = TableLogEntry::new(seq, operation);
132        // Write without fsync — background WAL thread fsyncs every 10ms.
133        self.write_log_entry_async(&log_path, &entry, module)?;
134
135        let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
136        if needs_snapshot {
137            Ok(Some(SnapshotPlan {
138                module: module.to_string(),
139                table: table.to_string(),
140                seq,
141            }))
142        } else {
143            Ok(None)
144        }
145    }
146
147    pub fn snapshot_logged_table(
148        &self,
149        plan: SnapshotPlan,
150        rows: Vec<Row>,
151    ) -> Result<(), IntersticeError> {
152        let Some(root) = &self.modules_root else {
153            return Ok(());
154        };
155
156        // Flush WAL before writing snapshot so log is durable through this seq.
157        self.flush_wal();
158
159        let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
160        let module_paths = self.ensure_module_dirs(root, &plan.module)?;
161        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
162        let log_path = module_paths.logs.join(format!("{}.log", plan.table));
163
164        {
165            let mut guard = state.lock();
166            Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
167            // compact_log replaces the file; drop the stale WalWriter so it reopens.
168            Self::compact_log(&log_path, plan.seq)?;
169            self.wal_writers.lock().remove(&log_path);
170            guard.last_snapshot_seq = plan.seq;
171        }
172
173        Ok(())
174    }
175
176    /// Flush all pending WAL writes to disk.
177    ///
178    /// Called by the background WAL thread every 10ms.
179    /// Also called synchronously before writing a snapshot.
180    pub fn flush_wal(&self) {
181        // 1. Collect dirty log file paths and mark them clean — hold the lock
182        //    only for this brief step, NOT during the actual sync_data call.
183        //    This prevents sync_data (which can take ~5ms) from blocking the
184        //    reducer loop that also acquires wal_writers to append entries.
185        let dirty_paths: Vec<PathBuf> = {
186            let mut writers = self.wal_writers.lock();
187            let paths: Vec<PathBuf> = writers
188                .values()
189                .filter(|w| w.dirty)
190                .map(|w| w.path.clone())
191                .collect();
192            for writer in writers.values_mut() {
193                writer.dirty = false;
194            }
195            paths
196        };
197
198        // Sync each dirty file using a fresh read-only handle so the write
199        // handle (and its lock) stays free for the reducer loop.
200        for path in dirty_paths {
201            if let Ok(file) = File::open(&path) {
202                let _ = file.sync_data();
203            }
204        }
205    }
206
207    fn stateful_dir(root: &Path, module: &str, table: &str) -> PathBuf {
208        root.join(module).join("stateful").join(table)
209    }
210
211    fn pk_filename(pk: &IndexKey) -> String {
212        // Hex-encode the msgpack-serialized primary key → safe filename
213        match encode(pk) {
214            Ok(bytes) => bytes.iter().map(|b| format!("{:02x}", b)).collect(),
215            Err(_) => "unknown".to_string(),
216        }
217    }
218
219    /// Stage a stateful row insert/update in the dirty map.
220    /// The background flush thread writes it to disk within 10ms.
221    pub fn persist_stateful_insert(
222        &self,
223        module: &str,
224        table: &str,
225        pk: &IndexKey,
226        row: &Row,
227    ) -> Result<(), IntersticeError> {
228        if self.modules_root.is_none() {
229            return Ok(());
230        }
231        let key = StatefulRowKey {
232            module: module.to_string(),
233            table: table.to_string(),
234            pk: pk.clone(),
235        };
236        self.dirty_stateful.lock().insert(key, Some(row.clone()));
237        Ok(())
238    }
239
240    pub fn persist_stateful_update(
241        &self,
242        module: &str,
243        table: &str,
244        pk: &IndexKey,
245        row: &Row,
246    ) -> Result<(), IntersticeError> {
247        self.persist_stateful_insert(module, table, pk, row)
248    }
249
250    /// Stage a stateful row deletion in the dirty map (None = delete on flush).
251    pub fn persist_stateful_delete(
252        &self,
253        module: &str,
254        table: &str,
255        pk: &IndexKey,
256    ) -> Result<(), IntersticeError> {
257        if self.modules_root.is_none() {
258            return Ok(());
259        }
260        let key = StatefulRowKey {
261            module: module.to_string(),
262            table: table.to_string(),
263            pk: pk.clone(),
264        };
265        self.dirty_stateful.lock().insert(key, None);
266        Ok(())
267    }
268
269    /// Clear: flush immediately (table reset is rare and correctness-critical).
270    pub fn persist_stateful_clear(
271        &self,
272        module: &str,
273        table: &str,
274    ) -> Result<(), IntersticeError> {
275        let Some(root) = &self.modules_root else {
276            return Ok(());
277        };
278        // Drop all pending dirty writes for this table — they're about to be irrelevant.
279        self.dirty_stateful.lock().retain(|k, _| !(k.module == module && k.table == table));
280        // Synchronously wipe the directory (clear is rare, correctness beats latency).
281        let dir = Self::stateful_dir(root, module, table);
282        if dir.exists() {
283            fs::remove_dir_all(&dir).map_err(|e| {
284                IntersticeError::Internal(format!("Failed to clear stateful dir: {}", e))
285            })?;
286            fs::create_dir_all(&dir).map_err(|e| {
287                IntersticeError::Internal(format!("Failed to recreate stateful dir: {}", e))
288            })?;
289        }
290        Ok(())
291    }
292
293    /// Flush all pending stateful row writes to disk.
294    /// Called by the background WAL thread every 10ms alongside `flush_wal`.
295    pub fn flush_stateful(&self) {
296        let Some(root) = &self.modules_root else {
297            return;
298        };
299        // Swap with an empty map — hold the lock only for this instant.
300        let batch = {
301            let mut guard = self.dirty_stateful.lock();
302            if guard.is_empty() {
303                return;
304            }
305            std::mem::take(&mut *guard)
306        };
307
308        for (key, row_opt) in batch {
309            let dir = Self::stateful_dir(root, &key.module, &key.table);
310            let _ = fs::create_dir_all(&dir);
311            let path = dir.join(format!("{}.row", Self::pk_filename(&key.pk)));
312            match row_opt {
313                Some(row) => {
314                    if let Ok(encoded) = encode(&row) {
315                        let tmp = path.with_extension("row.tmp");
316                        if let Ok(mut f) = File::create(&tmp) {
317                            let _ = f.write_all(&encoded);
318                            let _ = fs::rename(&tmp, &path);
319                        }
320                    }
321                }
322                None => {
323                    let _ = fs::remove_file(&path);
324                }
325            }
326        }
327    }
328
329    pub fn restore_table(&self, module: &str, table: &mut Table) -> Result<(), IntersticeError> {
330        let Some(root) = &self.modules_root else {
331            return Ok(());
332        };
333
334        if table.schema.persistence == PersistenceKind::Ephemeral {
335            let state =
336                self.get_or_create_state(module, &table.schema.name, PersistenceKind::Ephemeral)?;
337            let mut guard = state.lock();
338            guard.persistence = PersistenceKind::Ephemeral;
339            guard.last_snapshot_seq = 0;
340            guard.next_seq = 0;
341            return Ok(());
342        }
343
344        // Stateful: restore from per-row files (no WAL)
345        if table.schema.persistence == PersistenceKind::Stateful {
346            let dir = Self::stateful_dir(root, module, &table.schema.name);
347            if dir.exists() {
348                for entry in fs::read_dir(&dir).map_err(|e| {
349                    IntersticeError::Internal(format!("Failed to read stateful dir: {}", e))
350                })? {
351                    let entry = entry.map_err(|e| {
352                        IntersticeError::Internal(format!("Failed to read dir entry: {}", e))
353                    })?;
354                    let path = entry.path();
355                    if path.extension().and_then(|e| e.to_str()) == Some("row") {
356                        let bytes = fs::read(&path).map_err(|e| {
357                            IntersticeError::Internal(format!("Failed to read row file: {}", e))
358                        })?;
359                        let row: Row = decode(&bytes).map_err(|e| {
360                            IntersticeError::Internal(format!("Failed to decode row: {}", e))
361                        })?;
362                        table.insert(row)?;
363                    }
364                }
365            }
366            let state = self.get_or_create_state(module, &table.schema.name, PersistenceKind::Stateful)?;
367            let mut guard = state.lock();
368            guard.persistence = PersistenceKind::Stateful;
369            return Ok(());
370        }
371
372        // Logged: snapshot + WAL replay
373        let table_name = table.schema.name.clone();
374        let module_paths = self.ensure_module_dirs(root, module)?;
375        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
376        let log_path = module_paths.logs.join(format!("{}.log", table_name));
377        let snapshot = Self::read_snapshot_file(&snapshot_path)?;
378
379        table.restore_from_rows(snapshot.rows)?;
380        let mut last_seq = snapshot.last_seq;
381
382        Self::read_log_entries(&log_path, |entry| {
383            if entry.seq > snapshot.last_seq {
384                TableStore::apply_entry(table, &entry.operation)?;
385                last_seq = entry.seq;
386            }
387            Ok(())
388        })?;
389
390        let state =
391            self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
392        let mut guard = state.lock();
393        guard.persistence = table.schema.persistence.clone();
394        guard.last_snapshot_seq = last_seq;
395        guard.next_seq = last_seq.saturating_add(1);
396
397        Ok(())
398    }
399
400    pub fn clear_all(&self) -> Result<(), IntersticeError> {
401        let Some(root) = &self.modules_root else {
402            return Ok(());
403        };
404
405        // Close all open WAL file handles before clearing.
406        self.wal_writers.lock().clear();
407
408        if root.exists() {
409            for entry in fs::read_dir(root).map_err(|err| {
410                IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
411            })? {
412                if let Ok(entry) = entry {
413                    let path = entry.path();
414                    if path.is_dir() {
415                        let logs = path.join("logs");
416                        if logs.exists() {
417                            fs::remove_dir_all(&logs).map_err(|err| {
418                                IntersticeError::Internal(format!(
419                                    "Failed to clear logs for {:?}: {}",
420                                    logs, err
421                                ))
422                            })?;
423                        }
424                        fs::create_dir_all(&logs).map_err(|err| {
425                            IntersticeError::Internal(format!(
426                                "Failed to recreate logs dir {:?}: {}",
427                                logs, err
428                            ))
429                        })?;
430
431                        let snapshots = path.join("snapshots");
432                        if snapshots.exists() {
433                            fs::remove_dir_all(&snapshots).map_err(|err| {
434                                IntersticeError::Internal(format!(
435                                    "Failed to clear snapshots for {:?}: {}",
436                                    snapshots, err
437                                ))
438                            })?;
439                        }
440                        fs::create_dir_all(&snapshots).map_err(|err| {
441                            IntersticeError::Internal(format!(
442                                "Failed to recreate snapshots dir {:?}: {}",
443                                snapshots, err
444                            ))
445                        })?;
446
447                        let stateful = path.join("stateful");
448                        if stateful.exists() {
449                            fs::remove_dir_all(&stateful).map_err(|err| {
450                                IntersticeError::Internal(format!(
451                                    "Failed to clear stateful for {:?}: {}",
452                                    stateful, err
453                                ))
454                            })?;
455                        }
456                        // Don't recreate stateful dir — it's created on demand per table
457                    }
458                }
459            }
460        }
461
462        self.tables.lock().clear();
463        Ok(())
464    }
465
466    pub fn cleanup_module(&self, module: &str) {
467        self.tables.lock().retain(|key, _| key.module != module);
468        // Close WAL writers for this module's log files.
469        if let Some(root) = &self.modules_root {
470            let module_log_dir = root.join(module).join("logs");
471            self.wal_writers
472                .lock()
473                
474                .retain(|path, _| !path.starts_with(&module_log_dir));
475        }
476        // Remove stateful row files for this module.
477        if let Some(root) = &self.modules_root {
478            let stateful_module_dir = root.join(module).join("stateful");
479            if stateful_module_dir.exists() {
480                let _ = fs::remove_dir_all(&stateful_module_dir);
481            }
482        }
483    }
484
485    fn get_or_create_state(
486        &self,
487        module: &str,
488        table: &str,
489        persistence: PersistenceKind,
490    ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
491        let mut tables = self.tables.lock();
492        if let Some(state) = tables.get(&TableKey::new(module, table)) {
493            return Ok(state.clone());
494        }
495
496        let state = Arc::new(Mutex::new(TableState::new(persistence)));
497        tables.insert(TableKey::new(module, table), state.clone());
498        Ok(state)
499    }
500
501    fn ensure_module_dirs(
502        &self,
503        root: &Path,
504        module: &str,
505    ) -> Result<ModulePaths, IntersticeError> {
506        let module_dir = root.join(module);
507        let logs = module_dir.join("logs");
508        let snapshots = module_dir.join("snapshots");
509        fs::create_dir_all(&logs).map_err(|err| {
510            IntersticeError::Internal(format!(
511                "Failed to create logs dir for module {}: {}",
512                module, err
513            ))
514        })?;
515        fs::create_dir_all(&snapshots).map_err(|err| {
516            IntersticeError::Internal(format!(
517                "Failed to create snapshots dir for module {}: {}",
518                module, err
519            ))
520        })?;
521        Ok(ModulePaths { logs, snapshots })
522    }
523
524    fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
525        root.join(module)
526            .join("logs")
527            .join(format!("{}.log", table))
528    }
529
530    /// Write a log entry to disk without fsync (async WAL hot path).
531    /// Reuses an open file handle; opens one on first write to this path.
532    fn write_log_entry_async(
533        &self,
534        path: &PathBuf,
535        entry: &TableLogEntry,
536        module: &str,
537    ) -> Result<(), IntersticeError> {
538        let encoded = encode(entry).map_err(|err| {
539            IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
540        })?;
541        let length = (encoded.len() as u32).to_le_bytes();
542
543        // Fast path: reuse the already-open file handle.
544        {
545            let mut writers = self.wal_writers.lock();
546            if let Some(writer) = writers.get_mut(path) {
547                writer.file.write_all(&length).map_err(|err| {
548                    IntersticeError::Internal(format!("Failed to write log length: {err}"))
549                })?;
550                writer.file.write_all(&encoded).map_err(|err| {
551                    IntersticeError::Internal(format!("Failed to write log entry: {err}"))
552                })?;
553                writer.dirty = true;
554                return Ok(());
555            }
556        }
557
558        // Slow path: first write to this path — open the file.
559        if let Some(root) = &self.modules_root {
560            self.ensure_module_dirs(root, module)?;
561        }
562        let file = OpenOptions::new()
563            .create(true)
564            .append(true)
565            .open(path)
566            .map_err(|err| {
567                IntersticeError::Internal(format!(
568                    "Failed to open log file {:?}: {}",
569                    path, err
570                ))
571            })?;
572
573        let mut writers = self.wal_writers.lock();
574        let writer = writers
575            .entry(path.clone())
576            .or_insert(WalWriter { file, path: path.clone(), dirty: false });
577        writer.file.write_all(&length).map_err(|err| {
578            IntersticeError::Internal(format!("Failed to write log length: {err}"))
579        })?;
580        writer.file.write_all(&encoded).map_err(|err| {
581            IntersticeError::Internal(format!("Failed to write log entry: {err}"))
582        })?;
583        writer.dirty = true;
584        Ok(())
585    }
586
587    fn read_log_entries<F>(path: &Path, mut visitor: F) -> Result<(), IntersticeError>
588    where
589        F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
590    {
591        if !path.exists() {
592            return Ok(());
593        }
594
595        let mut file = File::open(path).map_err(|err| {
596            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
597        })?;
598        file.seek(SeekFrom::Start(0))
599            .map_err(|err| IntersticeError::Internal(format!("Failed to seek log file: {err}")))?;
600
601        loop {
602            let mut len_buf = [0u8; 4];
603            if file.read_exact(&mut len_buf).is_err() {
604                break;
605            }
606            let length = u32::from_le_bytes(len_buf) as usize;
607            let mut buffer = vec![0u8; length];
608            file.read_exact(&mut buffer).map_err(|err| {
609                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
610            })?;
611            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
612                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
613            })?;
614            visitor(entry)?;
615        }
616
617        Ok(())
618    }
619
620    fn write_snapshot_file(path: &Path, seq: u64, rows: &[Row]) -> Result<(), IntersticeError> {
621        let snapshot = TableSnapshot {
622            version: SNAPSHOT_VERSION,
623            last_seq: seq,
624            rows: rows.to_vec(),
625        };
626        let encoded = encode(&snapshot).map_err(|err| {
627            IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
628        })?;
629        let tmp_path = path.with_extension("snap.tmp");
630        {
631            let mut file = File::create(&tmp_path).map_err(|err| {
632                IntersticeError::Internal(format!(
633                    "Failed to create snapshot temp file {:?}: {}",
634                    tmp_path, err
635                ))
636            })?;
637            file.write_all(&encoded).map_err(|err| {
638                IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
639            })?;
640            file.sync_all().map_err(|err| {
641                IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
642            })?;
643        }
644        fs::rename(&tmp_path, path).map_err(|err| {
645            IntersticeError::Internal(format!("Failed to finalize snapshot {:?}: {}", path, err))
646        })?;
647        Ok(())
648    }
649
650    fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
651        if !path.exists() {
652            return Ok(TableSnapshot {
653                version: SNAPSHOT_VERSION,
654                last_seq: 0,
655                rows: Vec::new(),
656            });
657        }
658        let bytes = fs::read(path).map_err(|err| {
659            IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
660        })?;
661        decode(&bytes)
662            .map_err(|err| IntersticeError::Internal(format!("Failed to decode snapshot: {err}")))
663    }
664
665    fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
666        if !path.exists() {
667            return Ok(());
668        }
669        let tmp_path = path.with_extension("log.tmp");
670        let mut reader = File::open(path).map_err(|err| {
671            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
672        })?;
673        let mut writer = File::create(&tmp_path).map_err(|err| {
674            IntersticeError::Internal(format!("Failed to create temp log: {err}"))
675        })?;
676
677        loop {
678            let mut len_buf = [0u8; 4];
679            if reader.read_exact(&mut len_buf).is_err() {
680                break;
681            }
682            let length = u32::from_le_bytes(len_buf) as usize;
683            let mut buffer = vec![0u8; length];
684            reader.read_exact(&mut buffer).map_err(|err| {
685                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
686            })?;
687            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
688                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
689            })?;
690            if entry.seq > keep_after_seq {
691                writer.write_all(&len_buf).map_err(|err| {
692                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
693                })?;
694                writer.write_all(&buffer).map_err(|err| {
695                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
696                })?;
697            }
698        }
699
700        writer.sync_all().map_err(|err| {
701            IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
702        })?;
703        fs::rename(&tmp_path, path).map_err(|err| {
704            IntersticeError::Internal(format!("Failed to replace log file: {err}"))
705        })?;
706        Ok(())
707    }
708
709    fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
710        match op {
711            LogOperation::Insert { row, .. } => {
712                let row = row.clone().ok_or_else(|| {
713                    IntersticeError::Internal("Missing row data for log insert".into())
714                })?;
715                table.insert(row)?;
716            }
717            LogOperation::Update { row, .. } => {
718                let row = row.clone().ok_or_else(|| {
719                    IntersticeError::Internal("Missing row data for log update".into())
720                })?;
721                table.update(row)?;
722            }
723            LogOperation::Delete { primary_key } => {
724                let _ = table.delete(primary_key)?;
725            }
726            LogOperation::Clear => {
727                table.clear();
728            }
729        }
730        Ok(())
731    }
732
733    pub fn forget_module(&self, module: &str) {
734        self.cleanup_module(module);
735    }
736}
737
738struct ModulePaths {
739    logs: PathBuf,
740    snapshots: PathBuf,
741}
742
743#[derive(Serialize, Deserialize)]
744struct TableLogEntry {
745    seq: u64,
746    timestamp_ms: u64,
747    operation: LogOperation,
748}
749
750impl TableLogEntry {
751    fn new(seq: u64, operation: LogOperation) -> Self {
752        let timestamp_ms = SystemTime::now()
753            .duration_since(UNIX_EPOCH)
754            .unwrap_or_default()
755            .as_millis() as u64;
756        Self {
757            seq,
758            timestamp_ms,
759            operation,
760        }
761    }
762}
763
764#[derive(Serialize, Deserialize)]
765struct TableSnapshot {
766    version: u16,
767    last_seq: u64,
768    rows: Vec<Row>,
769}