1use std::fs;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4
5use citadel_core::{Error, Result, KEY_FILE_SIZE, MERKLE_HASH_SIZE};
6use citadel_io::durable;
7#[cfg(not(target_arch = "wasm32"))]
8use citadel_io::mmap_io::MmapPageIO;
9use citadel_txn::integrity::IntegrityReport;
10use citadel_txn::manager::TxnManager;
11use citadel_txn::read_txn::ReadTxn;
12use citadel_txn::write_txn::WriteTxn;
13
14#[cfg(feature = "audit-log")]
15use crate::audit::{AuditEventType, AuditLog};
16
/// Point-in-time statistics about a database, as produced by [`Database::stats`].
///
/// Every field is copied from the transaction manager's current slot.
#[derive(Debug, Clone)]
pub struct DbStats {
    /// Tree depth recorded in the current slot.
    pub tree_depth: u16,
    /// Number of entries in the tree (the slot's `tree_entries`).
    pub entry_count: u64,
    /// Total page count recorded in the current slot.
    pub total_pages: u32,
    /// High-water mark recorded in the current slot.
    pub high_water_mark: u32,
    /// Merkle root recorded in the current slot (`MERKLE_HASH_SIZE` bytes).
    pub merkle_root: [u8; MERKLE_HASH_SIZE],
}
26
/// Handle to an open database.
///
/// Bundles the transaction manager with the locations of the data file and
/// its key file. With the `audit-log` feature enabled it can additionally
/// hold an audit log guarded by a mutex.
pub struct Database {
    /// Transaction manager that reads, writes, and maintenance calls delegate to.
    manager: TxnManager,
    /// Path of the data file.
    data_path: PathBuf,
    /// Path of the key file.
    key_path: PathBuf,
    /// Audit log, if one was supplied at construction; `None` disables logging.
    #[cfg(feature = "audit-log")]
    audit_log: Option<parking_lot::Mutex<AuditLog>>,
}
37
38impl std::fmt::Debug for Database {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("Database")
41 .field("data_path", &self.data_path)
42 .field("key_path", &self.key_path)
43 .finish()
44 }
45}
46
// SAFETY: NOTE(review): the invariant that makes these impls sound is not
// visible in this file. Presumably `TxnManager` synchronizes its own state
// internally; the only other non-path field (the optional audit log) sits
// behind a `parking_lot::Mutex`. Confirm against `TxnManager` before relying
// on this.
unsafe impl Send for Database {}
// SAFETY: same assumption as for the `Send` impl directly above.
unsafe impl Sync for Database {}
50
51impl Database {
52 #[cfg(feature = "audit-log")]
53 pub(crate) fn new(
54 manager: TxnManager,
55 data_path: PathBuf,
56 key_path: PathBuf,
57 audit_log: Option<AuditLog>,
58 ) -> Self {
59 Self {
60 manager,
61 data_path,
62 key_path,
63 audit_log: audit_log.map(parking_lot::Mutex::new),
64 }
65 }
66
    /// Assembles a `Database` from an already-built transaction manager and
    /// the data and key file paths (variant used when the `audit-log` feature
    /// is disabled).
    #[cfg(not(feature = "audit-log"))]
    pub(crate) fn new(manager: TxnManager, data_path: PathBuf, key_path: PathBuf) -> Self {
        Self {
            manager,
            data_path,
            key_path,
        }
    }
75
    /// Starts a read transaction by delegating to the transaction manager.
    pub fn begin_read(&self) -> ReadTxn<'_> {
        self.manager.begin_read()
    }
80
    /// Starts a write transaction by delegating to the transaction manager.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the manager reports when the write
    /// transaction cannot be started.
    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
        self.manager.begin_write()
    }
85
86 pub fn stats(&self) -> DbStats {
88 let slot = self.manager.current_slot();
89 DbStats {
90 tree_depth: slot.tree_depth,
91 entry_count: slot.tree_entries,
92 total_pages: slot.total_pages,
93 high_water_mark: slot.high_water_mark,
94 merkle_root: slot.merkle_root,
95 }
96 }
97
    /// Returns the path of the data file this handle was created with.
    pub fn data_path(&self) -> &Path {
        &self.data_path
    }
