1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use parking_lot::Mutex;
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use interstice_abi::{IndexKey, PersistenceKind, Row, decode, encode};
10use serde::{Deserialize, Serialize};
11
12use crate::{error::IntersticeError, runtime::table::Table};
13
/// Format version stamped into every snapshot file written by the store.
const SNAPSHOT_VERSION: u16 = 1;
/// How far (in log sequence numbers) a logged table may advance past its last
/// snapshot/restore point before `record_logged_operation` asks for a new snapshot.
const SNAPSHOT_INTERVAL: u64 = 256;
16
/// Request to snapshot one logged table, produced by
/// `TableStore::record_logged_operation` and consumed by
/// `TableStore::snapshot_logged_table`.
#[derive(Clone, Debug)]
pub struct SnapshotPlan {
    /// Name of the module owning the table.
    pub module: String,
    /// Name of the table inside the module.
    pub table: String,
    /// Sequence number of the log entry that triggered the plan. The snapshot is
    /// written with this as its last sequence number and the log is compacted
    /// down to the entries after it.
    pub seq: u64,
}
23
24#[derive(Clone, Debug, Serialize, Deserialize)]
25pub enum LogOperation {
26 Insert {
27 primary_key: IndexKey,
28 row: Option<Row>,
29 },
30 Update {
31 primary_key: IndexKey,
32 row: Option<Row>,
33 },
34 Delete {
35 primary_key: IndexKey,
36 },
37 Clear,
38}
39
/// Map key identifying a single table of a single module.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct TableKey {
    module: String,
    table: String,
}

