lucisearch 0.8.0

Embeddable, in-process search engine — the SQLite/DuckDB of Elasticsearch
Documentation
//! Cross-process file locking integration tests.
//!
//! These tests use fork() to create actual child processes that compete
//! for locks on the same `.luci` file, verifying the five-state protocol
//! prevents corruption and provides the expected concurrency guarantees.
//!
//! See [[architecture-cross-process-locking]].

#[cfg(unix)]
mod tests {
    use std::fs;
    use std::path::PathBuf;

    use luci::core::SegmentId;
    use luci::storage::SingleFileDirectory;

    /// Returns the scratch directory used by these tests, creating it if needed.
    ///
    /// The directory sits under the system temp dir and carries the current
    /// process id in its name. Panics if the directory cannot be created.
    fn test_dir() -> PathBuf {
        let mut scratch = std::env::temp_dir();
        scratch.push(format!("luci_lock_test_{}", std::process::id()));
        fs::create_dir_all(&scratch).unwrap();
        scratch
    }

    /// Best-effort removal of a test index file.
    ///
    /// Never panics: a failed removal (for example because the file does not
    /// exist) is ignored.
    fn cleanup(path: &std::path::Path) {
        if fs::remove_file(path).is_err() {
            // Deliberately ignored — cleanup must not fail a test.
        }
    }