102
    /// Returns the path of the key file this handle was created with.
    pub fn key_path(&self) -> &Path {
        &self.key_path
    }
107
    /// Returns the reader count reported by the transaction manager.
    pub fn reader_count(&self) -> usize {
        self.manager.reader_count()
    }
112
113 pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
115 use citadel_crypto::kdf::{derive_mk, generate_salt};
116 use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
117
118 let key_data = fs::read(&self.key_path)?;
119 if key_data.len() != KEY_FILE_SIZE {
120 return Err(Error::Io(std::io::Error::new(
121 std::io::ErrorKind::InvalidData,
122 "key file has incorrect size",
123 )));
124 }
125 let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
126 let kf = KeyFile::deserialize(&key_buf)?;
127
128 let old_mk = derive_mk(
129 kf.kdf_algorithm,
130 old_passphrase,
131 &kf.argon2_salt,
132 kf.argon2_m_cost,
133 kf.argon2_t_cost,
134 kf.argon2_p_cost,
135 )?;
136 kf.verify_mac(&old_mk)?;
137
138 let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
139
140 let new_salt = generate_salt();
141 let new_mk = derive_mk(
142 kf.kdf_algorithm,
143 new_passphrase,
144 &new_salt,
145 kf.argon2_m_cost,
146 kf.argon2_t_cost,
147 kf.argon2_p_cost,
148 )?;
149
150 let new_wrapped = wrap_rek(&new_mk, &rek);
151
152 let mut new_kf = kf.clone();
153 new_kf.argon2_salt = new_salt;
154 new_kf.wrapped_rek = new_wrapped;
155 new_kf.update_mac(&new_mk);
156
157 durable::atomic_write(&self.key_path, &new_kf.serialize())?;
158
159 #[cfg(feature = "audit-log")]
160 self.log_audit(AuditEventType::PassphraseChanged, &[]);
161
162 Ok(())
163 }
164
165 pub fn integrity_check(&self) -> Result<IntegrityReport> {
167 let report = self.manager.integrity_check()?;
168
169 #[cfg(feature = "audit-log")]
170 {
171 let error_count = report.errors.len() as u32;
172 self.log_audit(
173 AuditEventType::IntegrityCheckPerformed,
174 &error_count.to_le_bytes(),
175 );
176 }
177
178 Ok(report)
179 }
180
181 #[cfg(not(target_arch = "wasm32"))]
183 pub fn backup(&self, dest_path: &Path) -> Result<()> {
184 let dest_file = OpenOptions::new()
185 .read(true)
186 .write(true)
187 .create_new(true)
188 .open(dest_path)?;
189 let dest_io = MmapPageIO::try_new(dest_file)?;
190 self.manager.backup_to(&dest_io)?;
191
192 let dest_key_path = resolve_key_path_for(dest_path);
193 fs::copy(&self.key_path, &dest_key_path)?;
194
195 #[cfg(feature = "audit-log")]
196 self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
197
198 Ok(())
199 }
200
201 pub fn export_key_backup(
206 &self,
207 db_passphrase: &[u8],
208 backup_passphrase: &[u8],
209 dest_path: &Path,
210 ) -> Result<()> {
211 use citadel_crypto::kdf::derive_mk;
212 use citadel_crypto::key_backup::create_key_backup;
213 use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
214
215 let key_data = fs::read(&self.key_path)?;
216 if key_data.len() != KEY_FILE_SIZE {
217 return Err(Error::Io(std::io::Error::new(
218 std::io::ErrorKind::InvalidData,
219 "key file has incorrect size",
220 )));
221 }
222 let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
223 let kf = KeyFile::deserialize(&key_buf)?;
224
225 let mk = derive_mk(
226 kf.kdf_algorithm,
227 db_passphrase,
228 &kf.argon2_salt,
229 kf.argon2_m_cost,
230 kf.argon2_t_cost,
231 kf.argon2_p_cost,
232 )?;
233 kf.verify_mac(&mk)?;
234
235 let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
236
237 let backup_data = create_key_backup(
238 &rek,
239 backup_passphrase,
240 kf.file_id,
241 kf.cipher_id,
242 kf.kdf_algorithm,
243 kf.argon2_m_cost,
244 kf.argon2_t_cost,
245 kf.argon2_p_cost,
246 kf.current_epoch,
247 )?;
248
249 durable::write_and_sync(dest_path, &backup_data)?;
250
251 #[cfg(feature = "audit-log")]
252 self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
253
254 Ok(())
255 }
256
257 pub fn restore_key_from_backup(
262 backup_path: &Path,
263 backup_passphrase: &[u8],
264 new_db_passphrase: &[u8],
265 db_path: &Path,
266 ) -> Result<()> {
267 use citadel_core::{
268 KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
269 };
270 use citadel_crypto::kdf::{derive_mk, generate_salt};
271 use citadel_crypto::key_backup::restore_rek_from_backup;
272 use citadel_crypto::key_manager::wrap_rek;
273 use citadel_crypto::key_manager::KeyFile;
274
275 let backup_data = fs::read(backup_path)?;
276 if backup_data.len() != KEY_BACKUP_SIZE {
277 return Err(Error::Io(std::io::Error::new(
278 std::io::ErrorKind::InvalidData,
279 "backup file has incorrect size",
280 )));
281 }
282 let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
283
284 let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
285
286 let new_salt = generate_salt();
287 let new_mk = derive_mk(
288 restored.kdf_algorithm,
289 new_db_passphrase,
290 &new_salt,
291 restored.kdf_param1,
292 restored.kdf_param2,
293 restored.kdf_param3,
294 )?;
295
296 let new_wrapped = wrap_rek(&new_mk, &restored.rek);
297
298 let mut new_kf = KeyFile {
299 magic: KEY_FILE_MAGIC,
300 version: KEY_FILE_VERSION,
301 file_id: restored.file_id,
302 argon2_salt: new_salt,
303 argon2_m_cost: restored.kdf_param1,
304 argon2_t_cost: restored.kdf_param2,
305 argon2_p_cost: restored.kdf_param3,
306 cipher_id: restored.cipher_id,
307 kdf_algorithm: restored.kdf_algorithm,
308 wrapped_rek: new_wrapped,
309 current_epoch: restored.epoch,
310 prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
311 prev_epoch: 0,
312 rotation_active: false,
313 file_mac: [0u8; MAC_SIZE],
314 };
315 new_kf.update_mac(&new_mk);
316
317 let key_path = resolve_key_path_for(db_path);
318 durable::atomic_write(&key_path, &new_kf.serialize())?;
319
320 Ok(())
321 }
322
323 #[cfg(not(target_arch = "wasm32"))]
325 pub fn compact(&self, dest_path: &Path) -> Result<()> {
326 let dest_file = OpenOptions::new()
327 .read(true)
328 .write(true)
329 .create_new(true)
330 .open(dest_path)?;
331 let dest_io = MmapPageIO::try_new(dest_file)?;
332 self.manager.compact_to(&dest_io)?;
333
334 let dest_key_path = resolve_key_path_for(dest_path);
335 fs::copy(&self.key_path, &dest_key_path)?;
336
337 #[cfg(feature = "audit-log")]
338 self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
339
340 Ok(())
341 }
342}
343
344impl Database {
    /// Borrows the underlying transaction manager.
    ///
    /// Marked `#[doc(hidden)]`, so it does not appear in the rendered API docs.
    #[doc(hidden)]
    pub fn manager(&self) -> &TxnManager {
        &self.manager
    }
349
350 #[cfg(feature = "audit-log")]
352 pub fn audit_log_path(&self) -> Option<PathBuf> {
353 if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
354 Some(crate::audit::resolve_audit_path(&self.data_path))
355 } else {
356 None
357 }
358 }
359
360 #[cfg(feature = "audit-log")]
362 pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
363 let audit = self
364 .audit_log
365 .as_ref()
366 .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
367 let guard = audit.lock();
368 let path = crate::audit::resolve_audit_path(&self.data_path);
369 crate::audit::verify_audit_log(&path, guard.audit_key())
370 }
371
372 #[cfg(feature = "audit-log")]
373 pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
374 if let Some(ref mutex) = self.audit_log {
375 let _ = mutex.lock().log(event_type, detail);
376 }
377 }
378
379 #[cfg(feature = "audit-log")]
380 fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
381 let path_str = path.to_string_lossy();
382 let path_bytes = path_str.as_bytes();
383 let len = (path_bytes.len() as u16).to_le_bytes();
384 let mut detail = Vec::with_capacity(2 + path_bytes.len());
385 detail.extend_from_slice(&len);
386 detail.extend_from_slice(path_bytes);
387 self.log_audit(event_type, &detail);
388 }
389}
390
391use citadel_sync::transport::SyncTransport;
392
/// Result of a sync run started by [`Database::sync_to`] or served by
/// [`Database::handle_sync`].
#[derive(Debug, Clone)]
pub struct SyncOutcome {
    /// One `(table name, entries applied)` pair per synced table.
    pub tables_synced: Vec<(Vec<u8>, u64)>,
    /// Outcome for the default tree, if any. `sync_to` and `handle_sync`
    /// always set this to `None`.
    pub default_tree: Option<citadel_sync::SyncOutcome>,
}
401
/// Key under which the node ID is stored; see [`Database::node_id`].
const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
403
404impl Database {
405 pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
407 let mut rtx = self.manager.begin_read();
408 if let Some(data) = rtx.get(NODE_ID_KEY)? {
409 if data.len() == 8 {
410 return Ok(citadel_sync::NodeId::from_bytes(
411 data[..8].try_into().unwrap(),
412 ));
413 }
414 }
415 drop(rtx);
416
417 let node_id = citadel_sync::NodeId::random();
418 let mut wtx = self.manager.begin_write()?;
419 wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
420 wtx.commit()?;
421 Ok(node_id)
422 }
423
424 pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
426 let node_id = self.node_id()?;
427 let transport =
428 citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
429 let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
430 node_id,
431 direction: citadel_sync::SyncDirection::Push,
432 crdt_aware: false,
433 });
434
435 let results = session
436 .sync_tables_as_initiator(&self.manager, &transport)
437 .map_err(sync_err_to_core)?;
438
439 transport.close().map_err(sync_err_to_core)?;
440
441 Ok(SyncOutcome {
442 tables_synced: results
443 .into_iter()
444 .map(|(name, r)| (name, r.entries_applied))
445 .collect(),
446 default_tree: None,
447 })
448 }
449
450 pub fn handle_sync(
452 &self,
453 stream: std::net::TcpStream,
454 sync_key: &citadel_sync::SyncKey,
455 ) -> Result<SyncOutcome> {
456 let node_id = self.node_id()?;
457 let transport =
458 citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
459 let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
460 node_id,
461 direction: citadel_sync::SyncDirection::Push,
462 crdt_aware: false,
463 });
464
465 let results = session
466 .handle_table_sync_as_responder(&self.manager, &transport)
467 .map_err(sync_err_to_core)?;
468
469 transport.close().map_err(sync_err_to_core)?;
470
471 Ok(SyncOutcome {
472 tables_synced: results
473 .into_iter()
474 .map(|(name, r)| (name, r.entries_applied))
475 .collect(),
476 default_tree: None,
477 })
478 }
479}
480
481fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
482 match e {
483 citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
484 other => Error::Sync(other.to_string()),
485 }
486}
487
/// Records a `DatabaseClosed` audit event when the handle is dropped.
#[cfg(feature = "audit-log")]
impl Drop for Database {
    fn drop(&mut self) {
        // `log_audit` does nothing when no audit log is attached and discards
        // any logging failure.
        self.log_audit(AuditEventType::DatabaseClosed, &[]);
    }
}
494
/// Derives the key-file path that accompanies a data file.
///
/// The literal suffix `.citadel-keys` is appended to the full text of
/// `data_path`. No extension is replaced and no path component is added, so
/// `store.db` becomes `store.db.citadel-keys`.
fn resolve_key_path_for(data_path: &Path) -> PathBuf {
    let mut combined = data_path.to_path_buf().into_os_string();
    combined.push(".citadel-keys");
    combined.into()
}