Skip to main content

citadel/
database.rs

1use std::fs;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4
5use citadel_core::{Error, Result, KEY_FILE_SIZE, MERKLE_HASH_SIZE};
6use citadel_io::durable;
7use citadel_io::sync_io::SyncPageIO;
8use citadel_txn::integrity::IntegrityReport;
9use citadel_txn::manager::TxnManager;
10use citadel_txn::read_txn::ReadTxn;
11use citadel_txn::write_txn::WriteTxn;
12
13#[cfg(feature = "audit-log")]
14use crate::audit::{AuditEventType, AuditLog};
15
16/// Database statistics read from the current commit slot.
17#[derive(Debug, Clone)]
18pub struct DbStats {
19    pub tree_depth: u16,
20    pub entry_count: u64,
21    pub total_pages: u32,
22    pub high_water_mark: u32,
23    pub merkle_root: [u8; MERKLE_HASH_SIZE],
24}
25
26/// An open Citadel database (`Send + Sync`).
27///
28/// Exclusively locks the database file for its lifetime.
29pub struct Database {
30    manager: TxnManager,
31    data_path: PathBuf,
32    key_path: PathBuf,
33    #[cfg(feature = "audit-log")]
34    audit_log: Option<parking_lot::Mutex<AuditLog>>,
35}
36
37impl std::fmt::Debug for Database {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("Database")
40            .field("data_path", &self.data_path)
41            .field("key_path", &self.key_path)
42            .finish()
43    }
44}
45
46// TxnManager is internally synchronized (Mutex + Atomic)
47unsafe impl Send for Database {}
48unsafe impl Sync for Database {}
49
50impl Database {
51    #[cfg(feature = "audit-log")]
52    pub(crate) fn new(
53        manager: TxnManager,
54        data_path: PathBuf,
55        key_path: PathBuf,
56        audit_log: Option<AuditLog>,
57    ) -> Self {
58        Self {
59            manager,
60            data_path,
61            key_path,
62            audit_log: audit_log.map(parking_lot::Mutex::new),
63        }
64    }
65
66    #[cfg(not(feature = "audit-log"))]
67    pub(crate) fn new(manager: TxnManager, data_path: PathBuf, key_path: PathBuf) -> Self {
68        Self {
69            manager,
70            data_path,
71            key_path,
72        }
73    }
74
75    /// Begin a read-only transaction with snapshot isolation.
76    pub fn begin_read(&self) -> ReadTxn<'_> {
77        self.manager.begin_read()
78    }
79
80    /// Begin a read-write transaction. Only one can be active at a time.
81    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
82        self.manager.begin_write()
83    }
84
85    /// Get database statistics from the current commit slot.
86    pub fn stats(&self) -> DbStats {
87        let slot = self.manager.current_slot();
88        DbStats {
89            tree_depth: slot.tree_depth,
90            entry_count: slot.tree_entries,
91            total_pages: slot.total_pages,
92            high_water_mark: slot.high_water_mark,
93            merkle_root: slot.merkle_root,
94        }
95    }
96
97    /// Path to the data file.
98    pub fn data_path(&self) -> &Path {
99        &self.data_path
100    }
101
102    /// Path to the key file.
103    pub fn key_path(&self) -> &Path {
104        &self.key_path
105    }
106
107    /// Number of currently active readers.
108    pub fn reader_count(&self) -> usize {
109        self.manager.reader_count()
110    }
111
112    /// Change the database passphrase (re-wraps REK, no page re-encryption).
113    pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
114        use citadel_crypto::kdf::{derive_mk, generate_salt};
115        use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
116
117        let key_data = fs::read(&self.key_path)?;
118        if key_data.len() != KEY_FILE_SIZE {
119            return Err(Error::Io(std::io::Error::new(
120                std::io::ErrorKind::InvalidData,
121                "key file has incorrect size",
122            )));
123        }
124        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
125        let kf = KeyFile::deserialize(&key_buf)?;
126
127        let old_mk = derive_mk(
128            kf.kdf_algorithm,
129            old_passphrase,
130            &kf.argon2_salt,
131            kf.argon2_m_cost,
132            kf.argon2_t_cost,
133            kf.argon2_p_cost,
134        )?;
135        kf.verify_mac(&old_mk)?;
136
137        let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
138
139        let new_salt = generate_salt();
140        let new_mk = derive_mk(
141            kf.kdf_algorithm,
142            new_passphrase,
143            &new_salt,
144            kf.argon2_m_cost,
145            kf.argon2_t_cost,
146            kf.argon2_p_cost,
147        )?;
148
149        let new_wrapped = wrap_rek(&new_mk, &rek);
150
151        let mut new_kf = kf.clone();
152        new_kf.argon2_salt = new_salt;
153        new_kf.wrapped_rek = new_wrapped;
154        new_kf.update_mac(&new_mk);
155
156        durable::atomic_write(&self.key_path, &new_kf.serialize())?;
157
158        #[cfg(feature = "audit-log")]
159        self.log_audit(AuditEventType::PassphraseChanged, &[]);
160
161        Ok(())
162    }
163
164    /// Run an integrity check on the database.
165    pub fn integrity_check(&self) -> Result<IntegrityReport> {
166        let report = self.manager.integrity_check()?;
167
168        #[cfg(feature = "audit-log")]
169        {
170            let error_count = report.errors.len() as u32;
171            self.log_audit(
172                AuditEventType::IntegrityCheckPerformed,
173                &error_count.to_le_bytes(),
174            );
175        }
176
177        Ok(report)
178    }
179
180    /// Create a hot backup via MVCC snapshot. Also copies the key file.
181    pub fn backup(&self, dest_path: &Path) -> Result<()> {
182        let dest_file = OpenOptions::new()
183            .read(true)
184            .write(true)
185            .create_new(true)
186            .open(dest_path)?;
187        let dest_io = SyncPageIO::new(dest_file);
188        self.manager.backup_to(&dest_io)?;
189
190        let dest_key_path = resolve_key_path_for(dest_path);
191        fs::copy(&self.key_path, &dest_key_path)?;
192
193        #[cfg(feature = "audit-log")]
194        self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
195
196        Ok(())
197    }
198
199    /// Export an encrypted key backup for disaster recovery.
200    ///
201    /// Requires the current database passphrase. The backup can later restore
202    /// access via `restore_key_from_backup` if the database passphrase is lost.
203    pub fn export_key_backup(
204        &self,
205        db_passphrase: &[u8],
206        backup_passphrase: &[u8],
207        dest_path: &Path,
208    ) -> Result<()> {
209        use citadel_crypto::kdf::derive_mk;
210        use citadel_crypto::key_backup::create_key_backup;
211        use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
212
213        let key_data = fs::read(&self.key_path)?;
214        if key_data.len() != KEY_FILE_SIZE {
215            return Err(Error::Io(std::io::Error::new(
216                std::io::ErrorKind::InvalidData,
217                "key file has incorrect size",
218            )));
219        }
220        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
221        let kf = KeyFile::deserialize(&key_buf)?;
222
223        let mk = derive_mk(
224            kf.kdf_algorithm,
225            db_passphrase,
226            &kf.argon2_salt,
227            kf.argon2_m_cost,
228            kf.argon2_t_cost,
229            kf.argon2_p_cost,
230        )?;
231        kf.verify_mac(&mk)?;
232
233        let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
234
235        let backup_data = create_key_backup(
236            &rek,
237            backup_passphrase,
238            kf.file_id,
239            kf.cipher_id,
240            kf.kdf_algorithm,
241            kf.argon2_m_cost,
242            kf.argon2_t_cost,
243            kf.argon2_p_cost,
244            kf.current_epoch,
245        )?;
246
247        durable::write_and_sync(dest_path, &backup_data)?;
248
249        #[cfg(feature = "audit-log")]
250        self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
251
252        Ok(())
253    }
254
255    /// Restore a key file from an encrypted backup (static — no `Database` needed).
256    ///
257    /// Unwraps the REK using `backup_passphrase`, then creates a new key file
258    /// protected by `new_db_passphrase`.
259    pub fn restore_key_from_backup(
260        backup_path: &Path,
261        backup_passphrase: &[u8],
262        new_db_passphrase: &[u8],
263        db_path: &Path,
264    ) -> Result<()> {
265        use citadel_core::{
266            KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
267        };
268        use citadel_crypto::kdf::{derive_mk, generate_salt};
269        use citadel_crypto::key_backup::restore_rek_from_backup;
270        use citadel_crypto::key_manager::wrap_rek;
271        use citadel_crypto::key_manager::KeyFile;
272
273        let backup_data = fs::read(backup_path)?;
274        if backup_data.len() != KEY_BACKUP_SIZE {
275            return Err(Error::Io(std::io::Error::new(
276                std::io::ErrorKind::InvalidData,
277                "backup file has incorrect size",
278            )));
279        }
280        let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
281
282        let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
283
284        let new_salt = generate_salt();
285        let new_mk = derive_mk(
286            restored.kdf_algorithm,
287            new_db_passphrase,
288            &new_salt,
289            restored.kdf_param1,
290            restored.kdf_param2,
291            restored.kdf_param3,
292        )?;
293
294        let new_wrapped = wrap_rek(&new_mk, &restored.rek);
295
296        let mut new_kf = KeyFile {
297            magic: KEY_FILE_MAGIC,
298            version: KEY_FILE_VERSION,
299            file_id: restored.file_id,
300            argon2_salt: new_salt,
301            argon2_m_cost: restored.kdf_param1,
302            argon2_t_cost: restored.kdf_param2,
303            argon2_p_cost: restored.kdf_param3,
304            cipher_id: restored.cipher_id,
305            kdf_algorithm: restored.kdf_algorithm,
306            wrapped_rek: new_wrapped,
307            current_epoch: restored.epoch,
308            prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
309            prev_epoch: 0,
310            rotation_active: false,
311            file_mac: [0u8; MAC_SIZE],
312        };
313        new_kf.update_mac(&new_mk);
314
315        let key_path = resolve_key_path_for(db_path);
316        durable::atomic_write(&key_path, &new_kf.serialize())?;
317
318        Ok(())
319    }
320
321    /// Compact the database into a new file. Also copies the key file.
322    pub fn compact(&self, dest_path: &Path) -> Result<()> {
323        let dest_file = OpenOptions::new()
324            .read(true)
325            .write(true)
326            .create_new(true)
327            .open(dest_path)?;
328        let dest_io = SyncPageIO::new(dest_file);
329        self.manager.compact_to(&dest_io)?;
330
331        let dest_key_path = resolve_key_path_for(dest_path);
332        fs::copy(&self.key_path, &dest_key_path)?;
333
334        #[cfg(feature = "audit-log")]
335        self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
336
337        Ok(())
338    }
339}
340
341impl Database {
342    #[doc(hidden)]
343    pub fn manager(&self) -> &TxnManager {
344        &self.manager
345    }
346
347    /// Path to the audit log file, if audit logging is enabled.
348    #[cfg(feature = "audit-log")]
349    pub fn audit_log_path(&self) -> Option<PathBuf> {
350        if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
351            Some(crate::audit::resolve_audit_path(&self.data_path))
352        } else {
353            None
354        }
355    }
356
357    /// Verify the audit log's HMAC chain integrity.
358    #[cfg(feature = "audit-log")]
359    pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
360        let audit = self
361            .audit_log
362            .as_ref()
363            .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
364        let guard = audit.lock();
365        let path = crate::audit::resolve_audit_path(&self.data_path);
366        crate::audit::verify_audit_log(&path, guard.audit_key())
367    }
368
369    #[cfg(feature = "audit-log")]
370    pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
371        if let Some(ref mutex) = self.audit_log {
372            let _ = mutex.lock().log(event_type, detail);
373        }
374    }
375
376    #[cfg(feature = "audit-log")]
377    fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
378        let path_str = path.to_string_lossy();
379        let path_bytes = path_str.as_bytes();
380        let len = (path_bytes.len() as u16).to_le_bytes();
381        let mut detail = Vec::with_capacity(2 + path_bytes.len());
382        detail.extend_from_slice(&len);
383        detail.extend_from_slice(path_bytes);
384        self.log_audit(event_type, &detail);
385    }
386}
387
388// --- Peer-to-peer sync ---
389
390use citadel_sync::transport::SyncTransport;
391
392/// Outcome of a sync operation.
393#[derive(Debug, Clone)]
394pub struct SyncOutcome {
395    /// Per-table results: `(table_name, entries_applied)`.
396    pub tables_synced: Vec<(Vec<u8>, u64)>,
397    /// Default tree sync result (if performed).
398    pub default_tree: Option<citadel_sync::SyncOutcome>,
399}
400
/// Key under which `Database::node_id` stores this database's persistent node id.
const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
402
403impl Database {
404    /// Get or create a persistent NodeId for this database.
405    pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
406        let mut rtx = self.manager.begin_read();
407        if let Some(data) = rtx.get(NODE_ID_KEY)? {
408            if data.len() == 8 {
409                return Ok(citadel_sync::NodeId::from_bytes(
410                    data[..8].try_into().unwrap(),
411                ));
412            }
413        }
414        drop(rtx);
415
416        let node_id = citadel_sync::NodeId::random();
417        let mut wtx = self.manager.begin_write()?;
418        wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
419        wtx.commit()?;
420        Ok(node_id)
421    }
422
423    /// Push local named tables to a remote peer.
424    pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
425        let node_id = self.node_id()?;
426        let transport =
427            citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
428        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
429            node_id,
430            direction: citadel_sync::SyncDirection::Push,
431            crdt_aware: false,
432        });
433
434        let results = session
435            .sync_tables_as_initiator(&self.manager, &transport)
436            .map_err(sync_err_to_core)?;
437
438        transport.close().map_err(sync_err_to_core)?;
439
440        Ok(SyncOutcome {
441            tables_synced: results
442                .into_iter()
443                .map(|(name, r)| (name, r.entries_applied))
444                .collect(),
445            default_tree: None,
446        })
447    }
448
449    /// Handle an incoming sync session from a remote peer.
450    pub fn handle_sync(
451        &self,
452        stream: std::net::TcpStream,
453        sync_key: &citadel_sync::SyncKey,
454    ) -> Result<SyncOutcome> {
455        let node_id = self.node_id()?;
456        let transport =
457            citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
458        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
459            node_id,
460            direction: citadel_sync::SyncDirection::Push,
461            crdt_aware: false,
462        });
463
464        let results = session
465            .handle_table_sync_as_responder(&self.manager, &transport)
466            .map_err(sync_err_to_core)?;
467
468        transport.close().map_err(sync_err_to_core)?;
469
470        Ok(SyncOutcome {
471            tables_synced: results
472                .into_iter()
473                .map(|(name, r)| (name, r.entries_applied))
474                .collect(),
475            default_tree: None,
476        })
477    }
478}
479
480fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
481    match e {
482        citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
483        other => Error::Sync(other.to_string()),
484    }
485}
486
#[cfg(feature = "audit-log")]
impl Drop for Database {
    /// Records a `DatabaseClosed` audit event (with no detail bytes) when the
    /// handle goes away.
    fn drop(&mut self) {
        let no_detail: &[u8] = &[];
        self.log_audit(AuditEventType::DatabaseClosed, no_detail);
    }
}
493
/// Key-file location for a data file: `{data_path}.citadel-keys`.
///
/// The suffix is appended to the whole path as given; any existing
/// extension is kept rather than replaced.
fn resolve_key_path_for(data_path: &Path) -> PathBuf {
    let mut key_path = data_path.to_path_buf().into_os_string();
    key_path.push(".citadel-keys");
    key_path.into()
}