Skip to main content

interstice_core/persistence/
table_store.rs

1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use interstice_abi::{IndexKey, PersistenceKind, Row, decode, encode};
9use serde::{Deserialize, Serialize};
10
11use crate::{error::IntersticeError, runtime::table::Table};
12
/// Format version stamped into every snapshot file written by this module.
const SNAPSHOT_VERSION: u16 = 1;
/// Number of sequence numbers a logged table may advance past its last
/// snapshot before `record_logged_operation` returns a `SnapshotPlan`.
const SNAPSHOT_INTERVAL: u64 = 256;
15
/// Request to snapshot a logged table.
///
/// Produced by `TableStore::record_logged_operation` once enough operations
/// have accumulated, and consumed by `TableStore::snapshot_logged_table`.
#[derive(Clone, Debug)]
pub struct SnapshotPlan {
    /// Name of the module that owns the table.
    pub module: String,
    /// Name of the table to snapshot.
    pub table: String,
    /// Sequence number of the log entry that triggered the plan. It is stored
    /// as the snapshot's `last_seq` and used as the log-compaction cutoff.
    pub seq: u64,
}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub enum LogOperation {
25    Insert {
26        primary_key: IndexKey,
27        row: Option<Row>,
28    },
29    Update {
30        primary_key: IndexKey,
31        row: Option<Row>,
32    },
33    Delete {
34        primary_key: IndexKey,
35    },
36    Clear,
37}
38
/// Hash-map key identifying one table of one module.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct TableKey {
    module: String,
    table: String,
}

