Skip to main content

luci/storage/
single_file.rs

1use std::fs::{File, OpenOptions};
2#[cfg(unix)]
3use std::os::unix::fs::FileExt;
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::core::{FieldId, LuciError, Result, SegmentId};
8
9use crate::storage::allocator::BlockAllocator;
10use crate::storage::block::{BLOCK_SIZE, Extent, HEADER_SIZE};
11use crate::storage::directory::{MetadataSnapshot, SegmentEntry, VectorIndexEntry};
12use crate::storage::header::{FileHeader, RootPointer, xxh3_checksum};
13
/// Production storage backend: all data in a single `.luci` file using the
/// block allocator and two-root-pointer atomic commit.
///
/// See [[architecture-storage-format]] for the full design and [[architecture-storage-format#Atomic Commit Protocol]]
/// for the commit sequence.
///
/// # Crash Safety
///
/// At any point, both root pointers reference intact (non-overwritten) metadata
/// blocks. A crash mid-commit is recovered by falling back to the still-valid
/// root. Uncommitted segment writes are orphaned blocks that the allocator does
/// not reference — they are harmlessly reclaimed on the next file open.
#[cfg(unix)]
pub struct SingleFileDirectory {
    /// Shared handle to the backing `.luci` file. Cloned out through
    /// `file_handle()` so readers can reuse it instead of opening another fd.
    file: Arc<File>,
    /// Cross-process lock state for `file`. Taken SHARED on `create`/`open`,
    /// raised to RESERVED on the first write and to EXCLUSIVE around the
    /// header flip in `commit`.
    lock: crate::storage::lock::FileLock,
    /// In-memory copy of the on-disk file header (the two root pointers and
    /// the active-root selector).
    header: FileHeader,
    /// Block allocator, rebuilt from the committed `total_blocks` and
    /// `free_list` whenever metadata is (re)loaded from disk.
    allocator: BlockAllocator,
    /// Last committed metadata — the source of truth for reads.
    committed: MetadataSnapshot,
    /// Segments written but not yet committed.
    pending_segments: Vec<SegmentEntry>,
    /// Segments to remove on next commit.
    pending_removals: Vec<SegmentId>,
    /// Vector indexes written but not yet committed. Replaces any
    /// already-committed entry for the same `FieldId` on commit. See
    /// [[global-vector-indices]].
    pending_vector_indexes: Vec<VectorIndexEntry>,
    /// Vector indexes to remove on next commit.
    pending_vector_index_removals: Vec<FieldId>,
    /// Monotonically increasing commit counter.
    generation: u64,
    /// Whether user metadata has been modified since last commit.
    metadata_dirty: bool,
    /// Timeout for acquiring the cross-process write lock. Default: 5 seconds.
    write_timeout: std::time::Duration,
}
51
52#[cfg(unix)]
53impl SingleFileDirectory {
54    /// Create a new `.luci` file at the given path.
55    ///
56    /// Fails if a file already exists at `path`.
57    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
58        let file = OpenOptions::new()
59            .read(true)
60            .write(true)
61            .create_new(true)
62            .open(path.as_ref())?;
63
64        // Acquire SHARED lock for cross-process coordination.
65        let mut lock = crate::storage::lock::FileLock::new(&file);
66        lock.lock_shared()?;
67
68        let file = Arc::new(file);
69        let header = FileHeader::new();
70        file.write_all_at(&header.to_bytes(), 0)?;
71        file.sync_all()?;
72
73        Ok(Self {
74            file,
75            lock,
76            header,
77            allocator: BlockAllocator::new(),
78            committed: MetadataSnapshot::empty(),
79            pending_segments: Vec::new(),
80            pending_removals: Vec::new(),
81            pending_vector_indexes: Vec::new(),
82            pending_vector_index_removals: Vec::new(),
83            generation: 0,
84            metadata_dirty: false,
85            write_timeout: std::time::Duration::from_secs(5),
86        })
87    }
88
89    /// Open an existing `.luci` file, performing crash recovery.
90    ///
91    /// Validates root pointers, loads metadata from the best available root,
92    /// and reconstructs the block allocator. If the active root's checksum is
93    /// invalid (torn write), falls back to the inactive root and repairs the
94    /// header.
95    ///
96    /// See [[architecture-storage-format#Crash Recovery]].
97    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
98        let file = OpenOptions::new()
99            .read(true)
100            .write(true)
101            .open(path.as_ref())?;
102
103        // Acquire SHARED lock for cross-process coordination.
104        let mut lock = crate::storage::lock::FileLock::new(&file);
105        lock.lock_shared()?;
106
107        let file = Arc::new(file);
108
109        // Read header.
110        let mut header_buf = [0u8; HEADER_SIZE as usize];
111        file.read_exact_at(&mut header_buf, 0)?;
112        let mut header = FileHeader::from_bytes(&header_buf)?;
113
114        // Load metadata with fallback.
115        let (committed, used_inactive) = load_metadata(&file, &header)?;
116
117        // If we fell back to the inactive root, repair the header so the
118        // valid root is now marked active.
119        if used_inactive {
120            header.active_root = header.active_root.inactive();
121            file.write_all_at(&header.to_bytes(), 0)?;
122            file.sync_all()?;
123        }
124
125        // Reconstruct allocator from committed state.
126        let allocator =
127            BlockAllocator::from_state(committed.total_blocks, committed.free_list.clone());
128
129        let generation = committed
130            .segments
131            .iter()
132            .map(|s| s.generation)
133            .max()
134            .unwrap_or(0);
135
136        Ok(Self {
137            file,
138            lock,
139            header,
140            allocator,
141            committed,
142            pending_segments: Vec::new(),
143            pending_removals: Vec::new(),
144            pending_vector_indexes: Vec::new(),
145            pending_vector_index_removals: Vec::new(),
146            generation,
147            metadata_dirty: false,
148            write_timeout: std::time::Duration::from_secs(5),
149        })
150    }
151
152    /// Return a shared reference to the underlying file handle.
153    ///
154    /// Used by the reader path to avoid opening a separate fd,
155    /// which would break `fcntl` lock semantics (closing any fd
156    /// releases all locks for the process on that inode).
157    ///
158    /// See [[architecture-cross-process-locking#Critical Constraint]].
159    pub fn file_handle(&self) -> Arc<File> {
160        self.file.clone()
161    }
162
    /// Set the timeout for acquiring the cross-process write lock.
    ///
    /// Default: 5 seconds. If another process holds the write lock,
    /// retries with exponential backoff until the timeout expires,
    /// then returns `WriterLocked`.
    ///
    /// The value is only stored here; it is passed to the lock the next
    /// time `begin_write` has to acquire RESERVED.
    pub fn set_write_timeout(&mut self, timeout: std::time::Duration) {
        self.write_timeout = timeout;
    }
