1use std::collections::HashMap;
2use std::fs::{self, File, OpenOptions};
3use std::io::{Read, Seek, SeekFrom, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use interstice_abi::{decode, encode, IndexKey, PersistenceKind, Row};
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 error::IntersticeError,
13 runtime::table::Table,
14};
15
/// Format version stamped into the `version` field of every snapshot written by this file.
const SNAPSHOT_VERSION: u16 = 1;

/// Minimum distance, in sequence numbers, between the last snapshot and a
/// newly logged operation before a new snapshot is requested.
const SNAPSHOT_INTERVAL: u64 = 256;
18
/// Request to snapshot one logged table.
///
/// Produced by `TableStore::record_logged_operation` and consumed by
/// `TableStore::snapshot_logged_table`.
#[derive(Debug, Clone)]
pub struct SnapshotPlan {
    /// Module that owns the table.
    pub module: String,
    /// Name of the table to snapshot.
    pub table: String,
    /// Sequence number the snapshot is taken at; log entries up to and
    /// including it are dropped when the log is compacted.
    pub seq: u64,
}
25
26#[derive(Clone, Debug, Serialize, Deserialize)]
27pub enum LogOperation {
28 Insert {
29 primary_key: IndexKey,
30 row: Option<Row>,
31 },
32 Update {
33 primary_key: IndexKey,
34 row: Option<Row>,
35 },
36 Delete {
37 primary_key: IndexKey,
38 },
39}
40
/// Map key identifying one table inside one module.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TableKey {
    /// Name of the owning module.
    module: String,
    /// Name of the table within that module.
    table: String,
}

