Skip to main content

citadel/
database.rs

1use std::fs;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4
5use citadel_core::{Error, Result, KEY_FILE_SIZE, MERKLE_HASH_SIZE};
6use citadel_io::durable;
7#[cfg(not(target_arch = "wasm32"))]
8use citadel_io::mmap_io::MmapPageIO;
9use citadel_txn::integrity::IntegrityReport;
10use citadel_txn::manager::TxnManager;
11use citadel_txn::read_txn::ReadTxn;
12use citadel_txn::write_txn::WriteTxn;
13
14#[cfg(feature = "audit-log")]
15use crate::audit::{AuditEventType, AuditLog};
16
/// Database statistics read from the current commit slot.
///
/// A plain value snapshot: [`Database::stats`] copies each field out of the
/// transaction manager's current slot at the time of the call.
#[derive(Debug, Clone)]
pub struct DbStats {
    /// Copied from the commit slot's `tree_depth`.
    pub tree_depth: u16,
    /// Copied from the commit slot's `tree_entries`.
    pub entry_count: u64,
    /// Copied from the commit slot's `total_pages`.
    pub total_pages: u32,
    /// Copied from the commit slot's `high_water_mark`.
    pub high_water_mark: u32,
    /// Copied from the commit slot's `merkle_root`.
    pub merkle_root: [u8; MERKLE_HASH_SIZE],
}
26
/// An open Citadel database (`Send + Sync`).
///
/// Exclusively locks the database file for its lifetime.
pub struct Database {
    /// Transaction manager that transactions, stats, integrity checks,
    /// backup and compaction are delegated to.
    manager: TxnManager,
    /// Path to the data file, as supplied at construction.
    data_path: PathBuf,
    /// Path to the key file; read by the passphrase/key-backup operations
    /// and copied alongside backups and compacted files.
    key_path: PathBuf,
    /// Audit log, if one was supplied at construction. Held behind a mutex
    /// so that `&self` methods can append events.
    #[cfg(feature = "audit-log")]
    audit_log: Option<parking_lot::Mutex<AuditLog>>,
}
37
38impl std::fmt::Debug for Database {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("Database")
41            .field("data_path", &self.data_path)
42            .field("key_path", &self.key_path)
43            .finish()
44    }
45}
46
// SAFETY: TxnManager is internally synchronized (Mutex + Atomic).
// NOTE(review): these impls vouch for *every* field of `Database`, not just
// `manager`. With the `audit-log` feature that includes
// `parking_lot::Mutex<AuditLog>` — confirm `AuditLog` is safe to move and
// share across threads behind that mutex.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
50
51impl Database {
52    #[cfg(feature = "audit-log")]
53    pub(crate) fn new(
54        manager: TxnManager,
55        data_path: PathBuf,
56        key_path: PathBuf,
57        audit_log: Option<AuditLog>,
58    ) -> Self {
59        Self {
60            manager,
61            data_path,
62            key_path,
63            audit_log: audit_log.map(parking_lot::Mutex::new),
64        }
65    }
66
67    #[cfg(not(feature = "audit-log"))]
68    pub(crate) fn new(manager: TxnManager, data_path: PathBuf, key_path: PathBuf) -> Self {
69        Self {
70            manager,
71            data_path,
72            key_path,
73        }
74    }
75
76    /// Begin a read-only transaction with snapshot isolation.
77    pub fn begin_read(&self) -> ReadTxn<'_> {
78        self.manager.begin_read()
79    }
80
81    /// Begin a read-write transaction. Only one can be active at a time.
82    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
83        self.manager.begin_write()
84    }
85
86    /// Get database statistics from the current commit slot.
87    pub fn stats(&self) -> DbStats {
88        let slot = self.manager.current_slot();
89        DbStats {
90            tree_depth: slot.tree_depth,
91            entry_count: slot.tree_entries,
92            total_pages: slot.total_pages,
93            high_water_mark: slot.high_water_mark,
94            merkle_root: slot.merkle_root,
95        }
96    }
97
98    /// Path to the data file.
99    pub fn data_path(&self) -> &Path {
100        &self.data_path
101    }
102
103    /// Path to the key file.
104    pub fn key_path(&self) -> &Path {
105        &self.key_path
106    }
107
108    /// Number of currently active readers.
109    pub fn reader_count(&self) -> usize {
110        self.manager.reader_count()
111    }
112
113    /// Change the database passphrase (re-wraps REK, no page re-encryption).
114    pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
115        use citadel_crypto::kdf::{derive_mk, generate_salt};
116        use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
117
118        let key_data = fs::read(&self.key_path)?;
119        if key_data.len() != KEY_FILE_SIZE {
120            return Err(Error::Io(std::io::Error::new(
121                std::io::ErrorKind::InvalidData,
122                "key file has incorrect size",
123            )));
124        }
125        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
126        let kf = KeyFile::deserialize(&key_buf)?;
127
128        let old_mk = derive_mk(
129            kf.kdf_algorithm,
130            old_passphrase,
131            &kf.argon2_salt,
132            kf.argon2_m_cost,
133            kf.argon2_t_cost,
134            kf.argon2_p_cost,
135        )?;
136        kf.verify_mac(&old_mk)?;
137
138        let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
139
140        let new_salt = generate_salt();
141        let new_mk = derive_mk(
142            kf.kdf_algorithm,
143            new_passphrase,
144            &new_salt,
145            kf.argon2_m_cost,
146            kf.argon2_t_cost,
147            kf.argon2_p_cost,
148        )?;
149
150        let new_wrapped = wrap_rek(&new_mk, &rek);
151
152        let mut new_kf = kf.clone();
153        new_kf.argon2_salt = new_salt;
154        new_kf.wrapped_rek = new_wrapped;
155        new_kf.update_mac(&new_mk);
156
157        durable::atomic_write(&self.key_path, &new_kf.serialize())?;
158
159        #[cfg(feature = "audit-log")]
160        self.log_audit(AuditEventType::PassphraseChanged, &[]);
161
162        Ok(())
163    }
164
165    /// Run an integrity check on the database.
166    pub fn integrity_check(&self) -> Result<IntegrityReport> {
167        let report = self.manager.integrity_check()?;
168
169        #[cfg(feature = "audit-log")]
170        {
171            let error_count = report.errors.len() as u32;
172            self.log_audit(
173                AuditEventType::IntegrityCheckPerformed,
174                &error_count.to_le_bytes(),
175            );
176        }
177
178        Ok(report)
179    }
180
181    /// Create a hot backup via MVCC snapshot. Also copies the key file.
182    #[cfg(not(target_arch = "wasm32"))]
183    pub fn backup(&self, dest_path: &Path) -> Result<()> {
184        let dest_file = OpenOptions::new()
185            .read(true)
186            .write(true)
187            .create_new(true)
188            .open(dest_path)?;
189        let dest_io = MmapPageIO::try_new(dest_file)?;
190        self.manager.backup_to(&dest_io)?;
191
192        let dest_key_path = resolve_key_path_for(dest_path);
193        fs::copy(&self.key_path, &dest_key_path)?;
194
195        #[cfg(feature = "audit-log")]
196        self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
197
198        Ok(())
199    }
200
201    /// Export an encrypted key backup for disaster recovery.
202    ///
203    /// Requires the current database passphrase. The backup can later restore
204    /// access via `restore_key_from_backup` if the database passphrase is lost.
205    pub fn export_key_backup(
206        &self,
207        db_passphrase: &[u8],
208        backup_passphrase: &[u8],
209        dest_path: &Path,
210    ) -> Result<()> {
211        use citadel_crypto::kdf::derive_mk;
212        use citadel_crypto::key_backup::create_key_backup;
213        use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
214
215        let key_data = fs::read(&self.key_path)?;
216        if key_data.len() != KEY_FILE_SIZE {
217            return Err(Error::Io(std::io::Error::new(
218                std::io::ErrorKind::InvalidData,
219                "key file has incorrect size",
220            )));
221        }
222        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
223        let kf = KeyFile::deserialize(&key_buf)?;
224
225        let mk = derive_mk(
226            kf.kdf_algorithm,
227            db_passphrase,
228            &kf.argon2_salt,
229            kf.argon2_m_cost,
230            kf.argon2_t_cost,
231            kf.argon2_p_cost,
232        )?;
233        kf.verify_mac(&mk)?;
234
235        let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
236
237        let backup_data = create_key_backup(
238            &rek,
239            backup_passphrase,
240            kf.file_id,
241            kf.cipher_id,
242            kf.kdf_algorithm,
243            kf.argon2_m_cost,
244            kf.argon2_t_cost,
245            kf.argon2_p_cost,
246            kf.current_epoch,
247        )?;
248
249        durable::write_and_sync(dest_path, &backup_data)?;
250
251        #[cfg(feature = "audit-log")]
252        self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
253
254        Ok(())
255    }
256
257    /// Restore a key file from an encrypted backup (static - no `Database` needed).
258    ///
259    /// Unwraps the REK using `backup_passphrase`, then creates a new key file
260    /// protected by `new_db_passphrase`.
261    pub fn restore_key_from_backup(
262        backup_path: &Path,
263        backup_passphrase: &[u8],
264        new_db_passphrase: &[u8],
265        db_path: &Path,
266    ) -> Result<()> {
267        use citadel_core::{
268            KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
269        };
270        use citadel_crypto::kdf::{derive_mk, generate_salt};
271        use citadel_crypto::key_backup::restore_rek_from_backup;
272        use citadel_crypto::key_manager::wrap_rek;
273        use citadel_crypto::key_manager::KeyFile;
274
275        let backup_data = fs::read(backup_path)?;
276        if backup_data.len() != KEY_BACKUP_SIZE {
277            return Err(Error::Io(std::io::Error::new(
278                std::io::ErrorKind::InvalidData,
279                "backup file has incorrect size",
280            )));
281        }
282        let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
283
284        let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
285
286        let new_salt = generate_salt();
287        let new_mk = derive_mk(
288            restored.kdf_algorithm,
289            new_db_passphrase,
290            &new_salt,
291            restored.kdf_param1,
292            restored.kdf_param2,
293            restored.kdf_param3,
294        )?;
295
296        let new_wrapped = wrap_rek(&new_mk, &restored.rek);
297
298        let mut new_kf = KeyFile {
299            magic: KEY_FILE_MAGIC,
300            version: KEY_FILE_VERSION,
301            file_id: restored.file_id,
302            argon2_salt: new_salt,
303            argon2_m_cost: restored.kdf_param1,
304            argon2_t_cost: restored.kdf_param2,
305            argon2_p_cost: restored.kdf_param3,
306            cipher_id: restored.cipher_id,
307            kdf_algorithm: restored.kdf_algorithm,
308            wrapped_rek: new_wrapped,
309            current_epoch: restored.epoch,
310            prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
311            prev_epoch: 0,
312            rotation_active: false,
313            file_mac: [0u8; MAC_SIZE],
314        };
315        new_kf.update_mac(&new_mk);
316
317        let key_path = resolve_key_path_for(db_path);
318        durable::atomic_write(&key_path, &new_kf.serialize())?;
319
320        Ok(())
321    }
322
323    /// Compact the database into a new file. Also copies the key file.
324    #[cfg(not(target_arch = "wasm32"))]
325    pub fn compact(&self, dest_path: &Path) -> Result<()> {
326        let dest_file = OpenOptions::new()
327            .read(true)
328            .write(true)
329            .create_new(true)
330            .open(dest_path)?;
331        let dest_io = MmapPageIO::try_new(dest_file)?;
332        self.manager.compact_to(&dest_io)?;
333
334        let dest_key_path = resolve_key_path_for(dest_path);
335        fs::copy(&self.key_path, &dest_key_path)?;
336
337        #[cfg(feature = "audit-log")]
338        self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
339
340        Ok(())
341    }
342}
343
344impl Database {
345    #[doc(hidden)]
346    pub fn manager(&self) -> &TxnManager {
347        &self.manager
348    }
349
350    /// Path to the audit log file, if audit logging is enabled.
351    #[cfg(feature = "audit-log")]
352    pub fn audit_log_path(&self) -> Option<PathBuf> {
353        if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
354            Some(crate::audit::resolve_audit_path(&self.data_path))
355        } else {
356            None
357        }
358    }
359
360    /// Verify the audit log's HMAC chain integrity.
361    #[cfg(feature = "audit-log")]
362    pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
363        let audit = self
364            .audit_log
365            .as_ref()
366            .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
367        let guard = audit.lock();
368        let path = crate::audit::resolve_audit_path(&self.data_path);
369        crate::audit::verify_audit_log(&path, guard.audit_key())
370    }
371
372    #[cfg(feature = "audit-log")]
373    pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
374        if let Some(ref mutex) = self.audit_log {
375            let _ = mutex.lock().log(event_type, detail);
376        }
377    }
378
379    #[cfg(feature = "audit-log")]
380    fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
381        let path_str = path.to_string_lossy();
382        let path_bytes = path_str.as_bytes();
383        let len = (path_bytes.len() as u16).to_le_bytes();
384        let mut detail = Vec::with_capacity(2 + path_bytes.len());
385        detail.extend_from_slice(&len);
386        detail.extend_from_slice(path_bytes);
387        self.log_audit(event_type, &detail);
388    }
389}
390
391use citadel_sync::transport::SyncTransport;
392
/// Outcome of a sync operation.
///
/// Returned by [`Database::sync_to`] and [`Database::handle_sync`].
#[derive(Debug, Clone)]
pub struct SyncOutcome {
    /// Per-table results: `(table_name, entries_applied)`.
    pub tables_synced: Vec<(Vec<u8>, u64)>,
    /// Default tree sync result (if performed).
    ///
    /// `Database::sync_to` and `Database::handle_sync` currently always set
    /// this to `None`.
    pub default_tree: Option<citadel_sync::SyncOutcome>,
}
401
/// Key under which `Database::node_id` stores and looks up this database's
/// persistent node id.
const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
403
404impl Database {
405    /// Get or create a persistent NodeId for this database.
406    pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
407        let mut rtx = self.manager.begin_read();
408        if let Some(data) = rtx.get(NODE_ID_KEY)? {
409            if data.len() == 8 {
410                return Ok(citadel_sync::NodeId::from_bytes(
411                    data[..8].try_into().unwrap(),
412                ));
413            }
414        }
415        drop(rtx);
416
417        let node_id = citadel_sync::NodeId::random();
418        let mut wtx = self.manager.begin_write()?;
419        wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
420        wtx.commit()?;
421        Ok(node_id)
422    }
423
424    /// Push local named tables to a remote peer.
425    pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
426        let node_id = self.node_id()?;
427        let transport =
428            citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
429        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
430            node_id,
431            direction: citadel_sync::SyncDirection::Push,
432            crdt_aware: false,
433        });
434
435        let results = session
436            .sync_tables_as_initiator(&self.manager, &transport)
437            .map_err(sync_err_to_core)?;
438
439        transport.close().map_err(sync_err_to_core)?;
440
441        Ok(SyncOutcome {
442            tables_synced: results
443                .into_iter()
444                .map(|(name, r)| (name, r.entries_applied))
445                .collect(),
446            default_tree: None,
447        })
448    }
449
450    /// Handle an incoming sync session from a remote peer.
451    pub fn handle_sync(
452        &self,
453        stream: std::net::TcpStream,
454        sync_key: &citadel_sync::SyncKey,
455    ) -> Result<SyncOutcome> {
456        let node_id = self.node_id()?;
457        let transport =
458            citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
459        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
460            node_id,
461            direction: citadel_sync::SyncDirection::Push,
462            crdt_aware: false,
463        });
464
465        let results = session
466            .handle_table_sync_as_responder(&self.manager, &transport)
467            .map_err(sync_err_to_core)?;
468
469        transport.close().map_err(sync_err_to_core)?;
470
471        Ok(SyncOutcome {
472            tables_synced: results
473                .into_iter()
474                .map(|(name, r)| (name, r.entries_applied))
475                .collect(),
476            default_tree: None,
477        })
478    }
479}
480
481fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
482    match e {
483        citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
484        other => Error::Sync(other.to_string()),
485    }
486}
487
/// Records a `DatabaseClosed` audit event when the database is dropped.
///
/// Goes through `log_audit`, so it is a no-op without an attached audit log
/// and logging failures are ignored.
#[cfg(feature = "audit-log")]
impl Drop for Database {
    fn drop(&mut self) {
        let no_detail: &[u8] = &[];
        self.log_audit(AuditEventType::DatabaseClosed, no_detail);
    }
}
494
/// `{data_path}.citadel-keys`
///
/// The suffix is appended to the full path as-is rather than replacing any
/// existing extension.
fn resolve_key_path_for(data_path: &Path) -> PathBuf {
    let mut key_path = data_path.to_path_buf().into_os_string();
    key_path.push(".citadel-keys");
    key_path.into()
}