    /// Two processes can open the same index for reading simultaneously.
    ///
    /// The index is populated first; the process then forks and both the
    /// child and the parent open the file and read segment 1. The child
    /// reports its outcome through its exit status: 0 when every check
    /// passed, 1 when any of them panicked.
    #[test]
    fn concurrent_readers() {
        let path = test_dir().join("concurrent_readers.luci");
        let _ = fs::remove_file(&path);

        // Create and populate an index.
        {
            let mut dir = SingleFileDirectory::create(&path).unwrap();
            dir.write_segment(SegmentId::new(1), &[42u8; 1000]).unwrap();
            dir.commit().unwrap();
        }

        // SAFETY: `waitpid` receives a pointer to a live local, and the child
        // branch always ends in `_exit`, so the child never returns from this
        // block into the test harness it was forked from.
        unsafe {
            let pid = libc::fork();
            if pid == 0 {
                // Child: open and read. A failing check must not unwind into
                // the forked copy of the test harness, so panics are caught
                // here and turned into a non-zero exit status. The success
                // path exits from inside the closure, without running any
                // destructors.
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let dir = SingleFileDirectory::open(&path).unwrap();
                    let data = dir.read_segment(SegmentId::new(1)).unwrap();
                    assert_eq!(data.len(), 1000);
                    assert_eq!(data[0], 42);
                    libc::_exit(0);
                }));
                // Only reachable when the closure above panicked.
                libc::_exit(1);
            } else if pid > 0 {
                // Parent: also open and read concurrently.
                let dir = SingleFileDirectory::open(&path).unwrap();
                let data = dir.read_segment(SegmentId::new(1)).unwrap();
                assert_eq!(data.len(), 1000);
                assert_eq!(data[0], 42);

                // Wait for child. `status` starts at 0, which decodes as a
                // clean exit, so a failed `waitpid` must not pass silently.
                let mut status: libc::c_int = 0;
                let reaped = libc::waitpid(pid, &mut status, 0);
                assert_eq!(reaped, pid, "waitpid failed");
                assert!(
                    libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 0,
                    "child process failed"
                );
            } else {
                panic!("fork failed");
            }
        }

        cleanup(&path);
    }

    /// Second writer is blocked when first writer holds RESERVED.
    ///
    /// Process A (the parent) writes without committing (acquires RESERVED);
    /// process B (a forked child) tries to write and must get an error
    /// (WriterLocked). The child reports through its exit status:
    /// 0 = write rejected (expected), 1 = write succeeded (lock not working),
    /// 2 = the child panicked before it could tell.
    #[test]
    fn second_writer_blocked() {
        let path = test_dir().join("second_writer.luci");
        let _ = fs::remove_file(&path);

        // Create index.
        {
            let mut dir = SingleFileDirectory::create(&path).unwrap();
            dir.write_segment(SegmentId::new(1), &[1u8; 500]).unwrap();
            dir.commit().unwrap();
        }

        // SAFETY: `waitpid` receives a pointer to a live local, and the child
        // branch always ends in `_exit`, so the child never returns from this
        // block into the test harness it was forked from.
        unsafe {
            // Parent opens and starts writing (holds RESERVED).
            let mut parent_dir = SingleFileDirectory::open(&path).unwrap();
            parent_dir
                .write_segment(SegmentId::new(2), &[2u8; 500])
                .unwrap();
            // Parent now holds RESERVED — don't commit yet.

            let pid = libc::fork();
            if pid == 0 {
                // Child: try to write — should fail with WriterLocked.
                // Panics (e.g. a failed open) are caught so they cannot
                // unwind into the forked copy of the test harness; they get
                // their own exit code instead.
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut child_dir = SingleFileDirectory::open(&path).unwrap();
                    let result = child_dir.write_segment(SegmentId::new(3), &[3u8; 500]);
                    // Err is the expected outcome; Ok means the lock did not hold.
                    libc::_exit(if result.is_err() { 0 } else { 1 });
                }));
                // Only reachable when the closure above panicked.
                libc::_exit(2);
            } else if pid > 0 {
                // Wait for child. `status` starts at 0, which decodes as a
                // clean exit, so a failed `waitpid` must not pass silently.
                let mut status: libc::c_int = 0;
                let reaped = libc::waitpid(pid, &mut status, 0);
                assert_eq!(reaped, pid, "waitpid failed");
                assert!(
                    libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 0,
                    "child should have gotten WriterLocked (exit code: {})",
                    libc::WEXITSTATUS(status)
                );

                // Now parent commits.
                parent_dir.commit().unwrap();
            } else {
                panic!("fork failed");
            }
        }

        // Verify only parent's write persisted.
        {
            let dir = SingleFileDirectory::open(&path).unwrap();
            assert_eq!(dir.segments().len(), 2); // seg 1 + seg 2
        }

        cleanup(&path);
    }

    /// Once a writer is done, the next writer can proceed.
    ///
    /// Writer A writes and commits (releases RESERVED → SHARED) and is
    /// dropped; writer B then opens the same file, writes and commits
    /// successfully. Both writers run one after the other inside this
    /// process — no fork is involved.
    #[test]
    fn sequential_writers() {
        let path = test_dir().join("sequential_writers.luci");
        let _ = fs::remove_file(&path);

        // Create an empty index and close it again right away.
        drop(SingleFileDirectory::create(&path).unwrap());

        // Writer A stores segment 1, then writer B stores segment 2. Each
        // handle is dropped at the end of its iteration, so the second
        // writer starts only after the first has released its lock.
        for (segment, fill) in [(1, 1u8), (2, 2u8)] {
            let mut dir = SingleFileDirectory::open(&path).unwrap();
            dir.write_segment(SegmentId::new(segment), &[fill; 500]).unwrap();
            dir.commit().unwrap();
        }

        // Verify both segments exist.
        let segment_count = SingleFileDirectory::open(&path).unwrap().segments().len();
        assert_eq!(segment_count, 2);

        cleanup(&path);
    }

    /// Writer refreshes allocator from disk before writing.
    ///
    /// Process A (a forked child) opens the index, commits a second segment
    /// and exits. Process B (the parent) opens the index only after the
    /// child has exited, must see both committed segments, and then writes a
    /// third. Finally all three segments must read back with their own fill
    /// byte, which would not hold if the parent had allocated over the
    /// child's blocks.
    #[test]
    fn allocator_refresh_on_write() {
        let path = test_dir().join("allocator_refresh.luci");
        let _ = fs::remove_file(&path);

        // Create and populate index.
        {
            let mut dir = SingleFileDirectory::create(&path).unwrap();
            dir.write_segment(SegmentId::new(1), &[1u8; 500]).unwrap();
            dir.commit().unwrap();
        }

        // SAFETY: `waitpid` receives a pointer to a live local, and the child
        // branch always ends in `_exit`, so the child never returns from this
        // block into the test harness it was forked from.
        unsafe {
            let pid = libc::fork();
            if pid == 0 {
                // Child: open, write, commit a second segment. Panics are
                // caught so a failure cannot unwind into the forked copy of
                // the test harness; it becomes exit status 1 instead.
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut dir = SingleFileDirectory::open(&path).unwrap();
                    dir.write_segment(SegmentId::new(2), &[2u8; 500]).unwrap();
                    dir.commit().unwrap();
                    libc::_exit(0);
                }));
                // Only reachable when the closure above panicked.
                libc::_exit(1);
            } else if pid > 0 {
                // Wait for child to finish. `status` starts at 0, which
                // decodes as a clean exit, so a failed `waitpid` must not
                // pass silently.
                let mut status: libc::c_int = 0;
                let reaped = libc::waitpid(pid, &mut status, 0);
                assert_eq!(reaped, pid, "waitpid failed");
                assert!(
                    libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 0,
                    "child process failed"
                );

                // Parent: open and write a third segment. Must see both
                // seg 1 and seg 2, and not overlap with their blocks.
                let mut dir = SingleFileDirectory::open(&path).unwrap();
                assert_eq!(dir.segments().len(), 2);
                dir.write_segment(SegmentId::new(3), &[3u8; 500]).unwrap();
                dir.commit().unwrap();
            } else {
                panic!("fork failed");
            }
        }

        // Verify all three segments exist and are readable.
        {
            let dir = SingleFileDirectory::open(&path).unwrap();
            assert_eq!(dir.segments().len(), 3);
            let d1 = dir.read_segment(SegmentId::new(1)).unwrap();
            let d2 = dir.read_segment(SegmentId::new(2)).unwrap();
            let d3 = dir.read_segment(SegmentId::new(3)).unwrap();
            assert_eq!(d1[0], 1);
            assert_eq!(d2[0], 2);
            assert_eq!(d3[0], 3);
        }

        cleanup(&path);
    }

    /// Writer commits while no other process holds the file open.
    /// Then a fresh reader sees the committed data.
    ///
    /// A forked child writes and commits a second segment and exits with
    /// status 0 (1 if anything in it panicked); the parent opens the index
    /// only afterwards and must see two segments.
    ///
    /// (Note: a reader holding SHARED blocks the writer's EXCLUSIVE
    /// escalation during commit — this is correct behavior. The writer
    /// waits for the reader to release. Testing this without deadlock
    /// requires async coordination, deferred to a future test.)
    #[test]
    fn writer_then_reader_sees_data() {
        let path = test_dir().join("writer_then_reader.luci");
        let _ = fs::remove_file(&path);

        // Create with one segment.
        {
            let mut dir = SingleFileDirectory::create(&path).unwrap();
            dir.write_segment(SegmentId::new(1), &[1u8; 500]).unwrap();
            dir.commit().unwrap();
        }

        // SAFETY: `waitpid` receives a pointer to a live local, and the child
        // branch always ends in `_exit`, so the child never returns from this
        // block into the test harness it was forked from.
        unsafe {
            let pid = libc::fork();
            if pid == 0 {
                // Child: write and commit a second segment. Panics are
                // caught so a failure cannot unwind into the forked copy of
                // the test harness; it becomes exit status 1 instead.
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut dir = SingleFileDirectory::open(&path).unwrap();
                    dir.write_segment(SegmentId::new(2), &[2u8; 500]).unwrap();
                    dir.commit().unwrap();
                    libc::_exit(0);
                }));
                // Only reachable when the closure above panicked.
                libc::_exit(1);
            } else if pid > 0 {
                // Wait for child to finish writing. `status` starts at 0,
                // which decodes as a clean exit, so a failed `waitpid` must
                // not pass silently.
                let mut status: libc::c_int = 0;
                let reaped = libc::waitpid(pid, &mut status, 0);
                assert_eq!(reaped, pid, "waitpid failed");
                assert!(
                    libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 0,
                    "child process failed"
                );

                // Parent opens fresh — sees both segments.
                let dir = SingleFileDirectory::open(&path).unwrap();
                assert_eq!(dir.segments().len(), 2);
            } else {
                panic!("fork failed");
            }
        }

        cleanup(&path);
    }

    /// Crash releases locks — next process can write.
    ///
    /// A forked child starts a write (holds RESERVED) and then terminates
    /// abruptly via `_exit(42)` without committing. The parent first checks
    /// that the child really got that far (exit status 42), then writes and
    /// commits its own segment. Only segments 1 and 2 may exist afterwards.
    #[test]
    fn crash_releases_locks() {
        let path = test_dir().join("crash_release.luci");
        let _ = fs::remove_file(&path);

        // Create index.
        {
            let mut dir = SingleFileDirectory::create(&path).unwrap();
            dir.write_segment(SegmentId::new(1), &[1u8; 500]).unwrap();
            dir.commit().unwrap();
        }

        // SAFETY: `waitpid` receives a pointer to a live local, and the child
        // branch always ends in `_exit`, so the child never returns from this
        // block into the test harness it was forked from.
        unsafe {
            let pid = libc::fork();
            if pid == 0 {
                // Child: open and start writing (holds RESERVED). Panics are
                // caught so a failed open/write cannot unwind into the forked
                // copy of the test harness; it becomes exit status 1 instead.
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let mut dir = SingleFileDirectory::open(&path).unwrap();
                    dir.write_segment(SegmentId::new(99), &[99u8; 500]).unwrap();
                    // Simulated crash: `_exit` ends the process immediately,
                    // without committing and without dropping `dir`.
                    // 42 marks "reached the crash point".
                    libc::_exit(42);
                }));
                // Only reachable when the closure above panicked.
                libc::_exit(1);
            } else if pid > 0 {
                let mut status: libc::c_int = 0;
                let reaped = libc::waitpid(pid, &mut status, 0);
                assert_eq!(reaped, pid, "waitpid failed");
                // The child must have died while holding its write lock;
                // otherwise the rest of this test proves nothing.
                assert!(
                    libc::WIFEXITED(status) && libc::WEXITSTATUS(status) == 42,
                    "child did not reach the simulated crash point"
                );
                // Child exited (locks released by OS).

                // Parent should be able to write.
                let mut dir = SingleFileDirectory::open(&path).unwrap();
                dir.write_segment(SegmentId::new(2), &[2u8; 500]).unwrap();
                dir.commit().unwrap();
            } else {
                panic!("fork failed");
            }
        }

        // Verify: seg 1 and seg 2 exist, seg 99 does not (child crashed).
        {
            let dir = SingleFileDirectory::open(&path).unwrap();
            assert_eq!(dir.segments().len(), 2);
            let seg_ids: Vec<u64> = dir.segments().iter().map(|s| s.segment_id.0).collect();
            assert!(seg_ids.contains(&1));
            assert!(seg_ids.contains(&2));
            assert!(!seg_ids.contains(&99));
        }

        cleanup(&path);
    }
}