
// embeddenator_fs/fs/versioned/journal.rs

//! Hybrid Journaling System for Durable Writes
//! ============================================
//!
//! This module provides a combination of simple fsync barriers for atomic single-file
//! operations and a lightweight Write-Ahead Log (WAL) for complex multi-file transactions.
//!
//! # Why a Journal?
//!
//! Filesystems need journals to survive crashes. Without one, a power failure during
//! a write operation can leave the filesystem in an inconsistent state:
//!
//! - **Torn writes**: Half-written data appears as corruption
//! - **Lost updates**: Data in memory never reaches disk
//! - **Dangling references**: Metadata points to freed blocks
//! - **Orphaned data**: Blocks allocated but not referenced
//!
//! The journal ensures that either:
//! 1. An operation completes fully and is recoverable, OR
//! 2. An operation is completely rolled back (never partially applied)
//!
//! This is the "A" in ACID: Atomicity.
//!
//! # Why Hybrid (fsync + WAL)?
//!
//! A pure WAL approach (like SQLite or PostgreSQL) adds latency to every write.
//! A pure fsync approach can't handle multi-file transactions atomically.
//!
//! We use a **hybrid** approach optimized for the engram use case:
//!
//! | Operation Type | Strategy | Why |
//! |----------------|----------|-----|
//! | Single file ≤ 64KB | fsync barrier | Simple, fast, no journal overhead |
//! | Single file > 64KB | WAL | Allows streaming writes + recovery |
//! | Multi-file transaction | WAL | Atomic commit of all changes |
//! | Metadata-only change | WAL | Small payload, fast |
//!
//! This hybrid approach gives us:
//! - **Fast common case**: Single small files use simple fsync (~1ms)
//! - **Safe complex case**: Multi-file ops get full WAL protection
//! - **Bounded recovery time**: Journal is checkpointed regularly
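//!
//! As a sketch, the routing decision (mirroring the `should_use_wal`
//! helper and the `WAL_THRESHOLD_BYTES` constant defined below) is:
//!
//! ```
//! const WAL_THRESHOLD_BYTES: usize = 64 * 1024;
//!
//! fn uses_wal(data_size: usize, op_count: usize) -> bool {
//!     data_size > WAL_THRESHOLD_BYTES || op_count > 1
//! }
//!
//! assert!(!uses_wal(4 * 1024, 1));  // small single file: plain fsync barrier
//! assert!(uses_wal(128 * 1024, 1)); // large file: WAL
//! assert!(uses_wal(4 * 1024, 3));   // multi-file transaction: WAL
//! ```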
//!
//! # Why These Durability Modes?
//!
//! Different users have different needs:
//!
//! ## Immediate Mode
//! - **Use when**: Data loss is unacceptable (financial, medical)
//! - **Cost**: ~5-10ms per write (SSD), ~15-30ms (HDD)
//! - **Guarantee**: Committed = on persistent storage
//!
//! ## GroupCommit Mode (Default)
//! - **Use when**: Balance of safety and performance
//! - **Cost**: ~5ms batch latency, amortized across many writes
//! - **Guarantee**: Up to 5ms of data loss on crash
//! - **Why default**: Most users want good performance with reasonable safety
//!
//! ## Relaxed Mode
//! - **Use when**: Performance critical, data is regenerable
//! - **Cost**: Near-zero latency
//! - **Guarantee**: OS decides when to flush (could be 30+ seconds)
//! - **Risk**: Significant data loss on crash
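//!
//! Back-of-envelope on why group commit amortizes well (illustrative
//! numbers, assuming one ~5ms fsync shared by a full batch of 100 writes):
//!
//! ```
//! let fsync_ms = 5.0_f64;
//! let batch_size = 100.0_f64;
//! let per_write_ms = fsync_ms / batch_size;
//! assert!(per_write_ms < 0.1); // ~0.05ms per write vs ~5ms in Immediate mode
//! ```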
//!
//! # Journal Record Format
//!
//! Why this specific format:
//!
//! ```text
//! ┌────────────────────────────────────────────────────────────┐
//! │ Record Header (32 bytes)                                   │
//! ├────────────────────────────────────────────────────────────┤
//! │ magic: u32          = 0x52424D45 ("EMBR")                  │
//! │   └─ Why: Detect corruption/wrong file immediately        │
//! │ version: u16        = 1                                    │
//! │   └─ Why: Allow format evolution without breaking old files│
//! │ flags: u16          = (committed | checkpointed | ...)     │
//! │   └─ Why: Track transaction state for recovery            │
//! │ txn_id: u64         = transaction ID                       │
//! │   └─ Why: Ordering for replay, debugging                  │
//! │ timestamp: u64      = Unix timestamp (nanos)               │
//! │   └─ Why: Debugging, determining recovery point           │
//! │ payload_len: u32    = length of payload                    │
//! │   └─ Why: Know how much to read without parsing payload   │
//! │ checksum: u32       = CRC32 of header + payload            │
//! │   └─ Why: Detect corruption before acting on bad data     │
//! ├────────────────────────────────────────────────────────────┤
//! │ Payload (variable)                                         │
//! │ - Serialized operations                                    │
//! │   └─ Why: Variable length allows efficient small writes   │
//! └────────────────────────────────────────────────────────────┘
//! ```
//!
//! The magic is stored little-endian, so the on-disk bytes read "EMBR"
//! (matching `JOURNAL_MAGIC = 0x52424D45` below); the checksum covers the
//! header fields before it plus the payload.
//!
//! # Why 32-byte Header?
//!
//! - **Alignment**: 32 bytes aligns to cache lines on most CPUs
//! - **Atomic read**: Can read header in single I/O operation
//! - **Space efficient**: Minimal overhead per record
//! - **Room for growth**: Version field allows adding fields later
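//!
//! The field sizes in the layout above sum to exactly the 32-byte header:
//!
//! ```
//! // magic + version + flags + txn_id + timestamp + payload_len + checksum
//! let header_size = 4 + 2 + 2 + 8 + 8 + 4 + 4;
//! assert_eq!(header_size, 32); // matches HEADER_SIZE
//! ```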
//!
//! # Recovery Process
//!
//! On mount after crash:
//!
//! 1. **Scan journal** from start to end
//! 2. **Verify checksums** - stop at the first corrupted record (a torn tail)
//! 3. **Find committed transactions** - ignore pending/aborted
//! 4. **Replay in order** - apply operations to engram
//! 5. **Verify engram** - check manifest and chunk integrity
//! 6. **Checkpoint journal** - truncate replayed records
//!
//! Recovery is O(journal_size), typically <1 second for a <100MB journal.
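//!
//! The filtering in steps 2-3 can be sketched in memory (a hypothetical
//! illustration; the real scan lives in `Journal::replay`):
//!
//! ```
//! // (checksum_ok, committed) per record; replay keeps the valid committed prefix.
//! let records = [(true, true), (true, false), (false, true)];
//! let replayable: Vec<_> = records
//!     .iter()
//!     .take_while(|(ok, _)| *ok)           // stop at the first corrupted record
//!     .filter(|(_, committed)| *committed) // keep only committed transactions
//!     .collect();
//! assert_eq!(replayable.len(), 1);
//! ```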
//!
//! # Durability Modes
//!
//! - `Immediate`: fsync after every write (safest, slowest)
//! - `GroupCommit`: batch fsync every N ms or M operations
//! - `Relaxed`: OS-managed flush (fastest, potential data loss on crash)

use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use super::types::ChunkId;

/// Journal magic number: "EMBR" in little-endian
const JOURNAL_MAGIC: u32 = 0x52424D45;

/// Current journal format version
const JOURNAL_VERSION: u16 = 1;

/// Header size in bytes
const HEADER_SIZE: usize = 32;

/// Default group commit interval (5ms)
const DEFAULT_GROUP_COMMIT_MS: u64 = 5;

/// Default max pending operations before flush
const DEFAULT_MAX_PENDING_OPS: usize = 100;

/// Threshold for using WAL vs simple fsync (64KB)
const WAL_THRESHOLD_BYTES: usize = 64 * 1024;

/// Record flags
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecordFlags {
    /// Transaction is pending (written but not committed)
    Pending = 0,
    /// Transaction is committed
    Committed = 1,
    /// Transaction has been checkpointed (can be truncated)
    Checkpointed = 2,
    /// Transaction was aborted
    Aborted = 3,
}

impl RecordFlags {
    fn from_u16(v: u16) -> Option<Self> {
        match v {
            0 => Some(Self::Pending),
            1 => Some(Self::Committed),
            2 => Some(Self::Checkpointed),
            3 => Some(Self::Aborted),
            _ => None,
        }
    }
}

/// Durability mode for journal writes
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DurabilityMode {
    /// fsync after every write (safest)
    Immediate,
    /// Batch fsync with configurable interval/count
    GroupCommit {
        max_delay_ms: u64,
        max_pending_ops: usize,
    },
    /// OS-managed flush (fastest, potential data loss)
    Relaxed,
}

impl Default for DurabilityMode {
    fn default() -> Self {
        // Default to group commit for balance of safety and performance
        Self::GroupCommit {
            max_delay_ms: DEFAULT_GROUP_COMMIT_MS,
            max_pending_ops: DEFAULT_MAX_PENDING_OPS,
        }
    }
}

/// Journal record header
#[derive(Clone, Debug)]
pub struct RecordHeader {
    pub magic: u32,
    pub version: u16,
    pub flags: RecordFlags,
    pub txn_id: u64,
    pub timestamp: u64,
    pub payload_len: u32,
    pub checksum: u32,
}

impl RecordHeader {
    /// Create a new record header
    pub fn new(txn_id: u64, flags: RecordFlags, payload_len: u32) -> Self {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos() as u64;

        Self {
            magic: JOURNAL_MAGIC,
            version: JOURNAL_VERSION,
            flags,
            txn_id,
            timestamp,
            payload_len,
            checksum: 0, // Computed on serialize
        }
    }

    /// Serialize header to bytes
    pub fn to_bytes(&self, payload: &[u8]) -> Vec<u8> {
        let mut buf = Vec::with_capacity(HEADER_SIZE);

        buf.extend_from_slice(&self.magic.to_le_bytes());
        buf.extend_from_slice(&self.version.to_le_bytes());
        buf.extend_from_slice(&(self.flags as u16).to_le_bytes());
        buf.extend_from_slice(&self.txn_id.to_le_bytes());
        buf.extend_from_slice(&self.timestamp.to_le_bytes());
        buf.extend_from_slice(&self.payload_len.to_le_bytes());

        // Compute checksum over header (without checksum field) + payload
        let checksum = crc32_compute(&buf, payload);
        buf.extend_from_slice(&checksum.to_le_bytes());

        buf
    }

    /// Parse header from bytes
    pub fn from_bytes(data: &[u8]) -> io::Result<Self> {
        if data.len() < HEADER_SIZE {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "header too short",
            ));
        }

        let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
        if magic != JOURNAL_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("invalid magic: {:08x}", magic),
            ));
        }

        let version = u16::from_le_bytes(data[4..6].try_into().unwrap());
        if version != JOURNAL_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("unsupported version: {}", version),
            ));
        }

        let flags = RecordFlags::from_u16(u16::from_le_bytes(data[6..8].try_into().unwrap()))
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "invalid flags"))?;

        let txn_id = u64::from_le_bytes(data[8..16].try_into().unwrap());
        let timestamp = u64::from_le_bytes(data[16..24].try_into().unwrap());
        let payload_len = u32::from_le_bytes(data[24..28].try_into().unwrap());
        let checksum = u32::from_le_bytes(data[28..32].try_into().unwrap());

        Ok(Self {
            magic,
            version,
            flags,
            txn_id,
            timestamp,
            payload_len,
            checksum,
        })
    }

    /// Verify checksum against payload
    pub fn verify_checksum(&self, payload: &[u8]) -> bool {
        let mut header_bytes = Vec::with_capacity(28);
        header_bytes.extend_from_slice(&self.magic.to_le_bytes());
        header_bytes.extend_from_slice(&self.version.to_le_bytes());
        header_bytes.extend_from_slice(&(self.flags as u16).to_le_bytes());
        header_bytes.extend_from_slice(&self.txn_id.to_le_bytes());
        header_bytes.extend_from_slice(&self.timestamp.to_le_bytes());
        header_bytes.extend_from_slice(&self.payload_len.to_le_bytes());

        let computed = crc32_compute(&header_bytes, payload);
        computed == self.checksum
    }
}

/// Simple bitwise CRC32 computation (table-free, no external dependency).
///
/// This is the reflected CRC-32 used by zlib and PNG; its check value is
/// `crc32_compute(b"123456789", b"") == 0xCBF43926`.
fn crc32_compute(header: &[u8], payload: &[u8]) -> u32 {
    // Reflected CRC-32 (IEEE 802.3) polynomial
    const POLYNOMIAL: u32 = 0xEDB88320;

    let mut crc = 0xFFFFFFFF_u32;

    for &byte in header.iter().chain(payload.iter()) {
        crc ^= byte as u32;
        for _ in 0..8 {
            if crc & 1 != 0 {
                crc = (crc >> 1) ^ POLYNOMIAL;
            } else {
                crc >>= 1;
            }
        }
    }

    !crc
}

/// Serialized operation for the journal
#[derive(Clone, Debug)]
pub enum JournalOp {
    WriteChunk {
        chunk_id: ChunkId,
        data_hash: [u8; 32],
        data_len: usize,
    },
    DeleteChunk {
        chunk_id: ChunkId,
    },
    WriteFile {
        path: String,
        chunk_ids: Vec<ChunkId>,
        size: usize,
    },
    DeleteFile {
        path: String,
    },
    UpdateRoot {
        root_hash: [u8; 32],
    },
    Barrier,
}

impl JournalOp {
    /// Serialize operation to bytes
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::new();

        match self {
            JournalOp::WriteChunk {
                chunk_id,
                data_hash,
                data_len,
            } => {
                buf.push(1);
                buf.extend_from_slice(&(*chunk_id as u64).to_le_bytes());
                buf.extend_from_slice(data_hash);
                buf.extend_from_slice(&(*data_len as u64).to_le_bytes());
            }
            JournalOp::DeleteChunk { chunk_id } => {
                buf.push(2);
                buf.extend_from_slice(&(*chunk_id as u64).to_le_bytes());
            }
            JournalOp::WriteFile {
                path,
                chunk_ids,
                size,
            } => {
                buf.push(3);
                let path_bytes = path.as_bytes();
                buf.extend_from_slice(&(path_bytes.len() as u32).to_le_bytes());
                buf.extend_from_slice(path_bytes);
                buf.extend_from_slice(&(chunk_ids.len() as u32).to_le_bytes());
                for &id in chunk_ids {
                    buf.extend_from_slice(&(id as u64).to_le_bytes());
                }
                buf.extend_from_slice(&(*size as u64).to_le_bytes());
            }
            JournalOp::DeleteFile { path } => {
                buf.push(4);
                let path_bytes = path.as_bytes();
                buf.extend_from_slice(&(path_bytes.len() as u32).to_le_bytes());
                buf.extend_from_slice(path_bytes);
            }
            JournalOp::UpdateRoot { root_hash } => {
                buf.push(5);
                buf.extend_from_slice(root_hash);
            }
            JournalOp::Barrier => {
                buf.push(6);
            }
        }

        buf
    }

    /// Deserialize operation from bytes
    pub fn from_bytes(data: &[u8]) -> io::Result<(Self, usize)> {
        if data.is_empty() {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "empty data"));
        }

        let op_type = data[0];
        let mut pos = 1;

        let op = match op_type {
            1 => {
                // WriteChunk
                if data.len() < pos + 8 + 32 + 8 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated WriteChunk",
                    ));
                }
                let chunk_id =
                    u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
                pos += 8;
                let mut data_hash = [0u8; 32];
                data_hash.copy_from_slice(&data[pos..pos + 32]);
                pos += 32;
                let data_len = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
                pos += 8;
                JournalOp::WriteChunk {
                    chunk_id,
                    data_hash,
                    data_len,
                }
            }
            2 => {
                // DeleteChunk
                if data.len() < pos + 8 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated DeleteChunk",
                    ));
                }
                let chunk_id =
                    u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
                pos += 8;
                JournalOp::DeleteChunk { chunk_id }
            }
            3 => {
                // WriteFile
                if data.len() < pos + 4 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated WriteFile path_len",
                    ));
                }
                let path_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
                pos += 4;
                if data.len() < pos + path_len {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated WriteFile path",
                    ));
                }
                let path = String::from_utf8_lossy(&data[pos..pos + path_len]).into_owned();
                pos += path_len;

                if data.len() < pos + 4 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated WriteFile chunk_count",
                    ));
                }
                let chunk_count =
                    u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
                pos += 4;

                // Bound-check the whole id array (overflow-safe) before
                // allocating, so a corrupt chunk_count cannot trigger a huge
                // allocation.
                let ids_end = chunk_count
                    .checked_mul(8)
                    .and_then(|n| pos.checked_add(n))
                    .filter(|&end| end <= data.len())
                    .ok_or_else(|| {
                        io::Error::new(io::ErrorKind::InvalidData, "truncated WriteFile chunk_ids")
                    })?;
                let mut chunk_ids = Vec::with_capacity(chunk_count);
                while pos < ids_end {
                    let id = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as ChunkId;
                    pos += 8;
                    chunk_ids.push(id);
                }

                if data.len() < pos + 8 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated WriteFile size",
                    ));
                }
                let size = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()) as usize;
                pos += 8;

                JournalOp::WriteFile {
                    path,
                    chunk_ids,
                    size,
                }
            }
            4 => {
                // DeleteFile
                if data.len() < pos + 4 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated DeleteFile",
                    ));
                }
                let path_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
                pos += 4;
                if data.len() < pos + path_len {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated DeleteFile path",
                    ));
                }
                let path = String::from_utf8_lossy(&data[pos..pos + path_len]).into_owned();
                pos += path_len;
                JournalOp::DeleteFile { path }
            }
            5 => {
                // UpdateRoot
                if data.len() < pos + 32 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "truncated UpdateRoot",
                    ));
                }
                let mut root_hash = [0u8; 32];
                root_hash.copy_from_slice(&data[pos..pos + 32]);
                pos += 32;
                JournalOp::UpdateRoot { root_hash }
            }
            6 => JournalOp::Barrier,
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("unknown op type: {}", op_type),
                ))
            }
        };

        Ok((op, pos))
    }
}

/// A pending write waiting for group commit
struct PendingWrite {
    #[allow(dead_code)]
    txn_id: u64,
    data: Vec<u8>,
    committed: Arc<(Mutex<bool>, Condvar)>,
}

/// Journal for durable writes
pub struct Journal {
    /// Path to journal file
    path: PathBuf,
    /// Durability mode
    mode: DurabilityMode,
    /// Journal file writer (protected by mutex for group commit)
    writer: Mutex<BufWriter<File>>,
    /// Next transaction ID
    next_txn_id: AtomicU64,
    /// Whether journal is open
    open: AtomicBool,
    /// Pending writes for group commit
    pending: Mutex<VecDeque<PendingWrite>>,
    /// Last flush time
    last_flush: Mutex<Instant>,
    /// Background flush thread handle
    flush_thread: Mutex<Option<JoinHandle<()>>>,
    /// Signal to stop flush thread
    stop_signal: Arc<AtomicBool>,
}

impl Journal {
    /// Create or open a journal at the given path.
    ///
    /// Note: transaction IDs restart at 1 each time the journal is opened;
    /// existing records should be replayed and checkpointed before new
    /// writes are issued.
    pub fn open(path: impl AsRef<Path>, mode: DurabilityMode) -> io::Result<Arc<Self>> {
        let path = path.as_ref().to_path_buf();

        // Open or create journal file
        let file = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(&path)?;

        let journal = Arc::new(Self {
            path,
            mode,
            writer: Mutex::new(BufWriter::new(file)),
            next_txn_id: AtomicU64::new(1),
            open: AtomicBool::new(true),
            pending: Mutex::new(VecDeque::new()),
            last_flush: Mutex::new(Instant::now()),
            flush_thread: Mutex::new(None),
            stop_signal: Arc::new(AtomicBool::new(false)),
        });

        // Start background flush thread for group commit mode
        if let DurabilityMode::GroupCommit { max_delay_ms, .. } = mode {
            let journal_clone = Arc::clone(&journal);
            let stop = Arc::clone(&journal.stop_signal);

            let handle = thread::spawn(move || {
                while !stop.load(Ordering::Relaxed) {
                    thread::sleep(Duration::from_millis(max_delay_ms));
                    if !stop.load(Ordering::Relaxed) {
                        let _ = journal_clone.flush_pending();
                    }
                }
            });

            *journal.flush_thread.lock().unwrap() = Some(handle);
        }

        Ok(journal)
    }

    /// Write a transaction record to the journal
    pub fn write_transaction(&self, ops: &[JournalOp]) -> io::Result<u64> {
        if !self.open.load(Ordering::Acquire) {
            return Err(io::Error::other("journal closed"));
        }

        let txn_id = self.next_txn_id.fetch_add(1, Ordering::AcqRel);

        // Serialize operations
        let mut payload = Vec::new();
        for op in ops {
            payload.extend_from_slice(&op.to_bytes());
        }

        // Create record
        let header = RecordHeader::new(txn_id, RecordFlags::Committed, payload.len() as u32);
        let header_bytes = header.to_bytes(&payload);

        match self.mode {
            DurabilityMode::Immediate => {
                // Write and fsync immediately
                let mut writer = self.writer.lock().unwrap();
                writer.write_all(&header_bytes)?;
                writer.write_all(&payload)?;
                writer.flush()?;
                writer.get_ref().sync_all()?;
            }
            DurabilityMode::GroupCommit {
                max_pending_ops, ..
            } => {
                // Add to pending queue
                let committed = Arc::new((Mutex::new(false), Condvar::new()));
                let committed_clone = Arc::clone(&committed);

                let mut record_data = header_bytes;
                record_data.extend_from_slice(&payload);

                {
                    let mut pending = self.pending.lock().unwrap();
                    pending.push_back(PendingWrite {
                        txn_id,
                        data: record_data,
                        committed: committed_clone,
                    });

                    // Flush if we've hit the pending limit
                    if pending.len() >= max_pending_ops {
                        drop(pending);
                        self.flush_pending()?;
                    }
                }

                // Wait for commit
                let (lock, cvar) = &*committed;
                let mut done = lock.lock().unwrap();
                while !*done {
                    done = cvar.wait(done).unwrap();
                }
            }
            DurabilityMode::Relaxed => {
                // Write without fsync
                let mut writer = self.writer.lock().unwrap();
                writer.write_all(&header_bytes)?;
                writer.write_all(&payload)?;
                // Don't fsync - let OS handle it
            }
        }

        Ok(txn_id)
    }

    /// Flush all pending writes to disk
    pub fn flush_pending(&self) -> io::Result<()> {
        let writes: Vec<PendingWrite> = {
            let mut pending = self.pending.lock().unwrap();
            pending.drain(..).collect()
        };

        if writes.is_empty() {
            return Ok(());
        }

        // Write all pending records
        {
            let mut writer = self.writer.lock().unwrap();
            for write in &writes {
                writer.write_all(&write.data)?;
            }
            writer.flush()?;
            writer.get_ref().sync_all()?;
        }

        // Signal all waiters
        for write in writes {
            let (lock, cvar) = &*write.committed;
            let mut done = lock.lock().unwrap();
            *done = true;
            cvar.notify_one();
        }

        *self.last_flush.lock().unwrap() = Instant::now();

        Ok(())
    }

    /// Write a simple fsync barrier (for small single-file ops)
    pub fn write_barrier(&self) -> io::Result<()> {
        self.write_transaction(&[JournalOp::Barrier])?;
        Ok(())
    }

    /// Read and replay journal records.
    ///
    /// Scans from the start of the file and stops at the first torn or
    /// corrupted record (a crash can leave a partial record at the tail).
    /// Only committed transactions are returned, in write order.
    pub fn replay(&self) -> io::Result<Vec<(u64, Vec<JournalOp>)>> {
        let mut file = OpenOptions::new().read(true).open(&self.path)?;
        file.seek(SeekFrom::Start(0))?;

        let mut transactions = Vec::new();
        let mut header_buf = [0u8; HEADER_SIZE];

        loop {
            match file.read_exact(&mut header_buf) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }

            // A header that fails to parse marks the torn tail of the journal.
            let header = match RecordHeader::from_bytes(&header_buf) {
                Ok(h) => h,
                Err(_) => break,
            };

            // Read payload; a short read likewise means the record was torn.
            let mut payload = vec![0u8; header.payload_len as usize];
            match file.read_exact(&mut payload) {
                Ok(()) => {}
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }

            // Stop at the first checksum mismatch rather than replaying
            // anything written after the corruption point.
            if !header.verify_checksum(&payload) {
                break;
            }

            // Skip non-committed records
            if header.flags != RecordFlags::Committed {
                continue;
            }

            // Parse operations
            let mut ops = Vec::new();
            let mut pos = 0;
            while pos < payload.len() {
                let (op, consumed) = JournalOp::from_bytes(&payload[pos..])?;
                ops.push(op);
                pos += consumed;
            }

            transactions.push((header.txn_id, ops));
        }

        Ok(transactions)
    }

    /// Checkpoint the journal (truncate replayed records)
    pub fn checkpoint(&self) -> io::Result<()> {
        // Flush any pending writes first
        self.flush_pending()?;

        // Truncate the journal file
        let mut writer = self.writer.lock().unwrap();
        writer.get_ref().set_len(0)?;
        writer.seek(SeekFrom::Start(0))?;
        writer.get_ref().sync_all()?;

        Ok(())
    }

    /// Close the journal
    pub fn close(&self) -> io::Result<()> {
        self.open.store(false, Ordering::Release);
        self.stop_signal.store(true, Ordering::Release);

        // Flush remaining writes
        self.flush_pending()?;

        // Wait for flush thread to finish
        if let Some(handle) = self.flush_thread.lock().unwrap().take() {
            let _ = handle.join();
        }

        Ok(())
    }

    /// Check if an operation should use WAL vs simple barrier
    pub fn should_use_wal(data_size: usize, op_count: usize) -> bool {
        data_size > WAL_THRESHOLD_BYTES || op_count > 1
    }
}

impl Drop for Journal {
    fn drop(&mut self) {
        let _ = self.close();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_record_header_roundtrip() {
        let payload = b"test payload data";
        let header = RecordHeader::new(42, RecordFlags::Committed, payload.len() as u32);

        let header_bytes = header.to_bytes(payload);
        let parsed = RecordHeader::from_bytes(&header_bytes).unwrap();

        assert_eq!(parsed.magic, JOURNAL_MAGIC);
        assert_eq!(parsed.version, JOURNAL_VERSION);
        assert_eq!(parsed.flags, RecordFlags::Committed);
        assert_eq!(parsed.txn_id, 42);
        assert_eq!(parsed.payload_len, payload.len() as u32);
        assert!(parsed.verify_checksum(payload));
    }
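
    // Standalone known-answer check mirroring the bitwise CRC-32 in
    // `crc32_compute` above (reflected zlib/PNG variant). 0xCBF43926 is the
    // standard check value for the input "123456789"; this sketch re-states
    // the algorithm locally so the vector is pinned down independently.
    fn crc32_reference(data: &[u8]) -> u32 {
        const POLYNOMIAL: u32 = 0xEDB88320;
        let mut crc = 0xFFFFFFFF_u32;
        for &byte in data {
            crc ^= byte as u32;
            for _ in 0..8 {
                crc = if crc & 1 != 0 {
                    (crc >> 1) ^ POLYNOMIAL
                } else {
                    crc >> 1
                };
            }
        }
        !crc
    }

    #[test]
    fn test_crc32_known_vector() {
        assert_eq!(crc32_reference(b"123456789"), 0xCBF43926);
    }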

    #[test]
    fn test_journal_op_roundtrip() {
        let ops = vec![
            JournalOp::WriteChunk {
                chunk_id: 123,
                data_hash: [0xAB; 32],
                data_len: 4096,
            },
            JournalOp::WriteFile {
                path: "/etc/passwd".to_string(),
                chunk_ids: vec![1, 2, 3],
                size: 1024,
            },
            JournalOp::DeleteFile {
                path: "/tmp/test".to_string(),
            },
            JournalOp::Barrier,
        ];

        for op in ops {
            let bytes = op.to_bytes();
            let (parsed, _) = JournalOp::from_bytes(&bytes).unwrap();

            match (&op, &parsed) {
                (
                    JournalOp::WriteChunk {
                        chunk_id: a,
                        data_len: b,
                        ..
                    },
                    JournalOp::WriteChunk {
                        chunk_id: c,
                        data_len: d,
                        ..
                    },
                ) => {
                    assert_eq!(a, c);
                    assert_eq!(b, d);
                }
                (
                    JournalOp::WriteFile {
                        path: a,
                        chunk_ids: b,
                        ..
                    },
                    JournalOp::WriteFile {
                        path: c,
                        chunk_ids: d,
                        ..
                    },
                ) => {
                    assert_eq!(a, c);
                    assert_eq!(b, d);
                }
                (JournalOp::DeleteFile { path: a }, JournalOp::DeleteFile { path: b }) => {
                    assert_eq!(a, b);
                }
                (JournalOp::Barrier, JournalOp::Barrier) => {}
                _ => panic!("mismatched op types"),
            }
        }
    }

    #[test]
    fn test_journal_write_replay() {
        let dir = tempdir().unwrap();
        let journal_path = dir.path().join("test.journal");

        // Write some transactions
        {
            let journal = Journal::open(&journal_path, DurabilityMode::Immediate).unwrap();

            journal
                .write_transaction(&[JournalOp::WriteChunk {
                    chunk_id: 1,
                    data_hash: [0x11; 32],
                    data_len: 100,
                }])
                .unwrap();

            journal
                .write_transaction(&[
                    JournalOp::WriteFile {
                        path: "/test".to_string(),
                        chunk_ids: vec![1],
                        size: 100,
                    },
                    JournalOp::Barrier,
                ])
                .unwrap();

            journal.close().unwrap();
        }

        // Replay
        {
            let journal = Journal::open(&journal_path, DurabilityMode::Immediate).unwrap();
            let txns = journal.replay().unwrap();

            assert_eq!(txns.len(), 2);
            assert_eq!(txns[0].0, 1); // txn_id
            assert_eq!(txns[1].0, 2);

            // First transaction has 1 op
            assert_eq!(txns[0].1.len(), 1);
            // Second transaction has 2 ops
            assert_eq!(txns[1].1.len(), 2);
        }
    }

    #[test]
    fn test_should_use_wal() {
        // Small single op - no WAL
        assert!(!Journal::should_use_wal(1024, 1));

        // Large single op - use WAL
        assert!(Journal::should_use_wal(100 * 1024, 1));

        // Multiple ops - use WAL
        assert!(Journal::should_use_wal(1024, 5));
    }
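
    // Hypothetical standalone sketch of the group-commit handshake used by
    // `PendingWrite`: a writer blocks on a (Mutex<bool>, Condvar) pair until
    // the flusher marks it committed and notifies. Returns the final flag.
    fn group_commit_handshake_sketch() -> bool {
        use std::sync::{Arc, Condvar, Mutex};
        use std::thread;

        let committed = Arc::new((Mutex::new(false), Condvar::new()));
        let flusher_side = Arc::clone(&committed);

        // Stand-in for the background flush thread signalling a commit.
        let flusher = thread::spawn(move || {
            let (lock, cvar) = &*flusher_side;
            *lock.lock().unwrap() = true;
            cvar.notify_one();
        });

        // Stand-in for write_transaction waiting in GroupCommit mode; the
        // loop guards against spurious wakeups, as in the real code.
        let (lock, cvar) = &*committed;
        let mut done = lock.lock().unwrap();
        while !*done {
            done = cvar.wait(done).unwrap();
        }
        let result = *done;
        drop(done);

        flusher.join().unwrap();
        result
    }

    #[test]
    fn test_group_commit_handshake_sketch() {
        assert!(group_commit_handshake_sketch());
    }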
}