Skip to main content

synwire_storage/
concurrency.rs

1//! Native concurrency helpers for Synwire storage backends.
2//!
3//! Synwire uses native backend concurrency rather than external file locks:
4//!
5//! - **`SQLite`**: WAL mode (Write-Ahead Logging) for concurrent reads alongside
6//!   writes without blocking.
7//! - **Binary blobs**: atomic rename (`rename(2)`) for crash-safe updates.
8//! - **`LanceDB` / tantivy**: leverage their own internal concurrency primitives.
9//!
10//! This module provides the helpers enforcing these conventions.
11
12use crate::StorageError;
13use rusqlite::Connection;
14use std::io;
15use std::path::Path;
16
17/// Enable WAL mode on the given `SQLite` connection.
18///
19/// WAL mode allows concurrent reads and a single writer without the writer
20/// blocking readers (unlike the default journal mode).  Must be called once
21/// per connection before any writes.
22///
23/// # Errors
24///
25/// Returns [`StorageError::Sqlite`] if the `PRAGMA journal_mode=WAL` command
26/// fails.
27pub fn ensure_wal_mode(conn: &Connection) -> Result<(), StorageError> {
28    conn.pragma_update(None, "journal_mode", "WAL")?;
29    conn.pragma_update(None, "synchronous", "NORMAL")?;
30    conn.pragma_update(None, "foreign_keys", "ON")?;
31    Ok(())
32}
33
/// Atomically replace the file at `dest` with the bytes in `data`.
///
/// The bytes are written to a uniquely named temporary file next to `dest`,
/// flushed to stable storage with `sync_all`, and only then renamed over
/// `dest`.  Because `rename(2)` is atomic on POSIX systems (and atomic at the
/// filesystem level on Windows for files on the same volume), readers never
/// observe a partially written file.
///
/// The temporary name embeds the process id and a per-process sequence
/// number, so concurrent calls targeting the same `dest` never share a temp
/// file.  If writing, syncing or renaming fails, the temp file is removed
/// (best effort) and `dest` is left as it was.
///
/// Note: the parent directory itself is not synced, so whether a completed
/// rename survives a power loss depends on the filesystem.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] if `dest` has no parent directory,
/// or the underlying I/O error if the temp file cannot be created, written or
/// synced, or if the rename fails.
pub fn atomic_write(dest: &Path, data: &[u8]) -> io::Result<()> {
    // Distinguishes temp files created by concurrent calls in this process.
    static TMP_SEQ: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);

    let parent = dest.parent().ok_or_else(|| {
        io::Error::new(io::ErrorKind::InvalidInput, "dest has no parent directory")
    })?;
    // Work on the raw OS name so non-UTF-8 file names keep distinct temp files.
    let base = dest
        .file_name()
        .unwrap_or_else(|| std::ffi::OsStr::new("atomic"));

    // `create_new` guarantees we never write into a file someone else owns
    // (e.g. a stale temp file left by a crashed process with the same pid);
    // on a name clash just move on to the next sequence number.
    let (tmp_path, tmp_file) = loop {
        let seq = TMP_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let mut name = std::ffi::OsString::from(".tmp-");
        name.push(base);
        name.push(format!(".{}.{}", std::process::id(), seq));
        let candidate = parent.join(name);
        match std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&candidate)
        {
            Ok(file) => break (candidate, file),
            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => {}
            Err(err) => return Err(err),
        }
    };

    // The handle is consumed (and therefore closed) before the rename, which
    // some platforms require for the rename to succeed.
    let outcome =
        write_synced(tmp_file, data).and_then(|()| std::fs::rename(&tmp_path, dest));
    if outcome.is_err() {
        // Best effort: the original error is what the caller needs to see.
        let _ = std::fs::remove_file(&tmp_path);
    }
    outcome
}

/// Write `data` to `file` and flush it to stable storage; closes `file` on return.
fn write_synced(mut file: std::fs::File, data: &[u8]) -> io::Result<()> {
    io::Write::write_all(&mut file, data)?;
    file.sync_all()
}
60
61/// Open a `SQLite` database in WAL mode, creating it (and its parent
62/// directories) if it does not exist.
63///
64/// # Errors
65///
66/// Returns [`StorageError`] if the directory cannot be created or the
67/// database cannot be opened/configured.
68pub fn open_wal_database(path: &Path) -> Result<Connection, StorageError> {
69    if let Some(parent) = path.parent() {
70        std::fs::create_dir_all(parent)?;
71    }
72    let conn = Connection::open(path)?;
73    ensure_wal_mode(&conn)?;
74    Ok(conn)
75}
76
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A database opened through `open_wal_database` reports `wal` as its
    /// journal mode.
    #[test]
    fn wal_mode_is_set() {
        let scratch = tempdir().expect("tempdir");
        let conn =
            open_wal_database(&scratch.path().join("test.db")).expect("open_wal_database");
        let journal_mode = conn
            .query_row("PRAGMA journal_mode", [], |row| row.get::<_, String>(0))
            .expect("PRAGMA journal_mode");
        assert_eq!(journal_mode, "wal");
    }

    /// Writing to a path that does not exist yet creates the file with the
    /// given bytes.
    #[test]
    fn atomic_write_creates_file() {
        let scratch = tempdir().expect("tempdir");
        let target = scratch.path().join("output.bin");
        atomic_write(&target, b"hello world").expect("atomic_write");
        assert_eq!(std::fs::read(&target).expect("read"), b"hello world");
    }

    /// A second write to the same path fully replaces the first one.
    #[test]
    fn atomic_write_replaces_existing() {
        let scratch = tempdir().expect("tempdir");
        let target = scratch.path().join("output.bin");
        atomic_write(&target, b"v1").expect("atomic_write v1");
        atomic_write(&target, b"v2").expect("atomic_write v2");
        assert_eq!(std::fs::read(&target).expect("read"), b"v2");
    }
}