1use std::fs;
2use std::path::{Path, PathBuf};
3use std::time::{Duration, SystemTime, UNIX_EPOCH};
4
5use anyhow::{Context, Result};
6use serde::{Deserialize, Serialize};
7
8use crate::atomic::write_atomic;
9use crate::blob_store::BlobStore;
10use crate::commit::CommitStore;
11use crate::index::SearchIndexStore;
12use crate::lock::acquire_lock;
13use crate::manifest::ManifestStore;
14use crate::object_store::ObjectStore;
15use crate::refs::RefsStore;
16use crate::state::StateStore;
17use crate::wal::{Wal, WalRecoveryReport};
18
/// Schema version written into a freshly created `meta/config.json`, and the
/// minimum version a loaded config is raised to by `migrate_config`.
const DB_CONFIG_SCHEMA_VERSION: u32 = 2;
20
21#[derive(Debug, Clone)]
22pub struct Database {
23 pub root: PathBuf,
24 pub blob_store: BlobStore,
25 pub object_store: ObjectStore,
26 pub manifest_store: ManifestStore,
27 pub state_store: StateStore,
28 pub commit_store: CommitStore,
29 pub index_store: SearchIndexStore,
30 pub refs: RefsStore,
31 pub wal: Wal,
32 pub config: Config,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36pub struct Config {
37 pub schema_version: u32,
38 pub hashing: String,
39 pub created_at: u64,
40 pub verify_on_read: bool,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44struct LegacyConfigV1 {
45 version: u32,
46 hashing: String,
47 created_at: u64,
48}
49
50impl Database {
51 pub fn init(path: impl AsRef<Path>) -> Result<()> {
52 let root = path.as_ref();
53 fs::create_dir_all(root).with_context(|| format!("failed creating {}", root.display()))?;
54 fs::create_dir_all(root.join("blobs"))?;
55 fs::create_dir_all(root.join("objects"))?;
56 fs::create_dir_all(root.join("refs").join("heads"))?;
57 fs::create_dir_all(root.join("refs").join("states"))?;
58 fs::create_dir_all(root.join("index"))?;
59 fs::create_dir_all(root.join("wal"))?;
60 fs::create_dir_all(root.join("meta"))?;
61
62 let cfg_path = root.join("meta").join("config.json");
63 if !cfg_path.exists() {
64 let cfg = Config {
65 schema_version: DB_CONFIG_SCHEMA_VERSION,
66 hashing: "blake3".into(),
67 created_at: now_unix(),
68 verify_on_read: false,
69 };
70 let bytes = serde_json::to_vec_pretty(&cfg)?;
71 write_atomic(&cfg_path, &bytes)?;
72 }
73
74 let db = Self::open(root)?;
75 let _ = db.state_store.empty_root()?;
76 Ok(())
77 }
78
79 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
80 let root = path.as_ref().to_path_buf();
81 let cfg_path = root.join("meta").join("config.json");
82 if !cfg_path.exists() {
83 return Err(anyhow::anyhow!(
84 "database not initialized at {}",
85 root.display()
86 ));
87 }
88
89 let _recovery_lock = acquire_lock(
90 root.join("meta").join("recovery.lock"),
91 Duration::from_secs(10),
92 )?;
93
94 let (config, migrated) = load_and_migrate_config(&cfg_path)?;
95 if migrated {
96 let bytes = serde_json::to_vec_pretty(&config)?;
97 write_atomic(&cfg_path, &bytes)?;
98 }
99
100 let blob_store = BlobStore::with_options(root.join("blobs"), config.verify_on_read);
101 let object_store = ObjectStore::with_options(root.join("objects"), config.verify_on_read);
102 let wal = Wal::new(root.join("wal"));
103 let refs = RefsStore::new(root.join("refs"), wal.clone());
104 let state_store = StateStore::new(object_store.clone(), blob_store.clone(), wal.clone());
105 let manifest_store = ManifestStore::new(object_store.clone());
106 let commit_store = CommitStore::new(object_store.clone());
107 let index_store = SearchIndexStore::new(root.join("index"));
108
109 blob_store.ensure_dir()?;
110 object_store.ensure_dir()?;
111 wal.ensure_dir()?;
112 refs.ensure_dirs()?;
113 index_store.ensure_dir()?;
114
115 let _report: WalRecoveryReport = wal.recover_refs(refs.root())?;
116
117 Ok(Self {
118 root,
119 blob_store,
120 object_store,
121 manifest_store,
122 state_store,
123 commit_store,
124 index_store,
125 refs,
126 wal,
127 config,
128 })
129 }
130}
131
132pub fn init(path: impl AsRef<Path>) -> Result<()> {
133 Database::init(path)
134}
135
136pub fn open(path: impl AsRef<Path>) -> Result<Database> {
137 Database::open(path)
138}
139
140fn load_and_migrate_config(cfg_path: &Path) -> Result<(Config, bool)> {
141 let raw = fs::read(cfg_path)
142 .with_context(|| format!("failed to read config {}", cfg_path.display()))?;
143
144 if let Ok(cfg) = serde_json::from_slice::<Config>(&raw) {
145 return Ok((migrate_config(cfg), false));
146 }
147
148 let old = serde_json::from_slice::<LegacyConfigV1>(&raw)
149 .with_context(|| format!("failed to parse config {}", cfg_path.display()))?;
150 let migrated = Config {
151 schema_version: DB_CONFIG_SCHEMA_VERSION,
152 hashing: old.hashing,
153 created_at: old.created_at,
154 verify_on_read: false,
155 };
156 Ok((migrate_config(migrated), true))
157}
158
159fn migrate_config(mut cfg: Config) -> Config {
160 if cfg.schema_version < DB_CONFIG_SCHEMA_VERSION {
161 cfg.schema_version = DB_CONFIG_SCHEMA_VERSION;
162 }
163 if cfg.hashing.is_empty() {
164 cfg.hashing = "blake3".into();
165 }
166 cfg
167}
168
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics with the message `clock drift before epoch` if the system clock
/// reports a time earlier than the epoch.
fn now_unix() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("clock drift before epoch");
    since_epoch.as_secs()
}
175
#[cfg(test)]
mod tests {
    use std::fs;