171
172    /// Open a read-only view using an existing file handle.
173    ///
174    /// Does not open a new fd or acquire any locks. The caller's
175    /// process already holds the appropriate lock via the writer's fd.
176    /// Used by `refresh_reader()` to reload segment data without
177    /// breaking `fcntl` lock semantics.
178    pub fn open_from_handle(file: Arc<File>) -> Result<Self> {
179        let mut header_buf = [0u8; HEADER_SIZE as usize];
180        file.read_exact_at(&mut header_buf, 0)?;
181        let header = FileHeader::from_bytes(&header_buf)?;
182
183        let (committed, _used_inactive) = load_metadata(&file, &header)?;
184        // Don't repair header — this is a read-only snapshot.
185
186        let allocator =
187            BlockAllocator::from_state(committed.total_blocks, committed.free_list.clone());
188
189        let generation = committed
190            .segments
191            .iter()
192            .map(|s| s.generation)
193            .max()
194            .unwrap_or(0);
195
196        // Create a no-op lock (UNLOCKED) — locking is managed by the
197        // writer's fd, not this reader view.
198        let lock = crate::storage::lock::FileLock::new(&file);
199
200        Ok(Self {
201            file,
202            lock,
203            header,
204            allocator,
205            committed,
206            pending_segments: Vec::new(),
207            pending_removals: Vec::new(),
208            pending_vector_indexes: Vec::new(),
209            pending_vector_index_removals: Vec::new(),
210            generation,
211            metadata_dirty: false,
212            write_timeout: std::time::Duration::from_secs(5),
213        })
214    }
215
216    /// Acquire RESERVED lock and refresh allocator from disk.
217    ///
218    /// Called automatically before the first write in a session. Ensures
219    /// the block allocator reflects the latest committed state on disk
220    /// (another process may have committed since we opened).
221    ///
222    /// This follows SQLite's pattern: acquire RESERVED at first write,
223    /// re-read metadata to get an authoritative freelist, then proceed.
224    /// RESERVED guarantees no other process is writing, so the on-disk
225    /// metadata is stable.
226    ///
227    /// See [[architecture-cross-process-locking]].
228    fn begin_write(&mut self) -> Result<()> {
229        if self.lock.level() >= crate::storage::lock::LockLevel::Reserved {
230            return Ok(()); // Already in write mode
231        }
232
233        self.lock.lock_reserved(self.write_timeout)?;
234
235        // Re-read committed metadata from disk. Another process may have
236        // committed since we opened, changing the freelist and segments.
237        let mut header_buf = [0u8; HEADER_SIZE as usize];
238        self.file.read_exact_at(&mut header_buf, 0)?;
239        self.header = FileHeader::from_bytes(&header_buf)?;
240
241        let (committed, _used_inactive) = load_metadata(&self.file, &self.header)?;
242        self.allocator =
243            BlockAllocator::from_state(committed.total_blocks, committed.free_list.clone());
244        self.generation = committed
245            .segments
246            .iter()
247            .map(|s| s.generation)
248            .max()
249            .unwrap_or(0);
250        self.committed = committed;
251
252        Ok(())
253    }
254
255    /// Write segment data to allocated blocks.
256    ///
257    /// The segment is not visible to readers until [`commit`](Self::commit)
258    /// is called. Data is written to disk immediately so that `commit` only
259    /// needs to write metadata and flip the header.
260    ///
261    /// Automatically acquires RESERVED lock on first write (blocks other
262    /// writers). The lock is held until `commit()` completes.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if `data` is empty or the write fails.
267    /// Returns `WriterLocked` if another process holds RESERVED.
268    pub fn write_segment(&mut self, segment_id: SegmentId, data: &[u8]) -> Result<()> {
269        if data.is_empty() {
270            return Err(LuciError::InvalidQuery("cannot write empty segment".into()));
271        }
272
273        self.begin_write()?;
274
275        let blocks_needed =
276            ((data.len() as u64 + BLOCK_SIZE as u64 - 1) / BLOCK_SIZE as u64) as u32;
277        let extent = self.allocator.allocate(blocks_needed)?;
278
279        self.file.write_all_at(data, extent.start.byte_offset())?;
280
281        self.pending_segments.push(SegmentEntry::new(
282            segment_id,
283            extent,
284            self.generation + 1,
285            data.len() as u64,
286        ));
287
288        Ok(())
289    }
290
291    /// Read committed segment data by segment ID.
292    ///
293    /// Only committed segments are visible. Returns `LuciError::IndexNotFound`
294    /// if the segment does not exist in the committed state.
295    pub fn read_segment(&self, segment_id: SegmentId) -> Result<Vec<u8>> {
296        let entry = self
297            .committed
298            .segments
299            .iter()
300            .find(|e| e.segment_id == segment_id)
301            .ok_or_else(|| LuciError::IndexNotFound(format!("segment {segment_id}")))?;
302
303        let mut buf = vec![0u8; entry.data_len as usize];
304        self.file
305            .read_exact_at(&mut buf, entry.extent.start.byte_offset())?;
306        Ok(buf)
307    }
308
309    /// Atomically commit all pending segment writes.
310    ///
311    /// Implements the six-step atomic commit protocol from [[architecture-storage-format]]:
312    ///
313    /// 1. Segment data already written to blocks (by `write_segment`)
314    /// 2. Serialize new metadata to a freshly allocated block (copy-on-write)
315    /// 3. Compute checksum, update the inactive root pointer
316    /// 4. `fsync` the data file
317    /// 5. Write the 4 KB header with the flipped active root flag
318    /// 6. `fsync` the header
319    ///
320    /// The old inactive root's metadata block is freed and included in the new
321    /// free list — but never overwritten until after this commit succeeds, so
322    /// both roots remain valid at all times for crash recovery.
323    pub fn commit(&mut self) -> Result<()> {
324        if self.pending_segments.is_empty()
325            && self.pending_removals.is_empty()
326            && self.pending_vector_indexes.is_empty()
327            && self.pending_vector_index_removals.is_empty()
328            && !self.metadata_dirty
329        {
330            return Ok(());
331        }
332
333        // Preserve user_metadata (mapping + deletion bitmap) staged since the
334        // last refresh. `begin_write` below re-reads committed state from disk
335        // to pick up another process's commits, which would otherwise drop the
336        // deletions staged via `set_user_metadata` for a metadata-only commit.
337        let staged_user_metadata = self.committed.user_metadata.clone();
338
339        // Acquire RESERVED and refresh allocator + header + committed from disk
340        // BEFORE allocating the metadata block or flipping the header. On the
341        // normal (write_segment) path this early-returns. On a metadata-only
342        // commit (a deletion with no buffered docs) this is the only
343        // `begin_write`, and it MUST run first: `begin_write` re-reads the
344        // header, so running it after the header flip below would discard the
345        // flip and allocate the meta block from a stale allocator — silently
346        // reverting the commit on reopen. See
347        // luci-index/tests/deletion_persistence.rs.
348        self.begin_write()?;
349        self.committed.user_metadata = staged_user_metadata;
350
351        self.metadata_dirty = false;
352
353        self.generation += 1;
354
355        // Free the old inactive root's metadata block. This is safe because
356        // the active root is our fallback — we never touch its block.
357        let old_inactive_meta = self.header.inactive_root_pointer().block_id;
358        if let Some(block_id) = old_inactive_meta {
359            self.allocator.free(Extent::new(block_id, 1));
360        }
361
362        // Allocate a fresh block for the new metadata.
363        let meta_extent = self.allocator.allocate(1)?;
364        let meta_block = meta_extent.start;
365
366        // Build the new snapshot: committed + pending - removals.
367        let mut segments = self.committed.segments.clone();
368        segments.extend(self.pending_segments.drain(..));
369
370        // Remove merged source segments and free their blocks.
371        if !self.pending_removals.is_empty() {
372            segments.retain(|entry| {
373                if self.pending_removals.contains(&entry.segment_id) {
374                    self.allocator.free(entry.extent);
375                    false
376                } else {
377                    true
378                }
379            });
380            self.pending_removals.clear();
381        }
382
383        // Apply vector-index writes + removals. A new write for a
384        // field that already has a committed extent replaces it: free
385        // the old extent, drop the old entry, then push the new entry.
386        // The new entry's blocks were already allocated and written in
387        // `write_vector_index` (the same write-then-flip-metadata
388        // pattern segments use).
389        let mut vector_indexes = self.committed.vector_indexes.clone();
390        if !self.pending_vector_index_removals.is_empty() {
391            vector_indexes.retain(|entry| {
392                if self.pending_vector_index_removals.contains(&entry.field_id) {
393                    self.allocator.free(entry.extent);
394                    false
395                } else {
396                    true
397                }
398            });
399            self.pending_vector_index_removals.clear();
400        }
401        for pending in self.pending_vector_indexes.drain(..) {
402            if let Some(pos) = vector_indexes
403                .iter()
404                .position(|e| e.field_id == pending.field_id)
405            {
406                self.allocator.free(vector_indexes[pos].extent);
407                vector_indexes.remove(pos);
408            }
409            vector_indexes.push(pending);
410        }
411
412        let snapshot = MetadataSnapshot {
413            segments,
414            vector_indexes,
415            total_blocks: self.allocator.total_blocks(),
416            free_list: self.allocator.free_list().to_vec(),
417            user_metadata: self.committed.user_metadata.clone(),
418        };
419
420        assert!(
421            snapshot.fits_in_single_block(),
422            "metadata overflow chaining not yet implemented"
423        );
424
425        // Serialize metadata into a full block (zero-padded for checksum).
426        let meta_bytes = snapshot.to_bytes();
427        let mut block_buf = vec![0u8; BLOCK_SIZE as usize];
428        block_buf[..meta_bytes.len()].copy_from_slice(&meta_bytes);
429
430        // Step 2: write metadata block.
431        self.file
432            .write_all_at(&block_buf, meta_block.byte_offset())?;
433
434        // Step 3: compute checksum, update header.
435        let checksum = xxh3_checksum(&block_buf);
436        self.header.commit(meta_block, checksum);
437
438        // fsync data (potentially slow — readers not blocked, we're in RESERVED).
439        self.file.sync_all()?;
440
441        // Escalate to EXCLUSIVE (brief — blocks readers for header flip only).
442        self.lock.lock_exclusive()?;
443
444        // Write header with flipped active root.
445        self.file.write_all_at(&self.header.to_bytes(), 0)?;
446
447        // fsync header (brief).
448        self.file.sync_all()?;
449
450        // Downgrade to SHARED (readers unblocked, other writers can proceed).
451        self.lock.downgrade_to_shared()?;
452
453        // Update in-memory committed state.
454        self.committed = snapshot;
455
456        Ok(())
457    }
458
459    /// The currently committed segment entries.
460    pub fn segments(&self) -> &[SegmentEntry] {
461        &self.committed.segments
462    }
463
    /// The current commit generation.
    ///
    /// Incremented by every `commit()` that actually commits something.
    /// When state is (re)loaded from disk it is re-derived as the highest
    /// generation among committed segments (0 if there are none).
    pub fn generation(&self) -> u64 {
        self.generation
    }
