Skip to main content

interstice_core/persistence/
table_store.rs

1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use interstice_abi::{decode, encode, IndexKey, PersistenceKind, Row};
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    error::IntersticeError,
13    runtime::table::Table,
14};
15
/// Format version stamped into every snapshot file written by this module.
const SNAPSHOT_VERSION: u16 = 1;
/// How many sequence numbers may pass after the last snapshot before
/// `TableStore::record_logged_operation` starts requesting a new one.
const SNAPSHOT_INTERVAL: u64 = 256;
18
/// Request to snapshot a logged table, handed out by
/// `TableStore::record_logged_operation` and consumed by
/// `TableStore::snapshot_logged_table`.
#[derive(Clone, Debug)]
pub struct SnapshotPlan {
    /// Module name; selects the `<root>/<module>` directory.
    pub module: String,
    /// Table name; selects the `<table>.snap` / `<table>.log` files.
    pub table: String,
    /// Sequence number of the log entry that triggered the plan. The snapshot
    /// is recorded as covering every log entry up to and including it.
    pub seq: u64,
}
25
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub enum LogOperation {
28    Insert {
29        primary_key: IndexKey,
30        row: Option<Row>,
31    },
32    Update {
33        primary_key: IndexKey,
34        row: Option<Row>,
35    },
36    Delete {
37        primary_key: IndexKey,
38    },
39}
40
/// Map key for per-table state: owning module name plus table name.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct TableKey {
    module: String,
    table: String,
}

