1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use interstice_abi::{IndexKey, PersistenceKind, Row, decode, encode};
9use serde::{Deserialize, Serialize};
10
11use crate::{error::IntersticeError, runtime::table::Table};
12
13const SNAPSHOT_VERSION: u16 = 1;
14const SNAPSHOT_INTERVAL: u64 = 256;
15
16#[derive(Clone, Debug)]
17pub struct SnapshotPlan {
18 pub module: String,
19 pub table: String,
20 pub seq: u64,
21}
22
23#[derive(Clone, Debug, Serialize, Deserialize)]
24pub enum LogOperation {
25 Insert {
26 primary_key: IndexKey,
27 row: Option<Row>,
28 },
29 Update {
30 primary_key: IndexKey,
31 row: Option<Row>,
32 },
33 Delete {
34 primary_key: IndexKey,
35 },
36 Clear,
37}
38
39#[derive(Clone, Debug, Hash, PartialEq, Eq)]
40struct TableKey {
41 module: String,
42 table: String,
43}
44
45impl TableKey {
46 fn new(module: &str, table: &str) -> Self {
47 Self {
48 module: module.to_string(),
49 table: table.to_string(),
50 }
51 }
52}
53
54#[derive(Debug)]
55struct TableState {
56 persistence: PersistenceKind,
57 next_seq: u64,
58 last_snapshot_seq: u64,
59}
60
61impl TableState {
62 fn new(persistence: PersistenceKind) -> Self {
63 Self {
64 persistence,
65 next_seq: 0,
66 last_snapshot_seq: 0,
67 }
68 }
69}
70
71pub struct TableStore {
72 modules_root: Option<PathBuf>,
73 tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
74}
75
76impl TableStore {
77 pub fn new(root: Option<PathBuf>) -> Self {
78 Self {
79 modules_root: root,
80 tables: Mutex::new(HashMap::new()),
81 }
82 }
83
84 pub fn in_memory() -> Self {
85 Self::new(None)
86 }
87
88 pub fn record_logged_operation(
89 &self,
90 module: &str,
91 table: &str,
92 operation: LogOperation,
93 ) -> Result<Option<SnapshotPlan>, IntersticeError> {
94 let Some(root) = &self.modules_root else {
95 return Ok(None);
96 };
97 let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
98 let log_path = self.log_path(root, module, table);
99 let mut guard = state.lock().unwrap();
100 guard.persistence = PersistenceKind::Logged;
101 let seq = guard.next_seq;
102 guard.next_seq += 1;
103
104 let entry = TableLogEntry::new(seq, operation);
105 Self::append_log_entry(&log_path, &entry)?;
106
107 let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
108 if needs_snapshot {
109 Ok(Some(SnapshotPlan {
110 module: module.to_string(),
111 table: table.to_string(),
112 seq,
113 }))
114 } else {
115 Ok(None)
116 }
117 }
118
119 pub fn snapshot_logged_table(
120 &self,
121 plan: SnapshotPlan,
122 rows: Vec<Row>,
123 ) -> Result<(), IntersticeError> {
124 let Some(root) = &self.modules_root else {
125 return Ok(());
126 };
127
128 let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
129 let module_paths = self.ensure_module_dirs(root, &plan.module)?;
130 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
131 let log_path = module_paths.logs.join(format!("{}.log", plan.table));
132
133 {
134 let mut guard = state.lock().unwrap();
135 Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
136 Self::compact_log(&log_path, plan.seq)?;
137 guard.last_snapshot_seq = plan.seq;
138 }
139
140 Ok(())
141 }
142
143 pub fn persist_stateful_operation(
144 &self,
145 module: &str,
146 table: &str,
147 operation: LogOperation,
148 rows: Vec<Row>,
149 ) -> Result<(), IntersticeError> {
150 let Some(root) = &self.modules_root else {
151 return Ok(());
152 };
153
154 let state = self.get_or_create_state(module, table, PersistenceKind::Stateful)?;
155 let module_paths = self.ensure_module_dirs(root, module)?;
156 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table));
157 let log_path = module_paths.logs.join(format!("{}.log", table));
158
159 let mut guard = state.lock().unwrap();
160 let seq = guard.next_seq;
161 guard.next_seq += 1;
162
163 Self::write_snapshot_file(&snapshot_path, seq, &rows)?;
164 Self::append_log_entry(&log_path, &TableLogEntry::new(seq, operation))?;
165 guard.last_snapshot_seq = seq;
166 Ok(())
167 }
168
169 pub fn restore_table(&self, module: &str, table: &mut Table) -> Result<(), IntersticeError> {
170 let Some(root) = &self.modules_root else {
171 return Ok(());
172 };
173
174 if table.schema.persistence == PersistenceKind::Ephemeral {
175 let state =
176 self.get_or_create_state(module, &table.schema.name, PersistenceKind::Ephemeral)?;
177 let mut guard = state.lock().unwrap();
178 guard.persistence = PersistenceKind::Ephemeral;
179 guard.last_snapshot_seq = 0;
180 guard.next_seq = 0;
181 return Ok(());
182 }
183
184 let table_name = table.schema.name.clone();
185 let module_paths = self.ensure_module_dirs(root, module)?;
186 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
187 let log_path = module_paths.logs.join(format!("{}.log", table_name));
188 let snapshot = Self::read_snapshot_file(&snapshot_path)?;
189
190 table.restore_from_rows(snapshot.rows)?;
191 let mut last_seq = snapshot.last_seq;
192
193 if table.schema.persistence != PersistenceKind::Stateful {
194 Self::read_log_entries(&log_path, |entry| {
195 if entry.seq > snapshot.last_seq {
196 TableStore::apply_entry(table, &entry.operation)?;
197 last_seq = entry.seq;
198 }
199 Ok(())
200 })?;
201 }
202
203 let state =
204 self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
205 let mut guard = state.lock().unwrap();
206 guard.persistence = table.schema.persistence.clone();
207 guard.last_snapshot_seq = last_seq;
208 guard.next_seq = last_seq.saturating_add(1);
209
210 Ok(())
211 }
212
213 pub fn clear_all(&self) -> Result<(), IntersticeError> {
214 let Some(root) = &self.modules_root else {
215 return Ok(());
216 };
217
218 if root.exists() {
219 for entry in fs::read_dir(root).map_err(|err| {
220 IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
221 })? {
222 if let Ok(entry) = entry {
223 let path = entry.path();
224 if path.is_dir() {
225 let logs = path.join("logs");
226 if logs.exists() {
227 fs::remove_dir_all(&logs).map_err(|err| {
228 IntersticeError::Internal(format!(
229 "Failed to clear logs for {:?}: {}",
230 logs, err
231 ))
232 })?;
233 }
234 fs::create_dir_all(&logs).map_err(|err| {
235 IntersticeError::Internal(format!(
236 "Failed to recreate logs dir {:?}: {}",
237 logs, err
238 ))
239 })?;
240
241 let snapshots = path.join("snapshots");
242 if snapshots.exists() {
243 fs::remove_dir_all(&snapshots).map_err(|err| {
244 IntersticeError::Internal(format!(
245 "Failed to clear snapshots for {:?}: {}",
246 snapshots, err
247 ))
248 })?;
249 }
250 fs::create_dir_all(&snapshots).map_err(|err| {
251 IntersticeError::Internal(format!(
252 "Failed to recreate snapshots dir {:?}: {}",
253 snapshots, err
254 ))
255 })?;
256 }
257 }
258 }
259 }
260
261 self.tables.lock().unwrap().clear();
262 Ok(())
263 }
264
265 pub fn cleanup_module(&self, module: &str) {
266 let mut tables = self.tables.lock().unwrap();
267 tables.retain(|key, _| key.module != module);
268 }
269
270 fn get_or_create_state(
271 &self,
272 module: &str,
273 table: &str,
274 persistence: PersistenceKind,
275 ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
276 let mut tables = self.tables.lock().unwrap();
277 if let Some(state) = tables.get(&TableKey::new(module, table)) {
278 return Ok(state.clone());
279 }
280
281 let state = Arc::new(Mutex::new(TableState::new(persistence)));
282 tables.insert(TableKey::new(module, table), state.clone());
283 Ok(state)
284 }
285
286 fn ensure_module_dirs(
287 &self,
288 root: &Path,
289 module: &str,
290 ) -> Result<ModulePaths, IntersticeError> {
291 let module_dir = root.join(module);
292 let logs = module_dir.join("logs");
293 let snapshots = module_dir.join("snapshots");
294 fs::create_dir_all(&logs).map_err(|err| {
295 IntersticeError::Internal(format!(
296 "Failed to create logs dir for module {}: {}",
297 module, err
298 ))
299 })?;
300 fs::create_dir_all(&snapshots).map_err(|err| {
301 IntersticeError::Internal(format!(
302 "Failed to create snapshots dir for module {}: {}",
303 module, err
304 ))
305 })?;
306 Ok(ModulePaths { logs, snapshots })
307 }
308
309 fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
310 root.join(module)
311 .join("logs")
312 .join(format!("{}.log", table))
313 }
314
315 fn append_log_entry(path: &Path, entry: &TableLogEntry) -> Result<(), IntersticeError> {
316 let encoded = encode(entry).map_err(|err| {
317 IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
318 })?;
319 let mut file = OpenOptions::new()
320 .create(true)
321 .append(true)
322 .open(path)
323 .map_err(|err| {
324 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
325 })?;
326
327 let length = (encoded.len() as u32).to_le_bytes();
328 file.write_all(&length).map_err(|err| {
329 IntersticeError::Internal(format!("Failed to write log length: {err}"))
330 })?;
331 file.write_all(&encoded).map_err(|err| {
332 IntersticeError::Internal(format!("Failed to write log entry: {err}"))
333 })?;
334 file.sync_data()
335 .map_err(|err| IntersticeError::Internal(format!("Failed to sync log file: {err}")))?;
336 Ok(())
337 }
338
339 fn read_log_entries<F>(path: &Path, mut visitor: F) -> Result<(), IntersticeError>
340 where
341 F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
342 {
343 if !path.exists() {
344 return Ok(());
345 }
346
347 let mut file = File::open(path).map_err(|err| {
348 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
349 })?;
350 file.seek(SeekFrom::Start(0))
351 .map_err(|err| IntersticeError::Internal(format!("Failed to seek log file: {err}")))?;
352
353 loop {
354 let mut len_buf = [0u8; 4];
355 if file.read_exact(&mut len_buf).is_err() {
356 break;
357 }
358 let length = u32::from_le_bytes(len_buf) as usize;
359 let mut buffer = vec![0u8; length];
360 file.read_exact(&mut buffer).map_err(|err| {
361 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
362 })?;
363 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
364 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
365 })?;
366 visitor(entry)?;
367 }
368
369 Ok(())
370 }
371
372 fn write_snapshot_file(path: &Path, seq: u64, rows: &[Row]) -> Result<(), IntersticeError> {
373 let snapshot = TableSnapshot {
374 version: SNAPSHOT_VERSION,
375 last_seq: seq,
376 rows: rows.to_vec(),
377 };
378 let encoded = encode(&snapshot).map_err(|err| {
379 IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
380 })?;
381 let tmp_path = path.with_extension("snap.tmp");
382 {
383 let mut file = File::create(&tmp_path).map_err(|err| {
384 IntersticeError::Internal(format!(
385 "Failed to create snapshot temp file {:?}: {}",
386 tmp_path, err
387 ))
388 })?;
389 file.write_all(&encoded).map_err(|err| {
390 IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
391 })?;
392 file.sync_all().map_err(|err| {
393 IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
394 })?;
395 }
396 fs::rename(&tmp_path, path).map_err(|err| {
397 IntersticeError::Internal(format!("Failed to finalize snapshot {:?}: {}", path, err))
398 })?;
399 Ok(())
400 }
401
402 fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
403 if !path.exists() {
404 return Ok(TableSnapshot {
405 version: SNAPSHOT_VERSION,
406 last_seq: 0,
407 rows: Vec::new(),
408 });
409 }
410 let bytes = fs::read(path).map_err(|err| {
411 IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
412 })?;
413 decode(&bytes)
414 .map_err(|err| IntersticeError::Internal(format!("Failed to decode snapshot: {err}")))
415 }
416
417 fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
418 if !path.exists() {
419 return Ok(());
420 }
421 let tmp_path = path.with_extension("log.tmp");
422 let mut reader = File::open(path).map_err(|err| {
423 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
424 })?;
425 let mut writer = File::create(&tmp_path).map_err(|err| {
426 IntersticeError::Internal(format!("Failed to create temp log: {err}"))
427 })?;
428
429 loop {
430 let mut len_buf = [0u8; 4];
431 if reader.read_exact(&mut len_buf).is_err() {
432 break;
433 }
434 let length = u32::from_le_bytes(len_buf) as usize;
435 let mut buffer = vec![0u8; length];
436 reader.read_exact(&mut buffer).map_err(|err| {
437 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
438 })?;
439 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
440 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
441 })?;
442 if entry.seq > keep_after_seq {
443 writer.write_all(&len_buf).map_err(|err| {
444 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
445 })?;
446 writer.write_all(&buffer).map_err(|err| {
447 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
448 })?;
449 }
450 }
451
452 writer.sync_all().map_err(|err| {
453 IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
454 })?;
455 fs::rename(&tmp_path, path).map_err(|err| {
456 IntersticeError::Internal(format!("Failed to replace log file: {err}"))
457 })?;
458 Ok(())
459 }
460
461 fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
462 match op {
463 LogOperation::Insert { row, .. } => {
464 let row = row.clone().ok_or_else(|| {
465 IntersticeError::Internal("Missing row data for log insert".into())
466 })?;
467 table.insert(row)?;
468 }
469 LogOperation::Update { row, .. } => {
470 let row = row.clone().ok_or_else(|| {
471 IntersticeError::Internal("Missing row data for log update".into())
472 })?;
473 table.update(row)?;
474 }
475 LogOperation::Delete { primary_key } => {
476 let _ = table.delete(primary_key)?;
477 }
478 LogOperation::Clear => {
479 table.clear();
480 }
481 }
482 Ok(())
483 }
484
485 pub fn forget_module(&self, module: &str) {
486 self.cleanup_module(module);
487 }
488}
489
490struct ModulePaths {
491 logs: PathBuf,
492 snapshots: PathBuf,
493}
494
495#[derive(Serialize, Deserialize)]
496struct TableLogEntry {
497 seq: u64,
498 timestamp_ms: u64,
499 operation: LogOperation,
500}
501
502impl TableLogEntry {
503 fn new(seq: u64, operation: LogOperation) -> Self {
504 let timestamp_ms = SystemTime::now()
505 .duration_since(UNIX_EPOCH)
506 .unwrap_or_default()
507 .as_millis() as u64;
508 Self {
509 seq,
510 timestamp_ms,
511 operation,
512 }
513 }
514}
515
516#[derive(Serialize, Deserialize)]
517struct TableSnapshot {
518 version: u16,
519 last_seq: u64,
520 rows: Vec<Row>,
521}