468
469    /// Set opaque user metadata to be persisted on the next commit.
470    pub fn set_user_metadata(&mut self, metadata: Vec<u8>) {
471        self.committed.user_metadata = metadata;
472        self.metadata_dirty = true;
473    }
474
475    /// Get the persisted user metadata (empty if none).
476    pub fn user_metadata(&self) -> &[u8] {
477        &self.committed.user_metadata
478    }
479
    /// Total number of free (reusable) blocks tracked by the allocator.
    ///
    /// Exposed for integration testing (free-list reclamation validation).
    /// Reports the in-memory allocator, so it also reflects allocations made
    /// by writes that have not been committed yet.
    pub fn free_block_count(&self) -> u64 {
        self.allocator.free_block_count()
    }
486
    /// Total number of data blocks the file spans.
    ///
    /// Exposed for integration testing. Reports the in-memory allocator's
    /// count, not a value re-read from disk.
    pub fn total_blocks(&self) -> u64 {
        self.allocator.total_blocks()
    }
493
494    /// Mark segments for removal on the next commit.
495    pub fn remove_segments(&mut self, segment_ids: &[SegmentId]) {
496        self.pending_removals.extend_from_slice(segment_ids);
497    }
498
499    /// Write a per-field vector index. The bytes are written to a fresh
500    /// extent immediately; the entry becomes visible to readers on the
501    /// next `commit()`. If the field already had a committed entry, its
502    /// old extent is freed during commit.
503    pub fn write_vector_index(&mut self, field_id: FieldId, data: &[u8]) -> Result<()> {
504        if data.is_empty() {
505            return Err(LuciError::InvalidQuery(
506                "cannot write empty vector index".into(),
507            ));
508        }
509
510        self.begin_write()?;
511
512        let blocks_needed =
513            ((data.len() as u64 + BLOCK_SIZE as u64 - 1) / BLOCK_SIZE as u64) as u32;
514        let extent = self.allocator.allocate(blocks_needed)?;
515
516        self.file.write_all_at(data, extent.start.byte_offset())?;
517
518        // Replace any earlier pending write for the same field so the
519        // commit-phase replace logic sees only the latest version.
520        self.pending_vector_indexes
521            .retain(|e| e.field_id != field_id);
522        self.pending_vector_indexes.push(VectorIndexEntry::new(
523            field_id,
524            extent,
525            data.len() as u64,
526        ));
527
528        Ok(())
529    }
530
531    /// Read committed vector-index bytes for `field_id`. Returns `None`
532    /// if no committed index exists for that field.
533    pub fn read_vector_index(&self, field_id: FieldId) -> Result<Option<Vec<u8>>> {
534        let entry = match self
535            .committed
536            .vector_indexes
537            .iter()
538            .find(|e| e.field_id == field_id)
539        {
540            Some(e) => e,
541            None => return Ok(None),
542        };
543
544        let mut buf = vec![0u8; entry.data_len as usize];
545        self.file
546            .read_exact_at(&mut buf, entry.extent.start.byte_offset())?;
547        Ok(Some(buf))
548    }
549
550    /// List the fields that have a committed vector index.
551    pub fn vector_index_fields(&self) -> Vec<FieldId> {
552        self.committed
553            .vector_indexes
554            .iter()
555            .map(|e| e.field_id)
556            .collect()
557    }
558
    /// Mark the vector index for `field_id` for removal on next commit.
    ///
    /// Only queues the id. During `commit()` a committed entry for the field
    /// is dropped and its extent freed; a pending write for the same field
    /// in the same commit is applied after the removal and therefore kept.
    pub fn remove_vector_index(&mut self, field_id: FieldId) {
        self.pending_vector_index_removals.push(field_id);
    }