impl TableKey {
    /// Builds an owned key from borrowed module and table names.
    fn new(module: &str, table: &str) -> Self {
        let module = String::from(module);
        let table = String::from(table);
        Self { module, table }
    }
}
55
56#[derive(Debug)]
57struct TableState {
58    persistence: PersistenceKind,
59    next_seq: u64,
60    last_snapshot_seq: u64,
61}
62
63impl TableState {
64    fn new(persistence: PersistenceKind) -> Self {
65        Self {
66            persistence,
67            next_seq: 0,
68            last_snapshot_seq: 0,
69        }
70    }
71}
72
73pub struct TableStore {
74    modules_root: Option<PathBuf>,
75    tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
76}
77
78impl TableStore {
79    pub fn new(root: Option<PathBuf>) -> Self {
80        Self {
81            modules_root: root,
82            tables: Mutex::new(HashMap::new()),
83        }
84    }
85
86    pub fn in_memory() -> Self {
87        Self::new(None)
88    }
89
90    pub fn record_logged_operation(
91        &self,
92        module: &str,
93        table: &str,
94        operation: LogOperation,
95    ) -> Result<Option<SnapshotPlan>, IntersticeError> {
96        let Some(root) = &self.modules_root else {
97            return Ok(None);
98        };
99        let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
100        let log_path = self.log_path(root, module, table);
101        let mut guard = state.lock().unwrap();
102        guard.persistence = PersistenceKind::Logged;
103        let seq = guard.next_seq;
104        guard.next_seq += 1;
105
106        let entry = TableLogEntry::new(seq, operation);
107        Self::append_log_entry(&log_path, &entry)?;
108
109        let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
110        if needs_snapshot {
111            Ok(Some(SnapshotPlan {
112                module: module.to_string(),
113                table: table.to_string(),
114                seq,
115            }))
116        } else {
117            Ok(None)
118        }
119    }
120
121    pub fn snapshot_logged_table(
122        &self,
123        plan: SnapshotPlan,
124        rows: Vec<Row>,
125    ) -> Result<(), IntersticeError> {
126        let Some(root) = &self.modules_root else {
127            return Ok(());
128        };
129
130        let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
131        let module_paths = self.ensure_module_dirs(root, &plan.module)?;
132        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
133        let log_path = module_paths.logs.join(format!("{}.log", plan.table));
134
135        {
136            let mut guard = state.lock().unwrap();
137            Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
138            Self::compact_log(&log_path, plan.seq)?;
139            guard.last_snapshot_seq = plan.seq;
140        }
141
142        Ok(())
143    }
144
145    pub fn persist_stateful_operation(
146        &self,
147        module: &str,
148        table: &str,
149        operation: LogOperation,
150        rows: Vec<Row>,
151    ) -> Result<(), IntersticeError> {
152        let Some(root) = &self.modules_root else {
153            return Ok(());
154        };
155
156        let state = self.get_or_create_state(module, table, PersistenceKind::Stateful)?;
157        let module_paths = self.ensure_module_dirs(root, module)?;
158        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table));
159        let log_path = module_paths.logs.join(format!("{}.log", table));
160
161        let mut guard = state.lock().unwrap();
162        let seq = guard.next_seq;
163        guard.next_seq += 1;
164
165        Self::write_snapshot_file(&snapshot_path, seq, &rows)?;
166        Self::append_log_entry(&log_path, &TableLogEntry::new(seq, operation))?;
167        guard.last_snapshot_seq = seq;
168        Ok(())
169    }
170
171    pub fn restore_table(
172        &self,
173        module: &str,
174        table: &mut Table,
175    ) -> Result<(), IntersticeError> {
176        let Some(root) = &self.modules_root else {
177            return Ok(());
178        };
179
180        let table_name = table.schema.name.clone();
181        let module_paths = self.ensure_module_dirs(root, module)?;
182        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
183        let log_path = module_paths.logs.join(format!("{}.log", table_name));
184        let snapshot = Self::read_snapshot_file(&snapshot_path)?;
185
186        table.restore_from_rows(snapshot.rows)?;
187        let mut last_seq = snapshot.last_seq;
188
189        if table.schema.persistence != PersistenceKind::Stateful {
190            Self::read_log_entries(&log_path, |entry| {
191                if entry.seq > snapshot.last_seq {
192                    TableStore::apply_entry(table, &entry.operation)?;
193                    last_seq = entry.seq;
194                }
195                Ok(())
196            })?;
197        }
198
199        let state = self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
200        let mut guard = state.lock().unwrap();
201        guard.persistence = table.schema.persistence.clone();
202        guard.last_snapshot_seq = last_seq;
203        guard.next_seq = last_seq.saturating_add(1);
204
205        Ok(())
206    }
207
208    pub fn clear_all(&self) -> Result<(), IntersticeError> {
209        let Some(root) = &self.modules_root else {
210            return Ok(());
211        };
212
213        if root.exists() {
214            for entry in fs::read_dir(root).map_err(|err| {
215                IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
216            })? {
217                if let Ok(entry) = entry {
218                    let path = entry.path();
219                    if path.is_dir() {
220                        let logs = path.join("logs");
221                        if logs.exists() {
222                            fs::remove_dir_all(&logs).map_err(|err| {
223                                IntersticeError::Internal(format!(
224                                    "Failed to clear logs for {:?}: {}",
225                                    logs, err
226                                ))
227                            })?;
228                        }
229                        fs::create_dir_all(&logs).map_err(|err| {
230                            IntersticeError::Internal(format!(
231                                "Failed to recreate logs dir {:?}: {}",
232                                logs, err
233                            ))
234                        })?;
235
236                        let snapshots = path.join("snapshots");
237                        if snapshots.exists() {
238                            fs::remove_dir_all(&snapshots).map_err(|err| {
239                                IntersticeError::Internal(format!(
240                                    "Failed to clear snapshots for {:?}: {}",
241                                    snapshots, err
242                                ))
243                            })?;
244                        }
245                        fs::create_dir_all(&snapshots).map_err(|err| {
246                            IntersticeError::Internal(format!(
247                                "Failed to recreate snapshots dir {:?}: {}",
248                                snapshots, err
249                            ))
250                        })?;
251                    }
252                }
253            }
254        }
255
256        self.tables.lock().unwrap().clear();
257        Ok(())
258    }
259
260    pub fn cleanup_module(&self, module: &str) {
261        let mut tables = self.tables.lock().unwrap();
262        tables.retain(|key, _| key.module != module);
263    }
264
265    fn get_or_create_state(
266        &self,
267        module: &str,
268        table: &str,
269        persistence: PersistenceKind,
270    ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
271        let mut tables = self.tables.lock().unwrap();
272        if let Some(state) = tables.get(&TableKey::new(module, table)) {
273            return Ok(state.clone());
274        }
275
276        let state = Arc::new(Mutex::new(TableState::new(persistence)));
277        tables.insert(TableKey::new(module, table), state.clone());
278        Ok(state)
279    }
280
281    fn ensure_module_dirs(
282        &self,
283        root: &Path,
284        module: &str,
285    ) -> Result<ModulePaths, IntersticeError> {
286        let module_dir = root.join(module);
287        let logs = module_dir.join("logs");
288        let snapshots = module_dir.join("snapshots");
289        fs::create_dir_all(&logs).map_err(|err| {
290            IntersticeError::Internal(format!(
291                "Failed to create logs dir for module {}: {}",
292                module, err
293            ))
294        })?;
295        fs::create_dir_all(&snapshots).map_err(|err| {
296            IntersticeError::Internal(format!(
297                "Failed to create snapshots dir for module {}: {}",
298                module, err
299            ))
300        })?;
301        Ok(ModulePaths { logs, snapshots })
302    }
303
304    fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
305        root.join(module).join("logs").join(format!("{}.log", table))
306    }
307
308    fn append_log_entry(path: &Path, entry: &TableLogEntry) -> Result<(), IntersticeError> {
309        let encoded = encode(entry).map_err(|err| {
310            IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
311        })?;
312        let mut file = OpenOptions::new()
313            .create(true)
314            .append(true)
315            .open(path)
316            .map_err(|err| {
317                IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
318            })?;
319
320        let length = (encoded.len() as u32).to_le_bytes();
321        file.write_all(&length).map_err(|err| {
322            IntersticeError::Internal(format!("Failed to write log length: {err}"))
323        })?;
324        file.write_all(&encoded).map_err(|err| {
325            IntersticeError::Internal(format!("Failed to write log entry: {err}"))
326        })?;
327        file.sync_data().map_err(|err| {
328            IntersticeError::Internal(format!("Failed to sync log file: {err}"))
329        })?;
330        Ok(())
331    }
332
333    fn read_log_entries<F>(
334        path: &Path,
335        mut visitor: F,
336    ) -> Result<(), IntersticeError>
337    where
338        F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
339    {
340        if !path.exists() {
341            return Ok(());
342        }
343
344        let mut file = File::open(path).map_err(|err| {
345            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
346        })?;
347        file.seek(SeekFrom::Start(0)).map_err(|err| {
348            IntersticeError::Internal(format!("Failed to seek log file: {err}"))
349        })?;
350
351        loop {
352            let mut len_buf = [0u8; 4];
353            if file.read_exact(&mut len_buf).is_err() {
354                break;
355            }
356            let length = u32::from_le_bytes(len_buf) as usize;
357            let mut buffer = vec![0u8; length];
358            file.read_exact(&mut buffer).map_err(|err| {
359                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
360            })?;
361            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
362                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
363            })?;
364            visitor(entry)?;
365        }
366
367        Ok(())
368    }
369
370    fn write_snapshot_file(
371        path: &Path,
372        seq: u64,
373        rows: &[Row],
374    ) -> Result<(), IntersticeError> {
375        let snapshot = TableSnapshot {
376            version: SNAPSHOT_VERSION,
377            last_seq: seq,
378            rows: rows.to_vec(),
379        };
380        let encoded = encode(&snapshot).map_err(|err| {
381            IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
382        })?;
383        let tmp_path = path.with_extension("snap.tmp");
384        {
385            let mut file = File::create(&tmp_path).map_err(|err| {
386                IntersticeError::Internal(format!(
387                    "Failed to create snapshot temp file {:?}: {}",
388                    tmp_path, err
389                ))
390            })?;
391            file.write_all(&encoded).map_err(|err| {
392                IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
393            })?;
394            file.sync_all().map_err(|err| {
395                IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
396            })?;
397        }
398        fs::rename(&tmp_path, path).map_err(|err| {
399            IntersticeError::Internal(format!(
400                "Failed to finalize snapshot {:?}: {}",
401                path, err
402            ))
403        })?;
404        Ok(())
405    }
406
407    fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
408        if !path.exists() {
409            return Ok(TableSnapshot {
410                version: SNAPSHOT_VERSION,
411                last_seq: 0,
412                rows: Vec::new(),
413            });
414        }
415        let bytes = fs::read(path).map_err(|err| {
416            IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
417        })?;
418        decode(&bytes).map_err(|err| {
419            IntersticeError::Internal(format!("Failed to decode snapshot: {err}"))
420        })
421    }
422
423    fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
424        if !path.exists() {
425            return Ok(());
426        }
427        let tmp_path = path.with_extension("log.tmp");
428        let mut reader = File::open(path).map_err(|err| {
429            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
430        })?;
431        let mut writer = File::create(&tmp_path).map_err(|err| {
432            IntersticeError::Internal(format!("Failed to create temp log: {err}"))
433        })?;
434
435        loop {
436            let mut len_buf = [0u8; 4];
437            if reader.read_exact(&mut len_buf).is_err() {
438                break;
439            }
440            let length = u32::from_le_bytes(len_buf) as usize;
441            let mut buffer = vec![0u8; length];
442            reader.read_exact(&mut buffer).map_err(|err| {
443                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
444            })?;
445            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
446                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
447            })?;
448            if entry.seq > keep_after_seq {
449                writer.write_all(&len_buf).map_err(|err| {
450                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
451                })?;
452                writer.write_all(&buffer).map_err(|err| {
453                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
454                })?;
455            }
456        }
457
458        writer.sync_all().map_err(|err| {
459            IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
460        })?;
461        fs::rename(&tmp_path, path).map_err(|err| {
462            IntersticeError::Internal(format!("Failed to replace log file: {err}"))
463        })?;
464        Ok(())
465    }
466
467    fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
468        match op {
469            LogOperation::Insert { row, .. } => {
470                let row = row.clone().ok_or_else(|| {
471                    IntersticeError::Internal("Missing row data for log insert".into())
472                })?;
473                table.insert(row)?;
474            }
475            LogOperation::Update { row, .. } => {
476                let row = row.clone().ok_or_else(|| {
477                    IntersticeError::Internal("Missing row data for log update".into())
478                })?;
479                table.update(row)?;
480            }
481            LogOperation::Delete { primary_key } => {
482                let _ = table.delete(primary_key)?;
483            }
484        }
485        Ok(())
486    }
487
488    pub fn forget_module(&self, module: &str) {
489        self.cleanup_module(module);
490    }
491}
492
/// Per-module storage directories created by `TableStore::ensure_module_dirs`.
struct ModulePaths {
    /// `<root>/<module>/logs`, holding one `<table>.log` file per table.
    logs: PathBuf,
    /// `<root>/<module>/snapshots`, holding one `<table>.snap` file per table.
    snapshots: PathBuf,
}
497
498#[derive(Serialize, Deserialize)]
499struct TableLogEntry {
500    seq: u64,
501    timestamp_ms: u64,
502    operation: LogOperation,
503}
504
505impl TableLogEntry {
506    fn new(seq: u64, operation: LogOperation) -> Self {
507        let timestamp_ms = SystemTime::now()
508            .duration_since(UNIX_EPOCH)
509            .unwrap_or_default()
510            .as_millis() as u64;
511        Self {
512            seq,
513            timestamp_ms,
514            operation,
515        }
516    }
517}
518
519#[derive(Serialize, Deserialize)]
520struct TableSnapshot {
521    version: u16,
522    last_seq: u64,
523    rows: Vec<Row>,
524}