Skip to main content

neleus_db/
db.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use anyhow::{Context, Result};
6use serde::{Deserialize, Serialize};
7
8use crate::atomic::write_atomic;
9use crate::blob_store::BlobStore;
10use crate::commit::CommitStore;
11use crate::index::SearchIndexStore;
12use crate::lock::acquire_lock;
13use crate::manifest::ManifestStore;
14use crate::object_store::ObjectStore;
15use crate::refs::RefsStore;
16use crate::state::StateStore;
17use crate::wal::{Wal, WalRecoveryReport};
18
/// Config schema version written to `meta/config.json` by this build.
///
/// Configs with an older `schema_version` are raised to this value when loaded
/// (see `migrate_config`); newer values are left as they are.
const DB_CONFIG_SCHEMA_VERSION: u32 = 2;
20
/// Handle to an on-disk database, bundling the stores that operate on the
/// subdirectories of `root`.
///
/// Obtain one with [`Database::open`] after the directory has been prepared by
/// [`Database::init`].
#[derive(Debug, Clone)]
pub struct Database {
    /// Root directory of the database (the path given to `open`).
    pub root: PathBuf,
    /// Blob storage rooted at `<root>/blobs`.
    pub blob_store: BlobStore,
    /// Object storage rooted at `<root>/objects`.
    pub object_store: ObjectStore,
    /// Manifest store built on top of `object_store`.
    pub manifest_store: ManifestStore,
    /// State store built from the object store, blob store and WAL handles.
    pub state_store: StateStore,
    /// Commit store built on top of `object_store`.
    pub commit_store: CommitStore,
    /// Search index store rooted at `<root>/index`.
    pub index_store: SearchIndexStore,
    /// Ref store rooted at `<root>/refs`; it is handed a clone of `wal`.
    pub refs: RefsStore,
    /// Write-ahead log rooted at `<root>/wal`.
    pub wal: Wal,
    /// Contents of `<root>/meta/config.json` after loading and migration.
    pub config: Config,
}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36pub struct Config {
37    pub schema_version: u32,
38    pub hashing: String,
39    pub created_at: u64,
40    pub verify_on_read: bool,
41}
42
/// Version-1 layout of `meta/config.json`.
///
/// Only parsed when the file does not deserialize as a current [`Config`]; it is
/// then converted to a `Config` and written back.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LegacyConfigV1 {
    /// Legacy version number; not consulted during migration.
    version: u32,
    /// Hash algorithm name, carried over unchanged.
    hashing: String,
    /// Creation timestamp (Unix seconds), carried over unchanged.
    created_at: u64,
}
49
50impl Database {
51    pub fn init(path: impl AsRef<Path>) -> Result<()> {
52        let root = path.as_ref();
53        fs::create_dir_all(root).with_context(|| format!("failed creating {}", root.display()))?;
54        fs::create_dir_all(root.join("blobs"))?;
55        fs::create_dir_all(root.join("objects"))?;
56        fs::create_dir_all(root.join("refs").join("heads"))?;
57        fs::create_dir_all(root.join("refs").join("states"))?;
58        fs::create_dir_all(root.join("index"))?;
59        fs::create_dir_all(root.join("wal"))?;
60        fs::create_dir_all(root.join("meta"))?;
61
62        let cfg_path = root.join("meta").join("config.json");
63        if !cfg_path.exists() {
64            let cfg = Config {
65                schema_version: DB_CONFIG_SCHEMA_VERSION,
66                hashing: "blake3".into(),
67                created_at: now_unix(),
68                verify_on_read: false,
69            };
70            let bytes = serde_json::to_vec_pretty(&cfg)?;
71            write_atomic(&cfg_path, &bytes)?;
72        }
73
74        let db = Self::open(root)?;
75        let _ = db.state_store.empty_root()?;
76        Ok(())
77    }
78
79    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
80        let root = path.as_ref().to_path_buf();
81        let cfg_path = root.join("meta").join("config.json");
82        if !cfg_path.exists() {
83            return Err(anyhow::anyhow!(
84                "database not initialized at {}",
85                root.display()
86            ));
87        }
88
89        let _recovery_lock = acquire_lock(
90            root.join("meta").join("recovery.lock"),
91            Duration::from_secs(10),
92        )?;
93
94        let (config, migrated) = load_and_migrate_config(&cfg_path)?;
95        if migrated {
96            let bytes = serde_json::to_vec_pretty(&config)?;
97            write_atomic(&cfg_path, &bytes)?;
98        }
99
100        let blob_store = BlobStore::with_options(root.join("blobs"), config.verify_on_read);
101        let object_store = ObjectStore::with_options(root.join("objects"), config.verify_on_read);
102        let wal = Wal::new(root.join("wal"));
103        let refs = RefsStore::new(root.join("refs"), wal.clone());
104        let state_store = StateStore::new(object_store.clone(), blob_store.clone(), wal.clone());
105        let manifest_store = ManifestStore::new(object_store.clone());
106        let commit_store = CommitStore::new(object_store.clone());
107        let index_store = SearchIndexStore::new(root.join("index"));
108
109        blob_store.ensure_dir()?;
110        object_store.ensure_dir()?;
111        wal.ensure_dir()?;
112        refs.ensure_dirs()?;
113        index_store.ensure_dir()?;
114
115        let _report: WalRecoveryReport = wal.recover_refs(refs.root())?;
116
117        Ok(Self {
118            root,
119            blob_store,
120            object_store,
121            manifest_store,
122            state_store,
123            commit_store,
124            index_store,
125            refs,
126            wal,
127            config,
128        })
129    }
130}
131
132pub fn init(path: impl AsRef<Path>) -> Result<()> {
133    Database::init(path)
134}
135
136pub fn open(path: impl AsRef<Path>) -> Result<Database> {
137    Database::open(path)
138}
139
140fn load_and_migrate_config(cfg_path: &Path) -> Result<(Config, bool)> {
141    let raw = fs::read(cfg_path)
142        .with_context(|| format!("failed to read config {}", cfg_path.display()))?;
143
144    if let Ok(cfg) = serde_json::from_slice::<Config>(&raw) {
145        return Ok((migrate_config(cfg), false));
146    }
147
148    let old = serde_json::from_slice::<LegacyConfigV1>(&raw)
149        .with_context(|| format!("failed to parse config {}", cfg_path.display()))?;
150    let migrated = Config {
151        schema_version: DB_CONFIG_SCHEMA_VERSION,
152        hashing: old.hashing,
153        created_at: old.created_at,
154        verify_on_read: false,
155    };
156    Ok((migrate_config(migrated), true))
157}
158
159fn migrate_config(mut cfg: Config) -> Config {
160    if cfg.schema_version < DB_CONFIG_SCHEMA_VERSION {
161        cfg.schema_version = DB_CONFIG_SCHEMA_VERSION;
162    }
163    if cfg.hashing.is_empty() {
164        cfg.hashing = "blake3".into();
165    }
166    cfg
167}
168
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// If the system clock reads earlier than 1970, this returns `0` instead of
/// panicking. In this file the value only stamps `Config::created_at`, and a
/// misconfigured clock should not abort database initialization.
fn now_unix() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
175
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;
    use crate::hash::hash_blob;
    use crate::wal::{WalEntry, WalOp, WalPayload};

    /// `init` must create every directory of the documented layout plus the config.
    #[test]
    fn init_creates_expected_layout() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();

        assert!(db_root.join("blobs").exists());
        assert!(db_root.join("objects").exists());
        assert!(db_root.join("refs").join("heads").exists());
        assert!(db_root.join("refs").join("states").exists());
        assert!(db_root.join("index").exists());
        assert!(db_root.join("wal").exists());
        assert!(db_root.join("meta").is_dir());
        assert!(db_root.join("meta").join("config.json").exists());
    }

    #[test]
    fn open_after_init_works() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();
        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.root, db_root);
    }

    #[test]
    fn open_fails_without_init() {
        let tmp = TempDir::new().unwrap();
        assert!(Database::open(tmp.path()).is_err());
    }

    /// A second `init` must succeed and leave the existing config byte-for-byte intact
    /// (in particular its `created_at`).
    #[test]
    fn init_is_idempotent() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        let cfg_path = db_root.join("meta").join("config.json");

        Database::init(&db_root).unwrap();
        let first = fs::read(&cfg_path).unwrap();
        Database::init(&db_root).unwrap();

        assert_eq!(fs::read(&cfg_path).unwrap(), first);
    }

    #[test]
    fn config_is_valid_json() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();

        let raw = fs::read(db_root.join("meta").join("config.json")).unwrap();
        let v: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert_eq!(v["hashing"], "blake3");
        assert_eq!(v["schema_version"], DB_CONFIG_SCHEMA_VERSION);
        assert_eq!(v["verify_on_read"], false);
    }

    /// A leftover temp file from an interrupted ref write must not replace the real ref.
    #[test]
    fn interrupted_temp_write_does_not_corrupt_refs() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();

        let db = Database::open(&db_root).unwrap();
        let stable = hash_blob(b"stable-commit");
        db.refs.head_set("main", stable).unwrap();

        let tmp_ref = db_root
            .join("refs")
            .join("heads")
            .join(".main.tmp-crash-simulated");
        fs::write(&tmp_ref, format!("{}\n", hash_blob(b"partial-commit"))).unwrap();

        let reopened = Database::open(&db_root).unwrap();
        let head = reopened.refs.head_get("main").unwrap();
        assert_eq!(head, Some(stable));
    }

    /// Opening a v1 config must upgrade it, keep its fields, and persist the result.
    #[test]
    fn migrates_legacy_config() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        fs::create_dir_all(db_root.join("meta")).unwrap();
        fs::create_dir_all(db_root.join("blobs")).unwrap();
        fs::create_dir_all(db_root.join("objects")).unwrap();
        fs::create_dir_all(db_root.join("refs").join("heads")).unwrap();
        fs::create_dir_all(db_root.join("refs").join("states")).unwrap();
        fs::create_dir_all(db_root.join("wal")).unwrap();

        let cfg_path = db_root.join("meta").join("config.json");
        let legacy = LegacyConfigV1 {
            version: 1,
            hashing: "blake3".into(),
            created_at: 1,
        };
        fs::write(&cfg_path, serde_json::to_vec_pretty(&legacy).unwrap()).unwrap();

        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.config.schema_version, DB_CONFIG_SCHEMA_VERSION);
        assert!(!db.config.verify_on_read);
        assert_eq!(db.config.hashing, "blake3");
        assert_eq!(db.config.created_at, 1);

        // The upgrade must be written back, not just applied in memory.
        let on_disk: Config = serde_json::from_slice(&fs::read(&cfg_path).unwrap()).unwrap();
        assert_eq!(on_disk, db.config);
    }

    #[test]
    fn wal_recovery_replays_pending_ref_update() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();

        let wal = Wal::new(db_root.join("wal"));
        let hash = hash_blob(b"recovered-commit");
        let entry = WalEntry {
            schema_version: 1,
            op: WalOp::RefHeadSet,
            payload: WalPayload::RefUpdate {
                name: "main".into(),
                hash,
            },
        };
        // Leave the entry pending (never committed) to simulate a crash mid-update.
        let _p = wal.begin_entry(&entry).unwrap();

        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.refs.head_get("main").unwrap(), Some(hash));
        assert!(db.wal.pending().unwrap().is_empty());
    }

    #[test]
    fn wal_recovery_rolls_back_bad_entries() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();

        fs::write(db_root.join("wal").join("bad.wal"), b"not-cbor").unwrap();
        let db = Database::open(&db_root).unwrap();
        assert!(db.wal.pending().unwrap().is_empty());
    }
}