563}
564
565#[cfg(unix)]
566impl crate::storage::Storage for SingleFileDirectory {
567    fn write_segment(&mut self, segment_id: SegmentId, data: &[u8]) -> Result<()> {
568        self.write_segment(segment_id, data)
569    }
570    fn read_segment(&self, segment_id: SegmentId) -> Result<Vec<u8>> {
571        self.read_segment(segment_id)
572    }
573    fn commit(&mut self) -> Result<()> {
574        self.commit()
575    }
576    fn segments(&self) -> &[SegmentEntry] {
577        self.segments()
578    }
579    fn generation(&self) -> u64 {
580        self.generation()
581    }
582    fn set_user_metadata(&mut self, metadata: Vec<u8>) {
583        self.set_user_metadata(metadata)
584    }
585    fn user_metadata(&self) -> &[u8] {
586        self.user_metadata()
587    }
588    fn remove_segments(&mut self, segment_ids: &[SegmentId]) {
589        self.remove_segments(segment_ids)
590    }
591    fn write_vector_index(&mut self, field_id: FieldId, data: &[u8]) -> Result<()> {
592        self.write_vector_index(field_id, data)
593    }
594    fn read_vector_index(&self, field_id: FieldId) -> Result<Option<Vec<u8>>> {
595        self.read_vector_index(field_id)
596    }
597    fn vector_index_fields(&self) -> Vec<FieldId> {
598        self.vector_index_fields()
599    }
600    fn remove_vector_index(&mut self, field_id: FieldId) {
601        self.remove_vector_index(field_id)
602    }
603    fn set_write_timeout(&mut self, timeout: std::time::Duration) {
604        self.set_write_timeout(timeout)
605    }
606}
607
608/// Load metadata from the best available root pointer.
609///
610/// Returns `(snapshot, used_inactive)` where `used_inactive` is true if the
611/// active root was invalid and we fell back to the inactive root.
612#[cfg(unix)]
613fn load_metadata(file: &File, header: &FileHeader) -> Result<(MetadataSnapshot, bool)> {
614    // Try active root.
615    if let Some(snap) = try_load_root(file, header.active_root_pointer())? {
616        return Ok((snap, false));
617    }
618
619    // Try inactive root.
620    if let Some(snap) = try_load_root(file, header.inactive_root_pointer())? {
621        return Ok((snap, true));
622    }
623
624    // Both roots empty → fresh (never committed) file.
625    if !header.active_root_pointer().is_populated()
626        && !header.inactive_root_pointer().is_populated()
627    {
628        return Ok((MetadataSnapshot::empty(), false));
629    }
630
631    // At least one root was populated but both failed validation.
632    Err(LuciError::IndexCorrupted(
633        "both root pointers failed checksum validation".into(),
634    ))
635}
636
637/// Try to load metadata from a single root pointer.
638///
639/// Returns `None` if the root is empty or its checksum doesn't match.
640#[cfg(unix)]
641fn try_load_root(file: &File, root: &RootPointer) -> Result<Option<MetadataSnapshot>> {
642    let block_id = match root.block_id {
643        Some(id) => id,
644        None => return Ok(None),
645    };
646
647    let mut block_buf = vec![0u8; BLOCK_SIZE as usize];
648    file.read_exact_at(&mut block_buf, block_id.byte_offset())?;
649
650    let computed = xxh3_checksum(&block_buf);
651    if computed != root.checksum {
652        return Ok(None);
653    }
654
655    let snap = MetadataSnapshot::from_bytes(&block_buf)?;
656    Ok(Some(snap))
657}
658
659#[cfg(test)]
660#[cfg(unix)]
661mod tests {
662    use super::*;
663    use crate::storage::header::FORMAT_VERSION;
664    use std::fs;
665
666    /// Create a temp directory for test files. Returns the path.
667    fn test_dir() -> std::path::PathBuf {
668        let dir = std::env::temp_dir().join(format!("luci_test_{}", std::process::id()));
669        fs::create_dir_all(&dir).unwrap();
670        dir
671    }
672
673    fn test_path(name: &str) -> std::path::PathBuf {
674        test_dir().join(name)
675    }
676
677    #[test]
678    fn create_new_file() {
679        let path = test_path("create_new.luci");
680        let _ = fs::remove_file(&path);
681
682        let dir = SingleFileDirectory::create(&path).unwrap();
683        assert!(dir.segments().is_empty());
684        assert_eq!(dir.generation(), 0);
685
686        // File should exist with at least the header.
687        let meta = fs::metadata(&path).unwrap();
688        assert!(meta.len() >= HEADER_SIZE as u64);
689
690        fs::remove_file(&path).unwrap();
691    }
692
693    #[test]
694    fn create_fails_if_exists() {
695        let path = test_path("create_exists.luci");
696        let _ = fs::remove_file(&path);
697
698        let _dir = SingleFileDirectory::create(&path).unwrap();
699        let err = SingleFileDirectory::create(&path);
700        assert!(err.is_err());
701
702        fs::remove_file(&path).unwrap();
703    }
704
705    #[test]
706    fn write_commit_read() {
707        let path = test_path("write_commit_read.luci");
708        let _ = fs::remove_file(&path);
709
710        let mut dir = SingleFileDirectory::create(&path).unwrap();
711
712        let data = b"hello luci segment data!";
713        dir.write_segment(SegmentId::new(1), data).unwrap();
714        dir.commit().unwrap();
715
716        let read_back = dir.read_segment(SegmentId::new(1)).unwrap();
717        assert_eq!(read_back, data);
718        assert_eq!(dir.segments().len(), 1);
719        assert_eq!(dir.generation(), 1);
720
721        fs::remove_file(&path).unwrap();
722    }
723
724    #[test]
725    fn multiple_segments() {
726        let path = test_path("multiple_segments.luci");
727        let _ = fs::remove_file(&path);
728
729        let mut dir = SingleFileDirectory::create(&path).unwrap();
730
731        dir.write_segment(SegmentId::new(1), b"segment-one")
732            .unwrap();
733        dir.write_segment(SegmentId::new(2), b"segment-two-longer")
734            .unwrap();
735        dir.commit().unwrap();
736
737        assert_eq!(dir.segments().len(), 2);
738        assert_eq!(dir.read_segment(SegmentId::new(1)).unwrap(), b"segment-one");
739        assert_eq!(
740            dir.read_segment(SegmentId::new(2)).unwrap(),
741            b"segment-two-longer"
742        );
743
744        fs::remove_file(&path).unwrap();
745    }
746
747    #[test]
748    fn reopen_after_commit() {
749        let path = test_path("reopen.luci");
750        let _ = fs::remove_file(&path);
751
752        // Create and write.
753        {
754            let mut dir = SingleFileDirectory::create(&path).unwrap();
755            dir.write_segment(SegmentId::new(1), b"persistent data")
756                .unwrap();
757            dir.commit().unwrap();
758        }
759
760        // Reopen and read.
761        {
762            let dir = SingleFileDirectory::open(&path).unwrap();
763            assert_eq!(dir.segments().len(), 1);
764            assert_eq!(
765                dir.read_segment(SegmentId::new(1)).unwrap(),
766                b"persistent data"
767            );
768            assert_eq!(dir.generation(), 1);
769        }
770
771        fs::remove_file(&path).unwrap();
772    }
773
774    #[test]
775    fn uncommitted_writes_not_visible_after_reopen() {
776        let path = test_path("uncommitted.luci");
777        let _ = fs::remove_file(&path);
778
779        {
780            let mut dir = SingleFileDirectory::create(&path).unwrap();
781            dir.write_segment(SegmentId::new(1), b"committed").unwrap();
782            dir.commit().unwrap();
783
784            // Write but don't commit.
785            dir.write_segment(SegmentId::new(2), b"uncommitted")
786                .unwrap();
787        }
788
789        {
790            let dir = SingleFileDirectory::open(&path).unwrap();
791            assert_eq!(dir.segments().len(), 1);
792            assert_eq!(dir.read_segment(SegmentId::new(1)).unwrap(), b"committed");
793            assert!(dir.read_segment(SegmentId::new(2)).is_err());
794        }
795
796        fs::remove_file(&path).unwrap();
797    }
798
799    #[test]
800    fn multiple_commits() {
801        let path = test_path("multi_commit.luci");
802        let _ = fs::remove_file(&path);
803
804        let mut dir = SingleFileDirectory::create(&path).unwrap();
805
806        dir.write_segment(SegmentId::new(1), b"first").unwrap();
807        dir.commit().unwrap();
808        assert_eq!(dir.generation(), 1);
809
810        dir.write_segment(SegmentId::new(2), b"second").unwrap();
811        dir.commit().unwrap();
812        assert_eq!(dir.generation(), 2);
813
814        dir.write_segment(SegmentId::new(3), b"third").unwrap();
815        dir.commit().unwrap();
816        assert_eq!(dir.generation(), 3);
817
818        assert_eq!(dir.segments().len(), 3);
819        assert_eq!(dir.read_segment(SegmentId::new(1)).unwrap(), b"first");
820        assert_eq!(dir.read_segment(SegmentId::new(2)).unwrap(), b"second");
821        assert_eq!(dir.read_segment(SegmentId::new(3)).unwrap(), b"third");
822
823        fs::remove_file(&path).unwrap();
824    }
825
826    #[test]
827    fn reopen_after_multiple_commits() {
828        let path = test_path("reopen_multi.luci");
829        let _ = fs::remove_file(&path);
830
831        {
832            let mut dir = SingleFileDirectory::create(&path).unwrap();
833            dir.write_segment(SegmentId::new(1), b"aaa").unwrap();
834            dir.commit().unwrap();
835            dir.write_segment(SegmentId::new(2), b"bbb").unwrap();
836            dir.commit().unwrap();
837            dir.write_segment(SegmentId::new(3), b"ccc").unwrap();
838            dir.commit().unwrap();
839        }
840
841        {
842            let dir = SingleFileDirectory::open(&path).unwrap();
843            assert_eq!(dir.segments().len(), 3);
844            assert_eq!(dir.read_segment(SegmentId::new(1)).unwrap(), b"aaa");
845            assert_eq!(dir.read_segment(SegmentId::new(2)).unwrap(), b"bbb");
846            assert_eq!(dir.read_segment(SegmentId::new(3)).unwrap(), b"ccc");
847            assert_eq!(dir.generation(), 3);
848        }
849
850        fs::remove_file(&path).unwrap();
851    }
852
853    #[test]
854    fn large_segment_spanning_multiple_blocks() {
855        let path = test_path("large_segment.luci");
856        let _ = fs::remove_file(&path);
857
858        let mut dir = SingleFileDirectory::create(&path).unwrap();
859
860        // Write data larger than one block (256 KB).
861        let data = vec![0xABu8; BLOCK_SIZE as usize * 3 + 1000];
862        dir.write_segment(SegmentId::new(1), &data).unwrap();
863        dir.commit().unwrap();
864
865        let read_back = dir.read_segment(SegmentId::new(1)).unwrap();
866        assert_eq!(read_back, data);
867
868        // Verify it allocated 4 blocks (3 full + 1 partial).
869        assert_eq!(dir.segments()[0].extent.count, 4);
870
871        fs::remove_file(&path).unwrap();
872    }
873
874    #[test]
875    fn read_nonexistent_segment_is_error() {
876        let path = test_path("read_nonexist.luci");
877        let _ = fs::remove_file(&path);
878
879        let dir = SingleFileDirectory::create(&path).unwrap();
880        let err = dir.read_segment(SegmentId::new(999));
881        assert!(err.is_err());
882
883        fs::remove_file(&path).unwrap();
884    }
885
886    #[test]
887    fn empty_commit_is_noop() {
888        let path = test_path("empty_commit.luci");
889        let _ = fs::remove_file(&path);
890
891        let mut dir = SingleFileDirectory::create(&path).unwrap();
892        // Committing with no pending segments should be a no-op.
893        dir.commit().unwrap();
894        assert_eq!(dir.generation(), 0);
895        assert!(dir.segments().is_empty());
896
897        fs::remove_file(&path).unwrap();
898    }
899
900    #[test]
901    fn simulated_torn_header_falls_back() {
902        let path = test_path("torn_header.luci");
903        let _ = fs::remove_file(&path);
904
905        // Create file with two commits so both roots are populated.
906        {
907            let mut dir = SingleFileDirectory::create(&path).unwrap();
908            dir.write_segment(SegmentId::new(1), b"first-commit")
909                .unwrap();
910            dir.commit().unwrap();
911            dir.write_segment(SegmentId::new(2), b"second-commit")
912                .unwrap();
913            dir.commit().unwrap();
914        }
915
916        // Corrupt the active root's checksum in the header to simulate a
917        // torn header write.
918        {
919            let file = OpenOptions::new()
920                .read(true)
921                .write(true)
922                .open(&path)
923                .unwrap();
924            let mut header_buf = [0u8; HEADER_SIZE as usize];
925            file.read_exact_at(&mut header_buf, 0).unwrap();
926            let header = FileHeader::from_bytes(&header_buf).unwrap();
927
928            // Corrupt the active root's checksum.
929            let checksum_offset = match header.active_root {
930                crate::storage::ActiveRoot::A => 24usize, // OFF_ROOT_A_CHECKSUM
931                crate::storage::ActiveRoot::B => 40usize, // OFF_ROOT_B_CHECKSUM
932            };
933            header_buf[checksum_offset] ^= 0xFF;
934            file.write_all_at(&header_buf, 0).unwrap();
935            file.sync_all().unwrap();
936        }
937
938        // Open should recover using the inactive root (first commit).
939        {
940            let dir = SingleFileDirectory::open(&path).unwrap();
941            assert_eq!(dir.segments().len(), 1);
942            assert_eq!(
943                dir.read_segment(SegmentId::new(1)).unwrap(),
944                b"first-commit"
945            );
946            // Second commit's segment should not be visible.
947            assert!(dir.read_segment(SegmentId::new(2)).is_err());
948        }
949
950        fs::remove_file(&path).unwrap();
951    }
952
953    /// Test 11: a v2 file committed by a new binary is re-stamped to the current
954    /// `FORMAT_VERSION` on ANY write — here a metadata-only commit — so an old
955    /// binary then rejects it loudly rather than silently misreading a v3
956    /// `KeywordBlocked` column. Guards the [[code-must-not-lie]]
957    /// silent-`_id`-drop hole. The stamp survives only because `commit()` is
958    /// refresh-first. See [[optimization-keyword-dict-offset-index]].
959    #[test]
960    fn version_stamped_on_any_commit() {
961        let path = test_path("version_stamp.luci");
962        let _ = fs::remove_file(&path);
963
964        // Create a normal file, then overwrite its header with a fabricated v2
965        // one (mirroring create()'s own write_all_at) to simulate an old binary.
966        SingleFileDirectory::create(&path).unwrap();
967        {
968            let file = OpenOptions::new()
969                .read(true)
970                .write(true)
971                .open(&path)
972                .unwrap();
973            let v2 = FileHeader::with_format_version(2).to_bytes();
974            file.write_all_at(&v2, 0).unwrap();
975            file.sync_all().unwrap();
976        }
977        // Sanity: the on-disk version is now 2.
978        {
979            let file = OpenOptions::new().read(true).open(&path).unwrap();
980            let mut buf = [0u8; HEADER_SIZE as usize];
981            file.read_exact_at(&mut buf, 0).unwrap();
982            assert_eq!(FileHeader::from_bytes(&buf).unwrap().format_version, 2);
983        }
984
985        // The new binary opens the v2 file and does a metadata-only commit
986        // (set_user_metadata marks the directory dirty with no segments — the
987        // same shape as a bare deletion commit).
988        {
989            let mut dir = SingleFileDirectory::open(&path).unwrap();
990            dir.set_user_metadata(b"deletion-marker".to_vec());
991            dir.commit().unwrap();
992        }
993
994        // commit() re-stamped the header to the writing binary's FORMAT_VERSION.
995        // The complementary "old binary rejects a v3 file" half is covered by
996        // header.rs::future_version_is_rejected.
997        {
998            let file = OpenOptions::new().read(true).open(&path).unwrap();
999            let mut buf = [0u8; HEADER_SIZE as usize];
1000            file.read_exact_at(&mut buf, 0).unwrap();
1001            assert_eq!(
1002                FileHeader::from_bytes(&buf).unwrap().format_version,
1003                FORMAT_VERSION
1004            );
1005        }
1006
1007        fs::remove_file(&path).unwrap();
1008    }
1009}