impl TableKey {
    /// Builds an owned key from borrowed module and table names.
    fn new(module: &str, table: &str) -> Self {
        let module = String::from(module);
        let table = String::from(table);
        Self { module, table }
    }
}
53
54#[derive(Debug)]
55struct TableState {
56    persistence: PersistenceKind,
57    next_seq: u64,
58    last_snapshot_seq: u64,
59}
60
61impl TableState {
62    fn new(persistence: PersistenceKind) -> Self {
63        Self {
64            persistence,
65            next_seq: 0,
66            last_snapshot_seq: 0,
67        }
68    }
69}
70
71pub struct TableStore {
72    modules_root: Option<PathBuf>,
73    tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
74}
75
76impl TableStore {
77    pub fn new(root: Option<PathBuf>) -> Self {
78        Self {
79            modules_root: root,
80            tables: Mutex::new(HashMap::new()),
81        }
82    }
83
84    pub fn in_memory() -> Self {
85        Self::new(None)
86    }
87
88    pub fn record_logged_operation(
89        &self,
90        module: &str,
91        table: &str,
92        operation: LogOperation,
93    ) -> Result<Option<SnapshotPlan>, IntersticeError> {
94        let Some(root) = &self.modules_root else {
95            return Ok(None);
96        };
97        let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
98        let log_path = self.log_path(root, module, table);
99        let mut guard = state.lock().unwrap();
100        guard.persistence = PersistenceKind::Logged;
101        let seq = guard.next_seq;
102        guard.next_seq += 1;
103
104        let entry = TableLogEntry::new(seq, operation);
105        Self::append_log_entry(&log_path, &entry)?;
106
107        let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
108        if needs_snapshot {
109            Ok(Some(SnapshotPlan {
110                module: module.to_string(),
111                table: table.to_string(),
112                seq,
113            }))
114        } else {
115            Ok(None)
116        }
117    }
118
119    pub fn snapshot_logged_table(
120        &self,
121        plan: SnapshotPlan,
122        rows: Vec<Row>,
123    ) -> Result<(), IntersticeError> {
124        let Some(root) = &self.modules_root else {
125            return Ok(());
126        };
127
128        let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
129        let module_paths = self.ensure_module_dirs(root, &plan.module)?;
130        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
131        let log_path = module_paths.logs.join(format!("{}.log", plan.table));
132
133        {
134            let mut guard = state.lock().unwrap();
135            Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
136            Self::compact_log(&log_path, plan.seq)?;
137            guard.last_snapshot_seq = plan.seq;
138        }
139
140        Ok(())
141    }
142
143    pub fn persist_stateful_operation(
144        &self,
145        module: &str,
146        table: &str,
147        operation: LogOperation,
148        rows: Vec<Row>,
149    ) -> Result<(), IntersticeError> {
150        let Some(root) = &self.modules_root else {
151            return Ok(());
152        };
153
154        let state = self.get_or_create_state(module, table, PersistenceKind::Stateful)?;
155        let module_paths = self.ensure_module_dirs(root, module)?;
156        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table));
157        let log_path = module_paths.logs.join(format!("{}.log", table));
158
159        let mut guard = state.lock().unwrap();
160        let seq = guard.next_seq;
161        guard.next_seq += 1;
162
163        Self::write_snapshot_file(&snapshot_path, seq, &rows)?;
164        Self::append_log_entry(&log_path, &TableLogEntry::new(seq, operation))?;
165        guard.last_snapshot_seq = seq;
166        Ok(())
167    }
168
169    pub fn restore_table(&self, module: &str, table: &mut Table) -> Result<(), IntersticeError> {
170        let Some(root) = &self.modules_root else {
171            return Ok(());
172        };
173
174        if table.schema.persistence == PersistenceKind::Ephemeral {
175            let state =
176                self.get_or_create_state(module, &table.schema.name, PersistenceKind::Ephemeral)?;
177            let mut guard = state.lock().unwrap();
178            guard.persistence = PersistenceKind::Ephemeral;
179            guard.last_snapshot_seq = 0;
180            guard.next_seq = 0;
181            return Ok(());
182        }
183
184        let table_name = table.schema.name.clone();
185        let module_paths = self.ensure_module_dirs(root, module)?;
186        let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
187        let log_path = module_paths.logs.join(format!("{}.log", table_name));
188        let snapshot = Self::read_snapshot_file(&snapshot_path)?;
189
190        table.restore_from_rows(snapshot.rows)?;
191        let mut last_seq = snapshot.last_seq;
192
193        if table.schema.persistence != PersistenceKind::Stateful {
194            Self::read_log_entries(&log_path, |entry| {
195                if entry.seq > snapshot.last_seq {
196                    TableStore::apply_entry(table, &entry.operation)?;
197                    last_seq = entry.seq;
198                }
199                Ok(())
200            })?;
201        }
202
203        let state =
204            self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
205        let mut guard = state.lock().unwrap();
206        guard.persistence = table.schema.persistence.clone();
207        guard.last_snapshot_seq = last_seq;
208        guard.next_seq = last_seq.saturating_add(1);
209
210        Ok(())
211    }
212
213    pub fn clear_all(&self) -> Result<(), IntersticeError> {
214        let Some(root) = &self.modules_root else {
215            return Ok(());
216        };
217
218        if root.exists() {
219            for entry in fs::read_dir(root).map_err(|err| {
220                IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
221            })? {
222                if let Ok(entry) = entry {
223                    let path = entry.path();
224                    if path.is_dir() {
225                        let logs = path.join("logs");
226                        if logs.exists() {
227                            fs::remove_dir_all(&logs).map_err(|err| {
228                                IntersticeError::Internal(format!(
229                                    "Failed to clear logs for {:?}: {}",
230                                    logs, err
231                                ))
232                            })?;
233                        }
234                        fs::create_dir_all(&logs).map_err(|err| {
235                            IntersticeError::Internal(format!(
236                                "Failed to recreate logs dir {:?}: {}",
237                                logs, err
238                            ))
239                        })?;
240
241                        let snapshots = path.join("snapshots");
242                        if snapshots.exists() {
243                            fs::remove_dir_all(&snapshots).map_err(|err| {
244                                IntersticeError::Internal(format!(
245                                    "Failed to clear snapshots for {:?}: {}",
246                                    snapshots, err
247                                ))
248                            })?;
249                        }
250                        fs::create_dir_all(&snapshots).map_err(|err| {
251                            IntersticeError::Internal(format!(
252                                "Failed to recreate snapshots dir {:?}: {}",
253                                snapshots, err
254                            ))
255                        })?;
256                    }
257                }
258            }
259        }
260
261        self.tables.lock().unwrap().clear();
262        Ok(())
263    }
264
265    pub fn cleanup_module(&self, module: &str) {
266        let mut tables = self.tables.lock().unwrap();
267        tables.retain(|key, _| key.module != module);
268    }
269
270    fn get_or_create_state(
271        &self,
272        module: &str,
273        table: &str,
274        persistence: PersistenceKind,
275    ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
276        let mut tables = self.tables.lock().unwrap();
277        if let Some(state) = tables.get(&TableKey::new(module, table)) {
278            return Ok(state.clone());
279        }
280
281        let state = Arc::new(Mutex::new(TableState::new(persistence)));
282        tables.insert(TableKey::new(module, table), state.clone());
283        Ok(state)
284    }
285
286    fn ensure_module_dirs(
287        &self,
288        root: &Path,
289        module: &str,
290    ) -> Result<ModulePaths, IntersticeError> {
291        let module_dir = root.join(module);
292        let logs = module_dir.join("logs");
293        let snapshots = module_dir.join("snapshots");
294        fs::create_dir_all(&logs).map_err(|err| {
295            IntersticeError::Internal(format!(
296                "Failed to create logs dir for module {}: {}",
297                module, err
298            ))
299        })?;
300        fs::create_dir_all(&snapshots).map_err(|err| {
301            IntersticeError::Internal(format!(
302                "Failed to create snapshots dir for module {}: {}",
303                module, err
304            ))
305        })?;
306        Ok(ModulePaths { logs, snapshots })
307    }
308
309    fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
310        root.join(module)
311            .join("logs")
312            .join(format!("{}.log", table))
313    }
314
315    fn append_log_entry(path: &Path, entry: &TableLogEntry) -> Result<(), IntersticeError> {
316        let encoded = encode(entry).map_err(|err| {
317            IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
318        })?;
319        let mut file = OpenOptions::new()
320            .create(true)
321            .append(true)
322            .open(path)
323            .map_err(|err| {
324                IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
325            })?;
326
327        let length = (encoded.len() as u32).to_le_bytes();
328        file.write_all(&length).map_err(|err| {
329            IntersticeError::Internal(format!("Failed to write log length: {err}"))
330        })?;
331        file.write_all(&encoded).map_err(|err| {
332            IntersticeError::Internal(format!("Failed to write log entry: {err}"))
333        })?;
334        file.sync_data()
335            .map_err(|err| IntersticeError::Internal(format!("Failed to sync log file: {err}")))?;
336        Ok(())
337    }
338
339    fn read_log_entries<F>(path: &Path, mut visitor: F) -> Result<(), IntersticeError>
340    where
341        F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
342    {
343        if !path.exists() {
344            return Ok(());
345        }
346
347        let mut file = File::open(path).map_err(|err| {
348            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
349        })?;
350        file.seek(SeekFrom::Start(0))
351            .map_err(|err| IntersticeError::Internal(format!("Failed to seek log file: {err}")))?;
352
353        loop {
354            let mut len_buf = [0u8; 4];
355            if file.read_exact(&mut len_buf).is_err() {
356                break;
357            }
358            let length = u32::from_le_bytes(len_buf) as usize;
359            let mut buffer = vec![0u8; length];
360            file.read_exact(&mut buffer).map_err(|err| {
361                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
362            })?;
363            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
364                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
365            })?;
366            visitor(entry)?;
367        }
368
369        Ok(())
370    }
371
372    fn write_snapshot_file(path: &Path, seq: u64, rows: &[Row]) -> Result<(), IntersticeError> {
373        let snapshot = TableSnapshot {
374            version: SNAPSHOT_VERSION,
375            last_seq: seq,
376            rows: rows.to_vec(),
377        };
378        let encoded = encode(&snapshot).map_err(|err| {
379            IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
380        })?;
381        let tmp_path = path.with_extension("snap.tmp");
382        {
383            let mut file = File::create(&tmp_path).map_err(|err| {
384                IntersticeError::Internal(format!(
385                    "Failed to create snapshot temp file {:?}: {}",
386                    tmp_path, err
387                ))
388            })?;
389            file.write_all(&encoded).map_err(|err| {
390                IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
391            })?;
392            file.sync_all().map_err(|err| {
393                IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
394            })?;
395        }
396        fs::rename(&tmp_path, path).map_err(|err| {
397            IntersticeError::Internal(format!("Failed to finalize snapshot {:?}: {}", path, err))
398        })?;
399        Ok(())
400    }
401
402    fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
403        if !path.exists() {
404            return Ok(TableSnapshot {
405                version: SNAPSHOT_VERSION,
406                last_seq: 0,
407                rows: Vec::new(),
408            });
409        }
410        let bytes = fs::read(path).map_err(|err| {
411            IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
412        })?;
413        decode(&bytes)
414            .map_err(|err| IntersticeError::Internal(format!("Failed to decode snapshot: {err}")))
415    }
416
417    fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
418        if !path.exists() {
419            return Ok(());
420        }
421        let tmp_path = path.with_extension("log.tmp");
422        let mut reader = File::open(path).map_err(|err| {
423            IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
424        })?;
425        let mut writer = File::create(&tmp_path).map_err(|err| {
426            IntersticeError::Internal(format!("Failed to create temp log: {err}"))
427        })?;
428
429        loop {
430            let mut len_buf = [0u8; 4];
431            if reader.read_exact(&mut len_buf).is_err() {
432                break;
433            }
434            let length = u32::from_le_bytes(len_buf) as usize;
435            let mut buffer = vec![0u8; length];
436            reader.read_exact(&mut buffer).map_err(|err| {
437                IntersticeError::Internal(format!("Failed to read log entry: {err}"))
438            })?;
439            let entry: TableLogEntry = decode(&buffer).map_err(|err| {
440                IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
441            })?;
442            if entry.seq > keep_after_seq {
443                writer.write_all(&len_buf).map_err(|err| {
444                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
445                })?;
446                writer.write_all(&buffer).map_err(|err| {
447                    IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
448                })?;
449            }
450        }
451
452        writer.sync_all().map_err(|err| {
453            IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
454        })?;
455        fs::rename(&tmp_path, path).map_err(|err| {
456            IntersticeError::Internal(format!("Failed to replace log file: {err}"))
457        })?;
458        Ok(())
459    }
460
461    fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
462        match op {
463            LogOperation::Insert { row, .. } => {
464                let row = row.clone().ok_or_else(|| {
465                    IntersticeError::Internal("Missing row data for log insert".into())
466                })?;
467                table.insert(row)?;
468            }
469            LogOperation::Update { row, .. } => {
470                let row = row.clone().ok_or_else(|| {
471                    IntersticeError::Internal("Missing row data for log update".into())
472                })?;
473                table.update(row)?;
474            }
475            LogOperation::Delete { primary_key } => {
476                let _ = table.delete(primary_key)?;
477            }
478            LogOperation::Clear => {
479                table.clear();
480            }
481        }
482        Ok(())
483    }
484
485    pub fn forget_module(&self, module: &str) {
486        self.cleanup_module(module);
487    }
488}
489
/// The two per-module directories used for persistence.
struct ModulePaths {
    /// Directory holding one `<table>.log` file per table.
    logs: PathBuf,
    /// Directory holding one `<table>.snap` file per table.
    snapshots: PathBuf,
}
494
495#[derive(Serialize, Deserialize)]
496struct TableLogEntry {
497    seq: u64,
498    timestamp_ms: u64,
499    operation: LogOperation,
500}
501
502impl TableLogEntry {
503    fn new(seq: u64, operation: LogOperation) -> Self {
504        let timestamp_ms = SystemTime::now()
505            .duration_since(UNIX_EPOCH)
506            .unwrap_or_default()
507            .as_millis() as u64;
508        Self {
509            seq,
510            timestamp_ms,
511            operation,
512        }
513    }
514}
515
516#[derive(Serialize, Deserialize)]
517struct TableSnapshot {
518    version: u16,
519    last_seq: u64,
520    rows: Vec<Row>,
521}