    use tempfile::TempDir;

    use super::*;
    use crate::hash::hash_blob;
    use crate::wal::{WalEntry, WalOp, WalPayload};

    /// Creates a temporary directory and initializes a database inside it.
    ///
    /// The `TempDir` guard is returned alongside the database root so the
    /// directory stays alive for as long as the calling test holds it.
    fn init_temp_db() -> (TempDir, PathBuf) {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        Database::init(&db_root).unwrap();
        (tmp, db_root)
    }

    /// `init` must create every directory of the layout plus the config file.
    #[test]
    fn init_creates_expected_layout() {
        let (_tmp, db_root) = init_temp_db();

        assert!(db_root.join("blobs").exists());
        assert!(db_root.join("objects").exists());
        assert!(db_root.join("refs").join("heads").exists());
        assert!(db_root.join("refs").join("states").exists());
        assert!(db_root.join("index").exists());
        assert!(db_root.join("wal").exists());
        assert!(db_root.join("meta").join("config.json").exists());
    }

    /// A freshly initialized database can be opened and reports its root.
    #[test]
    fn open_after_init_works() {
        let (_tmp, db_root) = init_temp_db();
        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.root, db_root);
    }

    /// Opening a directory that was never initialized is an error.
    #[test]
    fn open_fails_without_init() {
        let tmp = TempDir::new().unwrap();
        assert!(Database::open(tmp.path()).is_err());
    }

    /// Running `init` a second time on the same root succeeds.
    #[test]
    fn init_is_idempotent() {
        let (_tmp, db_root) = init_temp_db();
        Database::init(&db_root).unwrap();
        assert!(db_root.join("meta").join("config.json").exists());
    }

    /// The config written by `init` is JSON with the expected defaults.
    #[test]
    fn config_is_valid_json() {
        let (_tmp, db_root) = init_temp_db();

        let raw = fs::read(db_root.join("meta").join("config.json")).unwrap();
        let v: serde_json::Value = serde_json::from_slice(&raw).unwrap();
        assert_eq!(v["hashing"], "blake3");
        assert_eq!(v["schema_version"], DB_CONFIG_SCHEMA_VERSION);
    }

    /// A stray temp file next to a ref must not change the ref's value after
    /// the database is reopened.
    #[test]
    fn interrupted_temp_write_does_not_corrupt_refs() {
        let (_tmp, db_root) = init_temp_db();

        let db = Database::open(&db_root).unwrap();
        let stable = hash_blob(b"stable-commit");
        db.refs.head_set("main", stable).unwrap();

        // Simulate a crash that left a partially written temp file behind.
        let tmp_ref = db_root
            .join("refs")
            .join("heads")
            .join(".main.tmp-crash-simulated");
        fs::write(&tmp_ref, format!("{}\n", hash_blob(b"partial-commit"))).unwrap();

        let reopened = Database::open(&db_root).unwrap();
        let head = reopened.refs.head_get("main").unwrap();
        assert_eq!(head, Some(stable));
    }

    /// A legacy config is upgraded in memory and written back in the current
    /// format.
    #[test]
    fn migrates_legacy_config() {
        let tmp = TempDir::new().unwrap();
        let db_root = tmp.path().join("neleus_db");
        fs::create_dir_all(db_root.join("meta")).unwrap();
        fs::create_dir_all(db_root.join("blobs")).unwrap();
        fs::create_dir_all(db_root.join("objects")).unwrap();
        fs::create_dir_all(db_root.join("refs").join("heads")).unwrap();
        fs::create_dir_all(db_root.join("refs").join("states")).unwrap();
        fs::create_dir_all(db_root.join("wal")).unwrap();

        let legacy = LegacyConfigV1 {
            version: 1,
            hashing: "blake3".into(),
            created_at: 1,
        };
        fs::write(
            db_root.join("meta/config.json"),
            serde_json::to_vec_pretty(&legacy).unwrap(),
        )
        .unwrap();

        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.config.schema_version, DB_CONFIG_SCHEMA_VERSION);
        assert!(!db.config.verify_on_read);

        // `open` rewrites a migrated config, so the file on disk must now
        // parse in the current format and equal what was loaded.
        let raw = fs::read(db_root.join("meta").join("config.json")).unwrap();
        let on_disk: Config = serde_json::from_slice(&raw).unwrap();
        assert_eq!(on_disk, db.config);
    }

    /// A WAL entry that was begun but never finished is applied on open and
    /// leaves no pending entries behind.
    #[test]
    fn wal_recovery_replays_pending_ref_update() {
        let (_tmp, db_root) = init_temp_db();

        let wal = Wal::new(db_root.join("wal"));
        let hash = hash_blob(b"recovered-commit");
        let entry = WalEntry {
            schema_version: 1,
            op: WalOp::RefHeadSet,
            payload: WalPayload::RefUpdate {
                name: "main".into(),
                hash,
            },
        };
        let _p = wal.begin_entry(&entry).unwrap();

        let db = Database::open(&db_root).unwrap();
        assert_eq!(db.refs.head_get("main").unwrap(), Some(hash));
        assert!(db.wal.pending().unwrap().is_empty());
    }

    /// An undecodable WAL file does not prevent opening and is not left
    /// pending afterwards.
    #[test]
    fn wal_recovery_rolls_back_bad_entries() {
        let (_tmp, db_root) = init_temp_db();

        fs::write(db_root.join("wal").join("bad.wal"), b"not-cbor").unwrap();
        let db = Database::open(&db_root).unwrap();
        assert!(db.wal.pending().unwrap().is_empty());
    }
}