impl TableKey {
    /// Builds an owned key from borrowed module and table names.
    fn new(module: &str, table: &str) -> Self {
        let (module, table) = (module.to_owned(), table.to_owned());
        Self { module, table }
    }
}
54
55#[derive(Debug)]
56struct TableState {
57 persistence: PersistenceKind,
58 next_seq: u64,
59 last_snapshot_seq: u64,
60}
61
62impl TableState {
63 fn new(persistence: PersistenceKind) -> Self {
64 Self {
65 persistence,
66 next_seq: 0,
67 last_snapshot_seq: 0,
68 }
69 }
70}
71
/// Cached append handle for one table's log file.
struct WalWriter {
    /// The log file, opened for appending by `TableStore`.
    file: File,
    /// Path `file` was opened from; reported by `flush_wal` for syncing.
    path: PathBuf,
    /// True when bytes were appended since `flush_wal` last cleared it.
    dirty: bool,
}
80
81#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83struct StatefulRowKey {
84 module: String,
85 table: String,
86 pk: IndexKey,
87}
88
89pub struct TableStore {
90 modules_root: Option<PathBuf>,
91 tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
92 wal_writers: Arc<Mutex<HashMap<PathBuf, WalWriter>>>,
95 dirty_stateful: Arc<Mutex<HashMap<StatefulRowKey, Option<Row>>>>,
99}
100
101impl TableStore {
102 pub fn new(root: Option<PathBuf>) -> Self {
103 Self {
104 modules_root: root,
105 tables: Mutex::new(HashMap::new()),
106 wal_writers: Arc::new(Mutex::new(HashMap::new())),
107 dirty_stateful: Arc::new(Mutex::new(HashMap::new())),
108 }
109 }
110
111 pub fn in_memory() -> Self {
112 Self::new(None)
113 }
114
115 pub fn record_logged_operation(
116 &self,
117 module: &str,
118 table: &str,
119 operation: LogOperation,
120 ) -> Result<Option<SnapshotPlan>, IntersticeError> {
121 let Some(root) = &self.modules_root else {
122 return Ok(None);
123 };
124 let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
125 let log_path = self.log_path(root, module, table);
126 let mut guard = state.lock();
127 guard.persistence = PersistenceKind::Logged;
128 let seq = guard.next_seq;
129 guard.next_seq += 1;
130
131 let entry = TableLogEntry::new(seq, operation);
132 self.write_log_entry_async(&log_path, &entry, module)?;
134
135 let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
136 if needs_snapshot {
137 Ok(Some(SnapshotPlan {
138 module: module.to_string(),
139 table: table.to_string(),
140 seq,
141 }))
142 } else {
143 Ok(None)
144 }
145 }
146
147 pub fn snapshot_logged_table(
148 &self,
149 plan: SnapshotPlan,
150 rows: Vec<Row>,
151 ) -> Result<(), IntersticeError> {
152 let Some(root) = &self.modules_root else {
153 return Ok(());
154 };
155
156 self.flush_wal();
158
159 let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
160 let module_paths = self.ensure_module_dirs(root, &plan.module)?;
161 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
162 let log_path = module_paths.logs.join(format!("{}.log", plan.table));
163
164 {
165 let mut guard = state.lock();
166 Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
167 Self::compact_log(&log_path, plan.seq)?;
169 self.wal_writers.lock().remove(&log_path);
170 guard.last_snapshot_seq = plan.seq;
171 }
172
173 Ok(())
174 }
175
176 pub fn flush_wal(&self) {
181 let dirty_paths: Vec<PathBuf> = {
186 let mut writers = self.wal_writers.lock();
187 let paths: Vec<PathBuf> = writers
188 .values()
189 .filter(|w| w.dirty)
190 .map(|w| w.path.clone())
191 .collect();
192 for writer in writers.values_mut() {
193 writer.dirty = false;
194 }
195 paths
196 };
197
198 for path in dirty_paths {
201 if let Ok(file) = File::open(&path) {
202 let _ = file.sync_data();
203 }
204 }
205 }
206
207 fn stateful_dir(root: &Path, module: &str, table: &str) -> PathBuf {
208 root.join(module).join("stateful").join(table)
209 }
210
211 fn pk_filename(pk: &IndexKey) -> String {
212 match encode(pk) {
214 Ok(bytes) => bytes.iter().map(|b| format!("{:02x}", b)).collect(),
215 Err(_) => "unknown".to_string(),
216 }
217 }
218
219 pub fn persist_stateful_insert(
222 &self,
223 module: &str,
224 table: &str,
225 pk: &IndexKey,
226 row: &Row,
227 ) -> Result<(), IntersticeError> {
228 if self.modules_root.is_none() {
229 return Ok(());
230 }
231 let key = StatefulRowKey {
232 module: module.to_string(),
233 table: table.to_string(),
234 pk: pk.clone(),
235 };
236 self.dirty_stateful.lock().insert(key, Some(row.clone()));
237 Ok(())
238 }
239
240 pub fn persist_stateful_update(
241 &self,
242 module: &str,
243 table: &str,
244 pk: &IndexKey,
245 row: &Row,
246 ) -> Result<(), IntersticeError> {
247 self.persist_stateful_insert(module, table, pk, row)
248 }
249
250 pub fn persist_stateful_delete(
252 &self,
253 module: &str,
254 table: &str,
255 pk: &IndexKey,
256 ) -> Result<(), IntersticeError> {
257 if self.modules_root.is_none() {
258 return Ok(());
259 }
260 let key = StatefulRowKey {
261 module: module.to_string(),
262 table: table.to_string(),
263 pk: pk.clone(),
264 };
265 self.dirty_stateful.lock().insert(key, None);
266 Ok(())
267 }
268
269 pub fn persist_stateful_clear(
271 &self,
272 module: &str,
273 table: &str,
274 ) -> Result<(), IntersticeError> {
275 let Some(root) = &self.modules_root else {
276 return Ok(());
277 };
278 self.dirty_stateful.lock().retain(|k, _| !(k.module == module && k.table == table));
280 let dir = Self::stateful_dir(root, module, table);
282 if dir.exists() {
283 fs::remove_dir_all(&dir).map_err(|e| {
284 IntersticeError::Internal(format!("Failed to clear stateful dir: {}", e))
285 })?;
286 fs::create_dir_all(&dir).map_err(|e| {
287 IntersticeError::Internal(format!("Failed to recreate stateful dir: {}", e))
288 })?;
289 }
290 Ok(())
291 }
292
293 pub fn flush_stateful(&self) {
296 let Some(root) = &self.modules_root else {
297 return;
298 };
299 let batch = {
301 let mut guard = self.dirty_stateful.lock();
302 if guard.is_empty() {
303 return;
304 }
305 std::mem::take(&mut *guard)
306 };
307
308 for (key, row_opt) in batch {
309 let dir = Self::stateful_dir(root, &key.module, &key.table);
310 let _ = fs::create_dir_all(&dir);
311 let path = dir.join(format!("{}.row", Self::pk_filename(&key.pk)));
312 match row_opt {
313 Some(row) => {
314 if let Ok(encoded) = encode(&row) {
315 let tmp = path.with_extension("row.tmp");
316 if let Ok(mut f) = File::create(&tmp) {
317 let _ = f.write_all(&encoded);
318 let _ = fs::rename(&tmp, &path);
319 }
320 }
321 }
322 None => {
323 let _ = fs::remove_file(&path);
324 }
325 }
326 }
327 }
328
329 pub fn restore_table(&self, module: &str, table: &mut Table) -> Result<(), IntersticeError> {
330 let Some(root) = &self.modules_root else {
331 return Ok(());
332 };
333
334 if table.schema.persistence == PersistenceKind::Ephemeral {
335 let state =
336 self.get_or_create_state(module, &table.schema.name, PersistenceKind::Ephemeral)?;
337 let mut guard = state.lock();
338 guard.persistence = PersistenceKind::Ephemeral;
339 guard.last_snapshot_seq = 0;
340 guard.next_seq = 0;
341 return Ok(());
342 }
343
344 if table.schema.persistence == PersistenceKind::Stateful {
346 let dir = Self::stateful_dir(root, module, &table.schema.name);
347 if dir.exists() {
348 for entry in fs::read_dir(&dir).map_err(|e| {
349 IntersticeError::Internal(format!("Failed to read stateful dir: {}", e))
350 })? {
351 let entry = entry.map_err(|e| {
352 IntersticeError::Internal(format!("Failed to read dir entry: {}", e))
353 })?;
354 let path = entry.path();
355 if path.extension().and_then(|e| e.to_str()) == Some("row") {
356 let bytes = fs::read(&path).map_err(|e| {
357 IntersticeError::Internal(format!("Failed to read row file: {}", e))
358 })?;
359 let row: Row = decode(&bytes).map_err(|e| {
360 IntersticeError::Internal(format!("Failed to decode row: {}", e))
361 })?;
362 table.insert(row)?;
363 }
364 }
365 }
366 let state = self.get_or_create_state(module, &table.schema.name, PersistenceKind::Stateful)?;
367 let mut guard = state.lock();
368 guard.persistence = PersistenceKind::Stateful;
369 return Ok(());
370 }
371
372 let table_name = table.schema.name.clone();
374 let module_paths = self.ensure_module_dirs(root, module)?;
375 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
376 let log_path = module_paths.logs.join(format!("{}.log", table_name));
377 let snapshot = Self::read_snapshot_file(&snapshot_path)?;
378
379 table.restore_from_rows(snapshot.rows)?;
380 let mut last_seq = snapshot.last_seq;
381
382 Self::read_log_entries(&log_path, |entry| {
383 if entry.seq > snapshot.last_seq {
384 TableStore::apply_entry(table, &entry.operation)?;
385 last_seq = entry.seq;
386 }
387 Ok(())
388 })?;
389
390 let state =
391 self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
392 let mut guard = state.lock();
393 guard.persistence = table.schema.persistence.clone();
394 guard.last_snapshot_seq = last_seq;
395 guard.next_seq = last_seq.saturating_add(1);
396
397 Ok(())
398 }
399
400 pub fn clear_all(&self) -> Result<(), IntersticeError> {
401 let Some(root) = &self.modules_root else {
402 return Ok(());
403 };
404
405 self.wal_writers.lock().clear();
407
408 if root.exists() {
409 for entry in fs::read_dir(root).map_err(|err| {
410 IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
411 })? {
412 if let Ok(entry) = entry {
413 let path = entry.path();
414 if path.is_dir() {
415 let logs = path.join("logs");
416 if logs.exists() {
417 fs::remove_dir_all(&logs).map_err(|err| {
418 IntersticeError::Internal(format!(
419 "Failed to clear logs for {:?}: {}",
420 logs, err
421 ))
422 })?;
423 }
424 fs::create_dir_all(&logs).map_err(|err| {
425 IntersticeError::Internal(format!(
426 "Failed to recreate logs dir {:?}: {}",
427 logs, err
428 ))
429 })?;
430
431 let snapshots = path.join("snapshots");
432 if snapshots.exists() {
433 fs::remove_dir_all(&snapshots).map_err(|err| {
434 IntersticeError::Internal(format!(
435 "Failed to clear snapshots for {:?}: {}",
436 snapshots, err
437 ))
438 })?;
439 }
440 fs::create_dir_all(&snapshots).map_err(|err| {
441 IntersticeError::Internal(format!(
442 "Failed to recreate snapshots dir {:?}: {}",
443 snapshots, err
444 ))
445 })?;
446
447 let stateful = path.join("stateful");
448 if stateful.exists() {
449 fs::remove_dir_all(&stateful).map_err(|err| {
450 IntersticeError::Internal(format!(
451 "Failed to clear stateful for {:?}: {}",
452 stateful, err
453 ))
454 })?;
455 }
456 }
458 }
459 }
460 }
461
462 self.tables.lock().clear();
463 Ok(())
464 }
465
466 pub fn cleanup_module(&self, module: &str) {
467 self.tables.lock().retain(|key, _| key.module != module);
468 if let Some(root) = &self.modules_root {
470 let module_log_dir = root.join(module).join("logs");
471 self.wal_writers
472 .lock()
473
474 .retain(|path, _| !path.starts_with(&module_log_dir));
475 }
476 if let Some(root) = &self.modules_root {
478 let stateful_module_dir = root.join(module).join("stateful");
479 if stateful_module_dir.exists() {
480 let _ = fs::remove_dir_all(&stateful_module_dir);
481 }
482 }
483 }
484
485 fn get_or_create_state(
486 &self,
487 module: &str,
488 table: &str,
489 persistence: PersistenceKind,
490 ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
491 let mut tables = self.tables.lock();
492 if let Some(state) = tables.get(&TableKey::new(module, table)) {
493 return Ok(state.clone());
494 }
495
496 let state = Arc::new(Mutex::new(TableState::new(persistence)));
497 tables.insert(TableKey::new(module, table), state.clone());
498 Ok(state)
499 }
500
501 fn ensure_module_dirs(
502 &self,
503 root: &Path,
504 module: &str,
505 ) -> Result<ModulePaths, IntersticeError> {
506 let module_dir = root.join(module);
507 let logs = module_dir.join("logs");
508 let snapshots = module_dir.join("snapshots");
509 fs::create_dir_all(&logs).map_err(|err| {
510 IntersticeError::Internal(format!(
511 "Failed to create logs dir for module {}: {}",
512 module, err
513 ))
514 })?;
515 fs::create_dir_all(&snapshots).map_err(|err| {
516 IntersticeError::Internal(format!(
517 "Failed to create snapshots dir for module {}: {}",
518 module, err
519 ))
520 })?;
521 Ok(ModulePaths { logs, snapshots })
522 }
523
524 fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
525 root.join(module)
526 .join("logs")
527 .join(format!("{}.log", table))
528 }
529
530 fn write_log_entry_async(
533 &self,
534 path: &PathBuf,
535 entry: &TableLogEntry,
536 module: &str,
537 ) -> Result<(), IntersticeError> {
538 let encoded = encode(entry).map_err(|err| {
539 IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
540 })?;
541 let length = (encoded.len() as u32).to_le_bytes();
542
543 {
545 let mut writers = self.wal_writers.lock();
546 if let Some(writer) = writers.get_mut(path) {
547 writer.file.write_all(&length).map_err(|err| {
548 IntersticeError::Internal(format!("Failed to write log length: {err}"))
549 })?;
550 writer.file.write_all(&encoded).map_err(|err| {
551 IntersticeError::Internal(format!("Failed to write log entry: {err}"))
552 })?;
553 writer.dirty = true;
554 return Ok(());
555 }
556 }
557
558 if let Some(root) = &self.modules_root {
560 self.ensure_module_dirs(root, module)?;
561 }
562 let file = OpenOptions::new()
563 .create(true)
564 .append(true)
565 .open(path)
566 .map_err(|err| {
567 IntersticeError::Internal(format!(
568 "Failed to open log file {:?}: {}",
569 path, err
570 ))
571 })?;
572
573 let mut writers = self.wal_writers.lock();
574 let writer = writers
575 .entry(path.clone())
576 .or_insert(WalWriter { file, path: path.clone(), dirty: false });
577 writer.file.write_all(&length).map_err(|err| {
578 IntersticeError::Internal(format!("Failed to write log length: {err}"))
579 })?;
580 writer.file.write_all(&encoded).map_err(|err| {
581 IntersticeError::Internal(format!("Failed to write log entry: {err}"))
582 })?;
583 writer.dirty = true;
584 Ok(())
585 }
586
587 fn read_log_entries<F>(path: &Path, mut visitor: F) -> Result<(), IntersticeError>
588 where
589 F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
590 {
591 if !path.exists() {
592 return Ok(());
593 }
594
595 let mut file = File::open(path).map_err(|err| {
596 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
597 })?;
598 file.seek(SeekFrom::Start(0))
599 .map_err(|err| IntersticeError::Internal(format!("Failed to seek log file: {err}")))?;
600
601 loop {
602 let mut len_buf = [0u8; 4];
603 if file.read_exact(&mut len_buf).is_err() {
604 break;
605 }
606 let length = u32::from_le_bytes(len_buf) as usize;
607 let mut buffer = vec![0u8; length];
608 file.read_exact(&mut buffer).map_err(|err| {
609 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
610 })?;
611 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
612 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
613 })?;
614 visitor(entry)?;
615 }
616
617 Ok(())
618 }
619
620 fn write_snapshot_file(path: &Path, seq: u64, rows: &[Row]) -> Result<(), IntersticeError> {
621 let snapshot = TableSnapshot {
622 version: SNAPSHOT_VERSION,
623 last_seq: seq,
624 rows: rows.to_vec(),
625 };
626 let encoded = encode(&snapshot).map_err(|err| {
627 IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
628 })?;
629 let tmp_path = path.with_extension("snap.tmp");
630 {
631 let mut file = File::create(&tmp_path).map_err(|err| {
632 IntersticeError::Internal(format!(
633 "Failed to create snapshot temp file {:?}: {}",
634 tmp_path, err
635 ))
636 })?;
637 file.write_all(&encoded).map_err(|err| {
638 IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
639 })?;
640 file.sync_all().map_err(|err| {
641 IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
642 })?;
643 }
644 fs::rename(&tmp_path, path).map_err(|err| {
645 IntersticeError::Internal(format!("Failed to finalize snapshot {:?}: {}", path, err))
646 })?;
647 Ok(())
648 }
649
650 fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
651 if !path.exists() {
652 return Ok(TableSnapshot {
653 version: SNAPSHOT_VERSION,
654 last_seq: 0,
655 rows: Vec::new(),
656 });
657 }
658 let bytes = fs::read(path).map_err(|err| {
659 IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
660 })?;
661 decode(&bytes)
662 .map_err(|err| IntersticeError::Internal(format!("Failed to decode snapshot: {err}")))
663 }
664
665 fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
666 if !path.exists() {
667 return Ok(());
668 }
669 let tmp_path = path.with_extension("log.tmp");
670 let mut reader = File::open(path).map_err(|err| {
671 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
672 })?;
673 let mut writer = File::create(&tmp_path).map_err(|err| {
674 IntersticeError::Internal(format!("Failed to create temp log: {err}"))
675 })?;
676
677 loop {
678 let mut len_buf = [0u8; 4];
679 if reader.read_exact(&mut len_buf).is_err() {
680 break;
681 }
682 let length = u32::from_le_bytes(len_buf) as usize;
683 let mut buffer = vec![0u8; length];
684 reader.read_exact(&mut buffer).map_err(|err| {
685 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
686 })?;
687 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
688 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
689 })?;
690 if entry.seq > keep_after_seq {
691 writer.write_all(&len_buf).map_err(|err| {
692 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
693 })?;
694 writer.write_all(&buffer).map_err(|err| {
695 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
696 })?;
697 }
698 }
699
700 writer.sync_all().map_err(|err| {
701 IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
702 })?;
703 fs::rename(&tmp_path, path).map_err(|err| {
704 IntersticeError::Internal(format!("Failed to replace log file: {err}"))
705 })?;
706 Ok(())
707 }
708
709 fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
710 match op {
711 LogOperation::Insert { row, .. } => {
712 let row = row.clone().ok_or_else(|| {
713 IntersticeError::Internal("Missing row data for log insert".into())
714 })?;
715 table.insert(row)?;
716 }
717 LogOperation::Update { row, .. } => {
718 let row = row.clone().ok_or_else(|| {
719 IntersticeError::Internal("Missing row data for log update".into())
720 })?;
721 table.update(row)?;
722 }
723 LogOperation::Delete { primary_key } => {
724 let _ = table.delete(primary_key)?;
725 }
726 LogOperation::Clear => {
727 table.clear();
728 }
729 }
730 Ok(())
731 }
732
733 pub fn forget_module(&self, module: &str) {
734 self.cleanup_module(module);
735 }
736}
737
/// Per-module persistence directories, as created by `TableStore::ensure_module_dirs`.
struct ModulePaths {
    /// `<root>/<module>/logs`
    logs: PathBuf,
    /// `<root>/<module>/snapshots`
    snapshots: PathBuf,
}
742
743#[derive(Serialize, Deserialize)]
744struct TableLogEntry {
745 seq: u64,
746 timestamp_ms: u64,
747 operation: LogOperation,
748}
749
750impl TableLogEntry {
751 fn new(seq: u64, operation: LogOperation) -> Self {
752 let timestamp_ms = SystemTime::now()
753 .duration_since(UNIX_EPOCH)
754 .unwrap_or_default()
755 .as_millis() as u64;
756 Self {
757 seq,
758 timestamp_ms,
759 operation,
760 }
761 }
762}
763
764#[derive(Serialize, Deserialize)]
765struct TableSnapshot {
766 version: u16,
767 last_seq: u64,
768 rows: Vec<Row>,
769}