impl TableKey {
    /// Builds an owned key from borrowed module and table names.
    fn new(module: &str, table: &str) -> Self {
        let (module, table) = (module.to_owned(), table.to_owned());
        Self { module, table }
    }
}
55
56#[derive(Debug)]
57struct TableState {
58 persistence: PersistenceKind,
59 next_seq: u64,
60 last_snapshot_seq: u64,
61}
62
63impl TableState {
64 fn new(persistence: PersistenceKind) -> Self {
65 Self {
66 persistence,
67 next_seq: 0,
68 last_snapshot_seq: 0,
69 }
70 }
71}
72
73pub struct TableStore {
74 modules_root: Option<PathBuf>,
75 tables: Mutex<HashMap<TableKey, Arc<Mutex<TableState>>>>,
76}
77
78impl TableStore {
79 pub fn new(root: Option<PathBuf>) -> Self {
80 Self {
81 modules_root: root,
82 tables: Mutex::new(HashMap::new()),
83 }
84 }
85
86 pub fn in_memory() -> Self {
87 Self::new(None)
88 }
89
90 pub fn record_logged_operation(
91 &self,
92 module: &str,
93 table: &str,
94 operation: LogOperation,
95 ) -> Result<Option<SnapshotPlan>, IntersticeError> {
96 let Some(root) = &self.modules_root else {
97 return Ok(None);
98 };
99 let state = self.get_or_create_state(module, table, PersistenceKind::Logged)?;
100 let log_path = self.log_path(root, module, table);
101 let mut guard = state.lock().unwrap();
102 guard.persistence = PersistenceKind::Logged;
103 let seq = guard.next_seq;
104 guard.next_seq += 1;
105
106 let entry = TableLogEntry::new(seq, operation);
107 Self::append_log_entry(&log_path, &entry)?;
108
109 let needs_snapshot = seq.saturating_sub(guard.last_snapshot_seq) >= SNAPSHOT_INTERVAL;
110 if needs_snapshot {
111 Ok(Some(SnapshotPlan {
112 module: module.to_string(),
113 table: table.to_string(),
114 seq,
115 }))
116 } else {
117 Ok(None)
118 }
119 }
120
121 pub fn snapshot_logged_table(
122 &self,
123 plan: SnapshotPlan,
124 rows: Vec<Row>,
125 ) -> Result<(), IntersticeError> {
126 let Some(root) = &self.modules_root else {
127 return Ok(());
128 };
129
130 let state = self.get_or_create_state(&plan.module, &plan.table, PersistenceKind::Logged)?;
131 let module_paths = self.ensure_module_dirs(root, &plan.module)?;
132 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", plan.table));
133 let log_path = module_paths.logs.join(format!("{}.log", plan.table));
134
135 {
136 let mut guard = state.lock().unwrap();
137 Self::write_snapshot_file(&snapshot_path, plan.seq, &rows)?;
138 Self::compact_log(&log_path, plan.seq)?;
139 guard.last_snapshot_seq = plan.seq;
140 }
141
142 Ok(())
143 }
144
145 pub fn persist_stateful_operation(
146 &self,
147 module: &str,
148 table: &str,
149 operation: LogOperation,
150 rows: Vec<Row>,
151 ) -> Result<(), IntersticeError> {
152 let Some(root) = &self.modules_root else {
153 return Ok(());
154 };
155
156 let state = self.get_or_create_state(module, table, PersistenceKind::Stateful)?;
157 let module_paths = self.ensure_module_dirs(root, module)?;
158 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table));
159 let log_path = module_paths.logs.join(format!("{}.log", table));
160
161 let mut guard = state.lock().unwrap();
162 let seq = guard.next_seq;
163 guard.next_seq += 1;
164
165 Self::write_snapshot_file(&snapshot_path, seq, &rows)?;
166 Self::append_log_entry(&log_path, &TableLogEntry::new(seq, operation))?;
167 guard.last_snapshot_seq = seq;
168 Ok(())
169 }
170
171 pub fn restore_table(
172 &self,
173 module: &str,
174 table: &mut Table,
175 ) -> Result<(), IntersticeError> {
176 let Some(root) = &self.modules_root else {
177 return Ok(());
178 };
179
180 let table_name = table.schema.name.clone();
181 let module_paths = self.ensure_module_dirs(root, module)?;
182 let snapshot_path = module_paths.snapshots.join(format!("{}.snap", table_name));
183 let log_path = module_paths.logs.join(format!("{}.log", table_name));
184 let snapshot = Self::read_snapshot_file(&snapshot_path)?;
185
186 table.restore_from_rows(snapshot.rows)?;
187 let mut last_seq = snapshot.last_seq;
188
189 if table.schema.persistence != PersistenceKind::Stateful {
190 Self::read_log_entries(&log_path, |entry| {
191 if entry.seq > snapshot.last_seq {
192 TableStore::apply_entry(table, &entry.operation)?;
193 last_seq = entry.seq;
194 }
195 Ok(())
196 })?;
197 }
198
199 let state = self.get_or_create_state(module, &table_name, table.schema.persistence.clone())?;
200 let mut guard = state.lock().unwrap();
201 guard.persistence = table.schema.persistence.clone();
202 guard.last_snapshot_seq = last_seq;
203 guard.next_seq = last_seq.saturating_add(1);
204
205 Ok(())
206 }
207
208 pub fn clear_all(&self) -> Result<(), IntersticeError> {
209 let Some(root) = &self.modules_root else {
210 return Ok(());
211 };
212
213 if root.exists() {
214 for entry in fs::read_dir(root).map_err(|err| {
215 IntersticeError::Internal(format!("Unable to read modules dir: {err}"))
216 })? {
217 if let Ok(entry) = entry {
218 let path = entry.path();
219 if path.is_dir() {
220 let logs = path.join("logs");
221 if logs.exists() {
222 fs::remove_dir_all(&logs).map_err(|err| {
223 IntersticeError::Internal(format!(
224 "Failed to clear logs for {:?}: {}",
225 logs, err
226 ))
227 })?;
228 }
229 fs::create_dir_all(&logs).map_err(|err| {
230 IntersticeError::Internal(format!(
231 "Failed to recreate logs dir {:?}: {}",
232 logs, err
233 ))
234 })?;
235
236 let snapshots = path.join("snapshots");
237 if snapshots.exists() {
238 fs::remove_dir_all(&snapshots).map_err(|err| {
239 IntersticeError::Internal(format!(
240 "Failed to clear snapshots for {:?}: {}",
241 snapshots, err
242 ))
243 })?;
244 }
245 fs::create_dir_all(&snapshots).map_err(|err| {
246 IntersticeError::Internal(format!(
247 "Failed to recreate snapshots dir {:?}: {}",
248 snapshots, err
249 ))
250 })?;
251 }
252 }
253 }
254 }
255
256 self.tables.lock().unwrap().clear();
257 Ok(())
258 }
259
260 pub fn cleanup_module(&self, module: &str) {
261 let mut tables = self.tables.lock().unwrap();
262 tables.retain(|key, _| key.module != module);
263 }
264
265 fn get_or_create_state(
266 &self,
267 module: &str,
268 table: &str,
269 persistence: PersistenceKind,
270 ) -> Result<Arc<Mutex<TableState>>, IntersticeError> {
271 let mut tables = self.tables.lock().unwrap();
272 if let Some(state) = tables.get(&TableKey::new(module, table)) {
273 return Ok(state.clone());
274 }
275
276 let state = Arc::new(Mutex::new(TableState::new(persistence)));
277 tables.insert(TableKey::new(module, table), state.clone());
278 Ok(state)
279 }
280
281 fn ensure_module_dirs(
282 &self,
283 root: &Path,
284 module: &str,
285 ) -> Result<ModulePaths, IntersticeError> {
286 let module_dir = root.join(module);
287 let logs = module_dir.join("logs");
288 let snapshots = module_dir.join("snapshots");
289 fs::create_dir_all(&logs).map_err(|err| {
290 IntersticeError::Internal(format!(
291 "Failed to create logs dir for module {}: {}",
292 module, err
293 ))
294 })?;
295 fs::create_dir_all(&snapshots).map_err(|err| {
296 IntersticeError::Internal(format!(
297 "Failed to create snapshots dir for module {}: {}",
298 module, err
299 ))
300 })?;
301 Ok(ModulePaths { logs, snapshots })
302 }
303
304 fn log_path(&self, root: &Path, module: &str, table: &str) -> PathBuf {
305 root.join(module).join("logs").join(format!("{}.log", table))
306 }
307
308 fn append_log_entry(path: &Path, entry: &TableLogEntry) -> Result<(), IntersticeError> {
309 let encoded = encode(entry).map_err(|err| {
310 IntersticeError::Internal(format!("Failed to encode log entry: {err}"))
311 })?;
312 let mut file = OpenOptions::new()
313 .create(true)
314 .append(true)
315 .open(path)
316 .map_err(|err| {
317 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
318 })?;
319
320 let length = (encoded.len() as u32).to_le_bytes();
321 file.write_all(&length).map_err(|err| {
322 IntersticeError::Internal(format!("Failed to write log length: {err}"))
323 })?;
324 file.write_all(&encoded).map_err(|err| {
325 IntersticeError::Internal(format!("Failed to write log entry: {err}"))
326 })?;
327 file.sync_data().map_err(|err| {
328 IntersticeError::Internal(format!("Failed to sync log file: {err}"))
329 })?;
330 Ok(())
331 }
332
333 fn read_log_entries<F>(
334 path: &Path,
335 mut visitor: F,
336 ) -> Result<(), IntersticeError>
337 where
338 F: FnMut(TableLogEntry) -> Result<(), IntersticeError>,
339 {
340 if !path.exists() {
341 return Ok(());
342 }
343
344 let mut file = File::open(path).map_err(|err| {
345 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
346 })?;
347 file.seek(SeekFrom::Start(0)).map_err(|err| {
348 IntersticeError::Internal(format!("Failed to seek log file: {err}"))
349 })?;
350
351 loop {
352 let mut len_buf = [0u8; 4];
353 if file.read_exact(&mut len_buf).is_err() {
354 break;
355 }
356 let length = u32::from_le_bytes(len_buf) as usize;
357 let mut buffer = vec![0u8; length];
358 file.read_exact(&mut buffer).map_err(|err| {
359 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
360 })?;
361 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
362 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
363 })?;
364 visitor(entry)?;
365 }
366
367 Ok(())
368 }
369
370 fn write_snapshot_file(
371 path: &Path,
372 seq: u64,
373 rows: &[Row],
374 ) -> Result<(), IntersticeError> {
375 let snapshot = TableSnapshot {
376 version: SNAPSHOT_VERSION,
377 last_seq: seq,
378 rows: rows.to_vec(),
379 };
380 let encoded = encode(&snapshot).map_err(|err| {
381 IntersticeError::Internal(format!("Failed to encode snapshot: {err}"))
382 })?;
383 let tmp_path = path.with_extension("snap.tmp");
384 {
385 let mut file = File::create(&tmp_path).map_err(|err| {
386 IntersticeError::Internal(format!(
387 "Failed to create snapshot temp file {:?}: {}",
388 tmp_path, err
389 ))
390 })?;
391 file.write_all(&encoded).map_err(|err| {
392 IntersticeError::Internal(format!("Failed to write snapshot: {err}"))
393 })?;
394 file.sync_all().map_err(|err| {
395 IntersticeError::Internal(format!("Failed to sync snapshot: {err}"))
396 })?;
397 }
398 fs::rename(&tmp_path, path).map_err(|err| {
399 IntersticeError::Internal(format!(
400 "Failed to finalize snapshot {:?}: {}",
401 path, err
402 ))
403 })?;
404 Ok(())
405 }
406
407 fn read_snapshot_file(path: &Path) -> Result<TableSnapshot, IntersticeError> {
408 if !path.exists() {
409 return Ok(TableSnapshot {
410 version: SNAPSHOT_VERSION,
411 last_seq: 0,
412 rows: Vec::new(),
413 });
414 }
415 let bytes = fs::read(path).map_err(|err| {
416 IntersticeError::Internal(format!("Failed to read snapshot {:?}: {}", path, err))
417 })?;
418 decode(&bytes).map_err(|err| {
419 IntersticeError::Internal(format!("Failed to decode snapshot: {err}"))
420 })
421 }
422
423 fn compact_log(path: &Path, keep_after_seq: u64) -> Result<(), IntersticeError> {
424 if !path.exists() {
425 return Ok(());
426 }
427 let tmp_path = path.with_extension("log.tmp");
428 let mut reader = File::open(path).map_err(|err| {
429 IntersticeError::Internal(format!("Failed to open log file {:?}: {}", path, err))
430 })?;
431 let mut writer = File::create(&tmp_path).map_err(|err| {
432 IntersticeError::Internal(format!("Failed to create temp log: {err}"))
433 })?;
434
435 loop {
436 let mut len_buf = [0u8; 4];
437 if reader.read_exact(&mut len_buf).is_err() {
438 break;
439 }
440 let length = u32::from_le_bytes(len_buf) as usize;
441 let mut buffer = vec![0u8; length];
442 reader.read_exact(&mut buffer).map_err(|err| {
443 IntersticeError::Internal(format!("Failed to read log entry: {err}"))
444 })?;
445 let entry: TableLogEntry = decode(&buffer).map_err(|err| {
446 IntersticeError::Internal(format!("Failed to decode log entry: {err}"))
447 })?;
448 if entry.seq > keep_after_seq {
449 writer.write_all(&len_buf).map_err(|err| {
450 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
451 })?;
452 writer.write_all(&buffer).map_err(|err| {
453 IntersticeError::Internal(format!("Failed to write compacted log: {err}"))
454 })?;
455 }
456 }
457
458 writer.sync_all().map_err(|err| {
459 IntersticeError::Internal(format!("Failed to sync compacted log: {err}"))
460 })?;
461 fs::rename(&tmp_path, path).map_err(|err| {
462 IntersticeError::Internal(format!("Failed to replace log file: {err}"))
463 })?;
464 Ok(())
465 }
466
467 fn apply_entry(table: &mut Table, op: &LogOperation) -> Result<(), IntersticeError> {
468 match op {
469 LogOperation::Insert { row, .. } => {
470 let row = row.clone().ok_or_else(|| {
471 IntersticeError::Internal("Missing row data for log insert".into())
472 })?;
473 table.insert(row)?;
474 }
475 LogOperation::Update { row, .. } => {
476 let row = row.clone().ok_or_else(|| {
477 IntersticeError::Internal("Missing row data for log update".into())
478 })?;
479 table.update(row)?;
480 }
481 LogOperation::Delete { primary_key } => {
482 let _ = table.delete(primary_key)?;
483 }
484 }
485 Ok(())
486 }
487
488 pub fn forget_module(&self, module: &str) {
489 self.cleanup_module(module);
490 }
491}
492
/// The two per-module directories used for persistence.
struct ModulePaths {
    /// Directory holding the `<table>.log` files.
    logs: PathBuf,
    /// Directory holding the `<table>.snap` files.
    snapshots: PathBuf,
}
497
498#[derive(Serialize, Deserialize)]
499struct TableLogEntry {
500 seq: u64,
501 timestamp_ms: u64,
502 operation: LogOperation,
503}
504
505impl TableLogEntry {
506 fn new(seq: u64, operation: LogOperation) -> Self {
507 let timestamp_ms = SystemTime::now()
508 .duration_since(UNIX_EPOCH)
509 .unwrap_or_default()
510 .as_millis() as u64;
511 Self {
512 seq,
513 timestamp_ms,
514 operation,
515 }
516 }
517}
518
519#[derive(Serialize, Deserialize)]
520struct TableSnapshot {
521 version: u16,
522 last_seq: u64,
523 rows: Vec<Row>,
524}