Skip to main content

cqlite_core/storage/write_engine/
wal.rs

1//! Write-ahead log (WAL) for crash recovery
2//!
3//! Provides durability guarantees for mutations before they reach the memtable.
//! Mutations become durable once `sync()` fsyncs the log; callers should sync before acknowledging a write.
5//!
6//! ## WAL Entry Format
7//!
8//! Each entry in the WAL follows this binary format:
9//!
10//! ```text
11//! [u32 LE: entry_length] (4 bytes)
12//! [u32 LE: crc32]        (4 bytes)
13//! [bytes: serialized Mutation] (entry_length bytes)
14//! ```
15//!
16//! The CRC32 checksum is computed over the serialized mutation bytes only.
17//! This format allows for:
18//! - Detection of corrupted entries during replay
19//! - Safe truncation at partial writes (crash during append)
20//! - Sequential append with minimal overhead
21//!
22//! ## Memory Budget
23//!
//! - 4 KB append buffer by default (configurable via `create_with_buffer_size`)
//! - A full buffer is handed to the OS; data is guaranteed on disk only after an explicit `sync()`
26//!
27//! ## Crash Recovery
28//!
29//! On startup, replay() reads all valid entries:
30//! - Corrupted entries: logged as warnings, skipped
31//! - Truncated entries: stop replay (incomplete write)
32//! - Valid entries: returned in order for memtable replay
33
34use crate::error::{Error, Result};
35use crate::storage::write_engine::mutation::Mutation;
36use crc32fast::Hasher;
37use std::fs::{File, OpenOptions};
38use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
39use std::path::{Path, PathBuf};
40
41/// Sync directory metadata to ensure file entries are persisted
42///
43/// On POSIX systems this is critical for crash safety - without syncing the
44/// directory, newly created or renamed files may not appear after a crash.
45///
46/// Windows does not allow opening a directory as a file (ERROR_ACCESS_DENIED).
47/// NTFS commits directory metadata together with the contained file's data
48/// when `sync_all` is called on the file itself, so an explicit directory
49/// sync is unnecessary on Windows and we skip it.
50#[cfg(unix)]
51fn sync_directory(dir: &Path) -> Result<()> {
52    let dir_file = File::open(dir)
53        .map_err(|e| Error::Storage(format!("Failed to open directory for sync: {}", e)))?;
54
55    dir_file
56        .sync_all()
57        .map_err(|e| Error::Storage(format!("Failed to sync directory: {}", e)))?;
58
59    Ok(())
60}
61
62#[cfg(not(unix))]
63fn sync_directory(_dir: &Path) -> Result<()> {
64    Ok(())
65}
66
67/// Validate WAL directory path for security
68///
69/// This prevents path traversal attacks and ensures the directory is safe to use.
70///
71/// # Security Checks
72///
73/// - Directory must exist
74/// - Path is canonicalized to resolve symlinks and `..' sequences
75/// - Path must not contain control characters
76///
77/// # Arguments
78///
79/// * `dir` - Directory path to validate
80///
81/// # Errors
82///
83/// Returns an error if validation fails
84fn validate_wal_directory(dir: &Path) -> Result<PathBuf> {
85    // Check directory exists
86    if !dir.exists() {
87        return Err(Error::InvalidPath(format!(
88            "WAL directory does not exist: {:?}",
89            dir
90        )));
91    }
92
93    if !dir.is_dir() {
94        return Err(Error::InvalidPath(format!(
95            "WAL path is not a directory: {:?}",
96            dir
97        )));
98    }
99
100    // Canonicalize to resolve symlinks and '..' sequences
101    let canonical = dir
102        .canonicalize()
103        .map_err(|e| Error::InvalidPath(format!("Failed to canonicalize WAL directory: {}", e)))?;
104
105    // Check for control characters in the path
106    let path_str = canonical.to_string_lossy();
107    if path_str.chars().any(|c| c.is_control()) {
108        return Err(Error::InvalidPath(
109            "WAL directory path contains control characters".to_string(),
110        ));
111    }
112
113    Ok(canonical)
114}
115
116/// Set secure file permissions on Unix platforms
117///
118/// This restricts WAL file access to the owner only (0o600)
119#[cfg(unix)]
120fn set_secure_permissions(file: &File) -> Result<()> {
121    use std::os::unix::fs::PermissionsExt;
122
123    let mut perms = file
124        .metadata()
125        .map_err(|e| Error::Storage(format!("Failed to read file metadata: {}", e)))?
126        .permissions();
127
128    perms.set_mode(0o600);
129
130    file.set_permissions(perms)
131        .map_err(|e| Error::Storage(format!("Failed to set file permissions: {}", e)))?;
132
133    Ok(())
134}
135
136/// Set secure file permissions (no-op on non-Unix platforms)
137#[cfg(not(unix))]
138fn set_secure_permissions(_file: &File) -> Result<()> {
139    // No-op on Windows - NTFS permissions are handled differently
140    Ok(())
141}
142
/// Write-ahead log for crash recovery
///
/// Mutations are serialized into an append-only log file, one
/// `[u32 LE length][u32 LE crc32][bytes]` entry per mutation, before they reach
/// the memtable. Appends are buffered in memory. They become durable only once
/// [`WriteAheadLog::sync`] flushes the buffer and fsyncs the file, so callers
/// should sync before acknowledging a write.
///
/// ## Usage
///
/// ```no_run
/// use cqlite_core::storage::write_engine::{WriteAheadLog, Mutation};
/// use std::path::Path;
///
/// # fn example() -> cqlite_core::error::Result<()> {
/// // Create a new WAL
/// let mut wal = WriteAheadLog::create(Path::new("/data"))?;
///
/// // Append mutations (serialized with CRC32)
/// // let mutation = Mutation::new(...);
/// // wal.append(&mutation)?;
///
/// // Explicit sync to disk
/// wal.sync()?;
///
/// // On recovery, replay all valid entries
/// // let mutations = wal.replay()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct WriteAheadLog {
    /// Buffered writer for sequential appends
    file: BufWriter<File>,
    /// Path to the WAL file
    path: PathBuf,
    /// Capacity of the append buffer in bytes (`DEFAULT_BUFFER_SIZE` unless the
    /// WAL was created with `create_with_buffer_size`); kept for diagnostics.
    // Non-test builds may never read this field, hence the `dead_code` allow.
    #[allow(dead_code)]
    buffer_size: usize,
    /// Logical size of the WAL in bytes, including entries that are still
    /// sitting in the append buffer and have not been synced yet.
    current_size: u64,
}
182
183impl WriteAheadLog {
184    /// Default buffer size (4 KB)
185    pub const DEFAULT_BUFFER_SIZE: usize = 4096;
186
187    /// WAL file name
188    pub const WAL_FILENAME: &'static str = "commitlog.wal";
189
190    /// Create a new WAL in the specified directory
191    ///
192    /// This creates a new WAL file with the default buffer size (4 KB).
193    /// If a WAL already exists in the directory, it will be truncated.
194    ///
195    /// # Arguments
196    ///
197    /// * `dir` - Directory where the WAL file will be created
198    ///
199    /// # Returns
200    ///
201    /// A new `WriteAheadLog` instance ready for appending.
202    ///
203    /// # Errors
204    ///
205    /// Returns an error if the directory doesn't exist or the file cannot be created.
206    pub fn create(dir: &Path) -> Result<Self> {
207        Self::create_with_buffer_size(dir, Self::DEFAULT_BUFFER_SIZE)
208    }
209
210    /// Create a new WAL with a custom buffer size
211    ///
212    /// # Arguments
213    ///
214    /// * `dir` - Directory where the WAL file will be created
215    /// * `buffer_size` - Size of the append buffer in bytes
216    pub fn create_with_buffer_size(dir: &Path, buffer_size: usize) -> Result<Self> {
217        // Validate directory path for security
218        let validated_dir = validate_wal_directory(dir)?;
219        let path = validated_dir.join(Self::WAL_FILENAME);
220
221        let file = OpenOptions::new()
222            .create(true)
223            .write(true)
224            .truncate(true)
225            .open(&path)
226            .map_err(|e| Error::Storage(format!("Failed to create WAL at {:?}: {}", path, e)))?;
227
228        // Set secure file permissions (Unix: 0o600)
229        set_secure_permissions(&file)?;
230
231        // Sync directory to ensure file entry is persisted
232        sync_directory(&validated_dir)?;
233
234        Ok(Self {
235            file: BufWriter::with_capacity(buffer_size, file),
236            path,
237            buffer_size,
238            current_size: 0,
239        })
240    }
241
242    /// Open an existing WAL file for appending
243    ///
244    /// This opens an existing WAL and seeks to the end, ready for new appends.
245    /// Use this for recovery scenarios where you want to append to an existing log.
246    ///
247    /// # Arguments
248    ///
249    /// * `path` - Path to the existing WAL file
250    ///
251    /// # Returns
252    ///
253    /// A `WriteAheadLog` positioned at the end of the file.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if the file doesn't exist or cannot be opened.
258    pub fn open_existing(path: &Path) -> Result<Self> {
259        let file = OpenOptions::new()
260            .read(true)
261            .append(true)
262            .open(path)
263            .map_err(|e| Error::Storage(format!("Failed to open WAL at {:?}: {}", path, e)))?;
264
265        let metadata = file
266            .metadata()
267            .map_err(|e| Error::Storage(format!("Failed to read WAL metadata: {}", e)))?;
268
269        let current_size = metadata.len();
270
271        Ok(Self {
272            file: BufWriter::with_capacity(Self::DEFAULT_BUFFER_SIZE, file),
273            path: path.to_path_buf(),
274            buffer_size: Self::DEFAULT_BUFFER_SIZE,
275            current_size,
276        })
277    }
278
279    /// Append a mutation to the WAL
280    ///
281    /// This serializes the mutation using bincode and writes it to the buffer.
282    /// The entry is not guaranteed to be on disk until `sync()` is called.
283    ///
284    /// # Entry Format
285    ///
286    /// ```text
287    /// [u32 LE: entry_length]
288    /// [u32 LE: crc32]
289    /// [bytes: serialized mutation]
290    /// ```
291    ///
292    /// # Arguments
293    ///
294    /// * `mutation` - The mutation to append
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if serialization fails or the write fails.
299    pub fn append(&mut self, mutation: &Mutation) -> Result<()> {
300        // Serialize mutation using bincode
301        let mutation_bytes = bincode::serialize(mutation)
302            .map_err(|e| Error::Storage(format!("Failed to serialize mutation: {}", e)))?;
303
304        let entry_length = mutation_bytes.len() as u32;
305
306        // Calculate CRC32 over the mutation bytes
307        let mut hasher = Hasher::new();
308        hasher.update(&mutation_bytes);
309        let crc32 = hasher.finalize();
310
311        // Write entry: [length][crc32][mutation_bytes]
312        self.file
313            .write_all(&entry_length.to_le_bytes())
314            .map_err(|e| Error::Storage(format!("Failed to write entry length: {}", e)))?;
315
316        self.file
317            .write_all(&crc32.to_le_bytes())
318            .map_err(|e| Error::Storage(format!("Failed to write CRC32: {}", e)))?;
319
320        self.file
321            .write_all(&mutation_bytes)
322            .map_err(|e| Error::Storage(format!("Failed to write mutation bytes: {}", e)))?;
323
324        // Update size (8 bytes header + mutation bytes)
325        self.current_size += 8 + entry_length as u64;
326
327        Ok(())
328    }
329
330    /// Sync the WAL to disk (fsync)
331    ///
332    /// This flushes the buffer and calls fsync to ensure all data is written
333    /// to persistent storage. This is required for durability guarantees.
334    ///
335    /// # Errors
336    ///
337    /// Returns an error if the flush or sync operation fails.
338    pub fn sync(&mut self) -> Result<()> {
339        self.file
340            .flush()
341            .map_err(|e| Error::Storage(format!("Failed to flush WAL buffer: {}", e)))?;
342
343        self.file
344            .get_ref()
345            .sync_all()
346            .map_err(|e| Error::Storage(format!("Failed to sync WAL to disk: {}", e)))?;
347
348        Ok(())
349    }
350
351    /// Replay all valid entries from the WAL
352    ///
353    /// Reads the WAL from the beginning and deserializes all valid entries.
354    /// This is used during crash recovery to rebuild the memtable.
355    ///
356    /// ## Corruption Handling
357    ///
358    /// - **Corrupted entries** (CRC mismatch): Logged as warnings, skipped
359    /// - **Truncated entries** (incomplete write): Stops replay, returns valid entries
360    /// - **Valid entries**: Deserialized and returned in order
361    ///
362    /// # Returns
363    ///
364    /// A vector of all valid mutations read from the WAL.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the WAL file cannot be opened or read.
369    pub fn replay(&self) -> Result<Vec<Mutation>> {
370        let mut file = File::open(&self.path)
371            .map_err(|e| Error::Storage(format!("Failed to open WAL for replay: {}", e)))?;
372
373        let mut mutations = Vec::new();
374        let mut offset = 0u64;
375
376        loop {
377            // Read entry header: [length][crc32]
378            let mut header = [0u8; 8];
379            match file.read_exact(&mut header) {
380                Ok(_) => {}
381                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
382                    // End of file or truncated header - stop replay
383                    break;
384                }
385                Err(e) => {
386                    return Err(Error::Storage(format!(
387                        "Failed to read WAL header at offset {}: {}",
388                        offset, e
389                    )));
390                }
391            }
392
393            let entry_length = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
394            let expected_crc = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
395
396            // Sanity check: entry length should be reasonable (<16MB)
397            if entry_length > 16 * 1024 * 1024 {
398                log::warn!(
399                    "WAL entry at offset {} has unreasonable length {} - stopping replay",
400                    offset,
401                    entry_length
402                );
403                break;
404            }
405
406            // Read mutation bytes
407            let mut mutation_bytes = vec![0u8; entry_length as usize];
408            match file.read_exact(&mut mutation_bytes) {
409                Ok(_) => {}
410                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
411                    // Truncated entry - stop replay
412                    log::warn!(
413                        "WAL entry at offset {} is truncated (expected {} bytes) - stopping replay",
414                        offset,
415                        entry_length
416                    );
417                    break;
418                }
419                Err(e) => {
420                    return Err(Error::Storage(format!(
421                        "Failed to read WAL entry at offset {}: {}",
422                        offset, e
423                    )));
424                }
425            }
426
427            // Verify CRC32
428            let mut hasher = Hasher::new();
429            hasher.update(&mutation_bytes);
430            let actual_crc = hasher.finalize();
431
432            if actual_crc != expected_crc {
433                log::warn!(
434                    "WAL entry at offset {} has CRC mismatch (expected 0x{:08x}, got 0x{:08x}) - skipping",
435                    offset,
436                    expected_crc,
437                    actual_crc
438                );
439                offset += 8 + entry_length as u64;
440                continue;
441            }
442
443            // Deserialize mutation
444            match bincode::deserialize::<Mutation>(&mutation_bytes) {
445                Ok(mutation) => {
446                    mutations.push(mutation);
447                }
448                Err(e) => {
449                    log::warn!(
450                        "WAL entry at offset {} failed to deserialize: {} - skipping",
451                        offset,
452                        e
453                    );
454                }
455            }
456
457            offset += 8 + entry_length as u64;
458        }
459
460        Ok(mutations)
461    }
462
463    /// Truncate the WAL (clear all entries)
464    ///
465    /// This is used after a successful flush to memtable/SSTable, removing
466    /// old entries that are no longer needed for recovery.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the truncate operation fails.
471    pub fn truncate(&mut self) -> Result<()> {
472        // Flush any pending writes first
473        self.file
474            .flush()
475            .map_err(|e| Error::Storage(format!("Failed to flush before truncate: {}", e)))?;
476
477        // Truncate the file to zero length
478        self.file
479            .get_mut()
480            .set_len(0)
481            .map_err(|e| Error::Storage(format!("Failed to truncate WAL: {}", e)))?;
482
483        // Fsync after truncate to ensure operation is persisted
484        self.file
485            .get_ref()
486            .sync_all()
487            .map_err(|e| Error::Storage(format!("Failed to sync after truncate: {}", e)))?;
488
489        // Seek to beginning
490        self.file
491            .get_mut()
492            .seek(SeekFrom::Start(0))
493            .map_err(|e| Error::Storage(format!("Failed to seek after truncate: {}", e)))?;
494
495        self.current_size = 0;
496
497        Ok(())
498    }
499
500    /// Get the current size of the WAL in bytes
501    pub fn size(&self) -> u64 {
502        self.current_size
503    }
504
505    /// Get the path to the WAL file
506    pub fn path(&self) -> &Path {
507        &self.path
508    }
509
510    /// Rotate the WAL (create a new one, keeping the old)
511    ///
512    /// This creates a new WAL file with a timestamp suffix and returns a new
513    /// `WriteAheadLog` instance. The old WAL file is left intact for archival
514    /// or backup purposes.
515    ///
516    /// The old file is renamed to: `commitlog.wal.{timestamp}`
517    ///
518    /// # Arguments
519    ///
520    /// * `dir` - Directory where the new WAL will be created
521    ///
522    /// # Returns
523    ///
524    /// A new `WriteAheadLog` instance ready for appending.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if the rotation fails.
529    pub fn rotate(mut self, dir: &Path) -> Result<Self> {
530        // Flush and sync the current WAL
531        self.sync()?;
532
533        // Generate timestamp suffix
534        let timestamp = std::time::SystemTime::now()
535            .duration_since(std::time::UNIX_EPOCH)
536            .unwrap()
537            .as_secs();
538
539        let old_path = self.path.clone();
540        let archived_path = dir.join(format!("commitlog.wal.{}", timestamp));
541
542        // Drop the writer to close the file
543        drop(self.file);
544
545        // Rename the old WAL
546        std::fs::rename(&old_path, &archived_path)
547            .map_err(|e| Error::Storage(format!("Failed to rename WAL during rotation: {}", e)))?;
548
549        // Sync directory to ensure rename is persisted
550        sync_directory(dir)?;
551
552        // Create a new WAL
553        Self::create(dir)
554    }
555
556    /// Delete an old WAL file
557    ///
558    /// This is used to clean up archived WAL files after a successful flush
559    /// or when they are no longer needed for recovery.
560    ///
561    /// # Arguments
562    ///
563    /// * `path` - Path to the WAL file to delete
564    ///
565    /// # Errors
566    ///
567    /// Returns an error if the delete operation fails.
568    pub fn delete_old(path: &Path) -> Result<()> {
569        std::fs::remove_file(path)
570            .map_err(|e| Error::Storage(format!("Failed to delete old WAL: {}", e)))?;
571        Ok(())
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578    use crate::storage::write_engine::mutation::{
579        CellOperation, ClusteringKey, Mutation, PartitionKey, TableId,
580    };
581    use crate::types::Value;
582    use tempfile::TempDir;
583
584    fn create_test_mutation(id: i32, name: &str) -> Mutation {
585        let table_id = TableId::new("test_ks", "test_table");
586        let pk = PartitionKey::single("id", Value::Integer(id));
587        let ops = vec![CellOperation::Write {
588            column: "name".to_string(),
589            value: Value::Text(name.to_string()),
590        }];
591
592        Mutation::new(table_id, pk, None, ops, 1234567890, None)
593    }
594
595    #[test]
596    fn test_wal_create() {
597        let temp_dir = TempDir::new().unwrap();
598        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
599
600        assert_eq!(wal.size(), 0);
601        assert!(wal.path().exists());
602    }
603
604    #[test]
605    fn test_wal_append_and_sync() {
606        let temp_dir = TempDir::new().unwrap();
607        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
608
609        let mutation = create_test_mutation(1, "Alice");
610        wal.append(&mutation).unwrap();
611
612        assert!(wal.size() > 0);
613
614        wal.sync().unwrap();
615    }
616
617    #[test]
618    fn test_wal_replay_empty() {
619        let temp_dir = TempDir::new().unwrap();
620        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
621
622        let mutations = wal.replay().unwrap();
623        assert_eq!(mutations.len(), 0);
624    }
625
626    #[test]
627    fn test_wal_replay_single_entry() {
628        let temp_dir = TempDir::new().unwrap();
629        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
630
631        let mutation = create_test_mutation(1, "Alice");
632        wal.append(&mutation).unwrap();
633        wal.sync().unwrap();
634
635        let mutations = wal.replay().unwrap();
636        assert_eq!(mutations.len(), 1);
637        assert_eq!(mutations[0].table.keyspace, "test_ks");
638        assert_eq!(mutations[0].table.table, "test_table");
639    }
640
641    #[test]
642    fn test_wal_replay_multiple_entries() {
643        let temp_dir = TempDir::new().unwrap();
644        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
645
646        for i in 0..10 {
647            let mutation = create_test_mutation(i, &format!("User{}", i));
648            wal.append(&mutation).unwrap();
649        }
650        wal.sync().unwrap();
651
652        let mutations = wal.replay().unwrap();
653        assert_eq!(mutations.len(), 10);
654
655        for (i, mutation) in mutations.iter().enumerate() {
656            assert_eq!(mutation.table.keyspace, "test_ks");
657            match &mutation.operations[0] {
658                CellOperation::Write { column, value } => {
659                    assert_eq!(column, "name");
660                    if let Value::Text(name) = value {
661                        assert_eq!(name, &format!("User{}", i));
662                    } else {
663                        panic!("Expected Text value");
664                    }
665                }
666                _ => panic!("Expected Write operation"),
667            }
668        }
669    }
670
671    #[test]
672    fn test_wal_truncate() {
673        let temp_dir = TempDir::new().unwrap();
674        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
675
676        let mutation = create_test_mutation(1, "Alice");
677        wal.append(&mutation).unwrap();
678        wal.sync().unwrap();
679
680        assert!(wal.size() > 0);
681
682        wal.truncate().unwrap();
683        assert_eq!(wal.size(), 0);
684
685        let mutations = wal.replay().unwrap();
686        assert_eq!(mutations.len(), 0);
687    }
688
689    #[test]
690    fn test_wal_crc_corruption() {
691        let temp_dir = TempDir::new().unwrap();
692        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
693
694        let mutation = create_test_mutation(1, "Alice");
695        wal.append(&mutation).unwrap();
696        wal.sync().unwrap();
697
698        // Corrupt the CRC32 field (bytes 4-7)
699        let wal_path = wal.path().to_path_buf();
700        drop(wal);
701
702        let mut file = OpenOptions::new().write(true).open(&wal_path).unwrap();
703        file.seek(SeekFrom::Start(4)).unwrap();
704        file.write_all(&[0xFF, 0xFF, 0xFF, 0xFF]).unwrap();
705        file.sync_all().unwrap();
706        drop(file);
707
708        // Replay should skip the corrupted entry
709        let wal = WriteAheadLog::open_existing(&wal_path).unwrap();
710        let mutations = wal.replay().unwrap();
711        assert_eq!(mutations.len(), 0);
712    }
713
714    #[test]
715    fn test_wal_truncated_entry() {
716        let temp_dir = TempDir::new().unwrap();
717        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
718
719        let mutation = create_test_mutation(1, "Alice");
720        wal.append(&mutation).unwrap();
721        wal.sync().unwrap();
722
723        let wal_path = wal.path().to_path_buf();
724        let original_size = wal.size();
725        drop(wal);
726
727        // Truncate the file to simulate incomplete write
728        let file = OpenOptions::new().write(true).open(&wal_path).unwrap();
729        file.set_len(original_size - 10).unwrap();
730        drop(file);
731
732        // Replay should stop at truncated entry
733        let wal = WriteAheadLog::open_existing(&wal_path).unwrap();
734        let mutations = wal.replay().unwrap();
735        assert_eq!(mutations.len(), 0);
736    }
737
738    #[test]
739    fn test_wal_rotate() {
740        let temp_dir = TempDir::new().unwrap();
741        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
742
743        let mutation = create_test_mutation(1, "Alice");
744        wal.append(&mutation).unwrap();
745        wal.sync().unwrap();
746
747        // Rotate the WAL
748        let wal = wal.rotate(temp_dir.path()).unwrap();
749
750        // New WAL should be empty
751        assert_eq!(wal.size(), 0);
752
753        // Old WAL should be archived
754        let archived_files: Vec<_> = std::fs::read_dir(temp_dir.path())
755            .unwrap()
756            .filter_map(|e| e.ok())
757            .filter(|e| {
758                e.file_name()
759                    .to_string_lossy()
760                    .starts_with("commitlog.wal.")
761            })
762            .collect();
763
764        assert_eq!(archived_files.len(), 1);
765    }
766
767    #[test]
768    fn test_wal_delete_old() {
769        let temp_dir = TempDir::new().unwrap();
770        let wal_path = temp_dir.path().join("test.wal");
771
772        // Create a dummy WAL file
773        File::create(&wal_path).unwrap();
774        assert!(wal_path.exists());
775
776        // Delete it
777        WriteAheadLog::delete_old(&wal_path).unwrap();
778        assert!(!wal_path.exists());
779    }
780
781    #[test]
782    fn test_wal_open_existing() {
783        let temp_dir = TempDir::new().unwrap();
784        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
785
786        let mutation1 = create_test_mutation(1, "Alice");
787        wal.append(&mutation1).unwrap();
788        wal.sync().unwrap();
789
790        let wal_path = wal.path().to_path_buf();
791        drop(wal);
792
793        // Reopen the WAL
794        let mut wal = WriteAheadLog::open_existing(&wal_path).unwrap();
795
796        // Append another entry
797        let mutation2 = create_test_mutation(2, "Bob");
798        wal.append(&mutation2).unwrap();
799        wal.sync().unwrap();
800
801        // Replay should get both entries
802        let mutations = wal.replay().unwrap();
803        assert_eq!(mutations.len(), 2);
804    }
805
806    #[test]
807    fn test_wal_with_clustering_key() {
808        let temp_dir = TempDir::new().unwrap();
809        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
810
811        let table_id = TableId::new("test_ks", "test_table");
812        let pk = PartitionKey::single("id", Value::Integer(1));
813        let ck = Some(ClusteringKey::single("ts", Value::Timestamp(1000)));
814        let ops = vec![CellOperation::Write {
815            column: "value".to_string(),
816            value: Value::Text("test".to_string()),
817        }];
818
819        let mutation = Mutation::new(table_id, pk, ck, ops, 1234567890, None);
820        wal.append(&mutation).unwrap();
821        wal.sync().unwrap();
822
823        let mutations = wal.replay().unwrap();
824        assert_eq!(mutations.len(), 1);
825        assert!(mutations[0].clustering_key.is_some());
826    }
827
828    #[test]
829    fn test_wal_with_ttl() {
830        let temp_dir = TempDir::new().unwrap();
831        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
832
833        let table_id = TableId::new("test_ks", "test_table");
834        let pk = PartitionKey::single("id", Value::Integer(1));
835        let ops = vec![CellOperation::Write {
836            column: "value".to_string(),
837            value: Value::Text("test".to_string()),
838        }];
839
840        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, Some(3600));
841        wal.append(&mutation).unwrap();
842        wal.sync().unwrap();
843
844        let mutations = wal.replay().unwrap();
845        assert_eq!(mutations.len(), 1);
846        assert_eq!(mutations[0].ttl_seconds, Some(3600));
847    }
848
849    #[test]
850    fn test_wal_delete_operation() {
851        let temp_dir = TempDir::new().unwrap();
852        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
853
854        let table_id = TableId::new("test_ks", "test_table");
855        let pk = PartitionKey::single("id", Value::Integer(1));
856        let ops = vec![CellOperation::Delete {
857            column: "name".to_string(),
858        }];
859
860        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, None);
861        wal.append(&mutation).unwrap();
862        wal.sync().unwrap();
863
864        let mutations = wal.replay().unwrap();
865        assert_eq!(mutations.len(), 1);
866        assert!(matches!(
867            &mutations[0].operations[0],
868            CellOperation::Delete { .. }
869        ));
870    }
871
872    #[test]
873    fn test_wal_delete_row_operation() {
874        let temp_dir = TempDir::new().unwrap();
875        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
876
877        let table_id = TableId::new("test_ks", "test_table");
878        let pk = PartitionKey::single("id", Value::Integer(1));
879        let ops = vec![CellOperation::DeleteRow];
880
881        let mutation = Mutation::new(table_id, pk, None, ops, 1234567890, None);
882        wal.append(&mutation).unwrap();
883        wal.sync().unwrap();
884
885        let mutations = wal.replay().unwrap();
886        assert_eq!(mutations.len(), 1);
887        assert!(matches!(
888            &mutations[0].operations[0],
889            CellOperation::DeleteRow
890        ));
891    }
892
893    #[test]
894    fn test_wal_buffer_size() {
895        let temp_dir = TempDir::new().unwrap();
896        let wal = WriteAheadLog::create_with_buffer_size(temp_dir.path(), 8192).unwrap();
897
898        assert_eq!(wal.buffer_size, 8192);
899    }
900
901    #[test]
902    fn test_wal_directory_sync_on_create() {
903        // Test that directory is synced after WAL creation
904        let temp_dir = TempDir::new().unwrap();
905        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
906
907        // Verify WAL file exists
908        assert!(wal.path().exists());
909
910        // The sync operation should have completed without error
911        // (we can't directly test that fsync was called, but we verify no error)
912    }
913
914    #[test]
915    fn test_wal_directory_sync_on_rotate() {
916        // Test that directory is synced after WAL rotation
917        let temp_dir = TempDir::new().unwrap();
918        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
919
920        let mutation = create_test_mutation(1, "Alice");
921        wal.append(&mutation).unwrap();
922        wal.sync().unwrap();
923
924        // Rotate WAL
925        let new_wal = wal.rotate(temp_dir.path()).unwrap();
926
927        // Verify new WAL exists
928        assert!(new_wal.path().exists());
929
930        // Verify archived WAL exists
931        let archived_files: Vec<_> = std::fs::read_dir(temp_dir.path())
932            .unwrap()
933            .filter_map(|e| e.ok())
934            .filter(|e| {
935                e.file_name()
936                    .to_string_lossy()
937                    .starts_with("commitlog.wal.")
938            })
939            .collect();
940
941        assert_eq!(archived_files.len(), 1);
942    }
943
944    #[test]
945    fn test_wal_fsync_after_truncate() {
946        // Test that fsync is called after truncate
947        let temp_dir = TempDir::new().unwrap();
948        let mut wal = WriteAheadLog::create(temp_dir.path()).unwrap();
949
950        let mutation = create_test_mutation(1, "Alice");
951        wal.append(&mutation).unwrap();
952        wal.sync().unwrap();
953
954        let size_before = wal.size();
955        assert!(size_before > 0);
956
957        // Truncate should sync to disk
958        wal.truncate().unwrap();
959
960        assert_eq!(wal.size(), 0);
961
962        // Verify file is actually empty
963        let metadata = std::fs::metadata(wal.path()).unwrap();
964        assert_eq!(metadata.len(), 0);
965    }
966
967    #[test]
968    fn test_validate_wal_directory_nonexistent() {
969        // Test that validation fails for non-existent directory
970        let nonexistent = PathBuf::from("/nonexistent/path/that/does/not/exist");
971        let result = validate_wal_directory(&nonexistent);
972
973        assert!(result.is_err());
974        match result {
975            Err(Error::InvalidPath(_)) => {}
976            _ => panic!("Expected InvalidPath error"),
977        }
978    }
979
980    #[test]
981    fn test_validate_wal_directory_is_file() {
982        // Test that validation fails when path is a file, not a directory
983        let temp_dir = TempDir::new().unwrap();
984        let file_path = temp_dir.path().join("not_a_dir");
985        File::create(&file_path).unwrap();
986
987        let result = validate_wal_directory(&file_path);
988
989        assert!(result.is_err());
990        match result {
991            Err(Error::InvalidPath(_)) => {}
992            _ => panic!("Expected InvalidPath error"),
993        }
994    }
995
996    #[test]
997    fn test_validate_wal_directory_valid() {
998        // Test that validation succeeds for valid directory
999        let temp_dir = TempDir::new().unwrap();
1000        let result = validate_wal_directory(temp_dir.path());
1001
1002        assert!(result.is_ok());
1003        let canonical = result.unwrap();
1004        assert!(canonical.is_absolute());
1005    }
1006
1007    #[test]
1008    #[cfg(unix)]
1009    fn test_wal_file_permissions() {
1010        use std::os::unix::fs::PermissionsExt;
1011
1012        // Test that WAL files have secure permissions (0o600) on Unix
1013        let temp_dir = TempDir::new().unwrap();
1014        let wal = WriteAheadLog::create(temp_dir.path()).unwrap();
1015
1016        let metadata = std::fs::metadata(wal.path()).unwrap();
1017        let permissions = metadata.permissions();
1018        let mode = permissions.mode();
1019
1020        // Check that permissions are 0o600 (owner read/write only)
1021        // Mask with 0o777 to get only permission bits
1022        assert_eq!(mode & 0o777, 0o600);
1023    }
1024
1025    #[test]
1026    fn test_wal_create_validates_directory() {
1027        // Test that WAL creation validates the directory path
1028        let temp_dir = TempDir::new().unwrap();
1029
1030        // This should succeed because temp_dir exists
1031        let result = WriteAheadLog::create(temp_dir.path());
1032        assert!(result.is_ok());
1033
1034        // This should fail because the directory doesn't exist
1035        let nonexistent = temp_dir.path().join("nonexistent");
1036        let result = WriteAheadLog::create(&nonexistent);
1037        assert!(result.is_err());
1038    }
1039
1040    #[test]
1041    fn test_sync_directory_invalid_path() {
1042        // Test that sync_directory fails for invalid paths
1043        let invalid_path = PathBuf::from("/nonexistent/path");
1044        let result = sync_directory(&invalid_path);
1045
1046        assert!(result.is_err());
1047        match result {
1048            Err(Error::Storage(_)) => {}
1049            _ => panic!("Expected Storage error"),
1050        }
1051    }
1052}