Skip to main content

citadel/
database.rs

1use std::fs;
2#[cfg(not(target_arch = "wasm32"))]
3use std::fs::OpenOptions;
4use std::path::{Path, PathBuf};
5
6use citadel_core::{Error, Result, KEY_FILE_SIZE, MERKLE_HASH_SIZE};
7use citadel_io::durable;
8#[cfg(not(target_arch = "wasm32"))]
9use citadel_io::mmap_io::MmapPageIO;
10use citadel_txn::integrity::IntegrityReport;
11use citadel_txn::manager::TxnManager;
12use citadel_txn::read_txn::ReadTxn;
13use citadel_txn::write_txn::WriteTxn;
14
15#[cfg(feature = "audit-log")]
16use crate::audit::{AuditEventType, AuditLog};
17
18/// Database statistics read from the current commit slot.
19#[derive(Debug, Clone)]
20pub struct DbStats {
21    pub tree_depth: u16,
22    pub entry_count: u64,
23    pub total_pages: u32,
24    pub high_water_mark: u32,
25    pub merkle_root: [u8; MERKLE_HASH_SIZE],
26}
27
28/// An open Citadel database (`Send + Sync`).
29///
30/// Exclusively locks the database file for its lifetime.
31pub struct Database {
32    manager: TxnManager,
33    data_path: PathBuf,
34    key_path: PathBuf,
35    #[cfg(feature = "audit-log")]
36    audit_log: Option<parking_lot::Mutex<AuditLog>>,
37}
38
39impl std::fmt::Debug for Database {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        f.debug_struct("Database")
42            .field("data_path", &self.data_path)
43            .field("key_path", &self.key_path)
44            .finish()
45    }
46}
47
48// TxnManager is internally synchronized (Mutex + Atomic)
49unsafe impl Send for Database {}
50unsafe impl Sync for Database {}
51
52impl Database {
53    #[cfg(feature = "audit-log")]
54    pub(crate) fn new(
55        manager: TxnManager,
56        data_path: PathBuf,
57        key_path: PathBuf,
58        audit_log: Option<AuditLog>,
59    ) -> Self {
60        Self {
61            manager,
62            data_path,
63            key_path,
64            audit_log: audit_log.map(parking_lot::Mutex::new),
65        }
66    }
67
68    #[cfg(not(feature = "audit-log"))]
69    pub(crate) fn new(manager: TxnManager, data_path: PathBuf, key_path: PathBuf) -> Self {
70        Self {
71            manager,
72            data_path,
73            key_path,
74        }
75    }
76
77    /// Begin a read-only transaction with snapshot isolation.
78    pub fn begin_read(&self) -> ReadTxn<'_> {
79        self.manager.begin_read()
80    }
81
82    /// Begin a read-write transaction. Only one can be active at a time.
83    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
84        self.manager.begin_write()
85    }
86
87    /// Get database statistics from the current commit slot.
88    pub fn stats(&self) -> DbStats {
89        let slot = self.manager.current_slot();
90        DbStats {
91            tree_depth: slot.tree_depth,
92            entry_count: slot.tree_entries,
93            total_pages: slot.total_pages,
94            high_water_mark: slot.high_water_mark,
95            merkle_root: slot.merkle_root,
96        }
97    }
98
99    /// Path to the data file.
100    pub fn data_path(&self) -> &Path {
101        &self.data_path
102    }
103
104    /// Path to the key file.
105    pub fn key_path(&self) -> &Path {
106        &self.key_path
107    }
108
109    /// Number of currently active readers.
110    pub fn reader_count(&self) -> usize {
111        self.manager.reader_count()
112    }
113
114    /// Change the database passphrase (re-wraps REK, no page re-encryption).
115    pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
116        use citadel_crypto::kdf::{derive_mk, generate_salt};
117        use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
118
119        let key_data = fs::read(&self.key_path)?;
120        if key_data.len() != KEY_FILE_SIZE {
121            return Err(Error::Io(std::io::Error::new(
122                std::io::ErrorKind::InvalidData,
123                "key file has incorrect size",
124            )));
125        }
126        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
127        let kf = KeyFile::deserialize(&key_buf)?;
128
129        let old_mk = derive_mk(
130            kf.kdf_algorithm,
131            old_passphrase,
132            &kf.argon2_salt,
133            kf.argon2_m_cost,
134            kf.argon2_t_cost,
135            kf.argon2_p_cost,
136        )?;
137        kf.verify_mac(&old_mk)?;
138
139        let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
140
141        let new_salt = generate_salt();
142        let new_mk = derive_mk(
143            kf.kdf_algorithm,
144            new_passphrase,
145            &new_salt,
146            kf.argon2_m_cost,
147            kf.argon2_t_cost,
148            kf.argon2_p_cost,
149        )?;
150
151        let new_wrapped = wrap_rek(&new_mk, &rek);
152
153        let mut new_kf = kf.clone();
154        new_kf.argon2_salt = new_salt;
155        new_kf.wrapped_rek = new_wrapped;
156        new_kf.update_mac(&new_mk);
157
158        durable::atomic_write(&self.key_path, &new_kf.serialize())?;
159
160        #[cfg(feature = "audit-log")]
161        self.log_audit(AuditEventType::PassphraseChanged, &[]);
162
163        Ok(())
164    }
165
166    /// Run an integrity check on the database.
167    pub fn integrity_check(&self) -> Result<IntegrityReport> {
168        let report = self.manager.integrity_check()?;
169
170        #[cfg(feature = "audit-log")]
171        {
172            let error_count = report.errors.len() as u32;
173            self.log_audit(
174                AuditEventType::IntegrityCheckPerformed,
175                &error_count.to_le_bytes(),
176            );
177        }
178
179        Ok(report)
180    }
181
182    /// Create a hot backup via MVCC snapshot. Also copies the key file.
183    #[cfg(not(target_arch = "wasm32"))]
184    pub fn backup(&self, dest_path: &Path) -> Result<()> {
185        let dest_file = OpenOptions::new()
186            .read(true)
187            .write(true)
188            .create_new(true)
189            .open(dest_path)?;
190        let dest_io = MmapPageIO::try_new(dest_file)?;
191        self.manager.backup_to(&dest_io)?;
192
193        let dest_key_path = resolve_key_path_for(dest_path);
194        fs::copy(&self.key_path, &dest_key_path)?;
195
196        #[cfg(feature = "audit-log")]
197        self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
198
199        Ok(())
200    }
201
202    /// Export an encrypted key backup for disaster recovery.
203    ///
204    /// Requires the current database passphrase. The backup can later restore
205    /// access via `restore_key_from_backup` if the database passphrase is lost.
206    pub fn export_key_backup(
207        &self,
208        db_passphrase: &[u8],
209        backup_passphrase: &[u8],
210        dest_path: &Path,
211    ) -> Result<()> {
212        use citadel_crypto::kdf::derive_mk;
213        use citadel_crypto::key_backup::create_key_backup;
214        use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
215
216        let key_data = fs::read(&self.key_path)?;
217        if key_data.len() != KEY_FILE_SIZE {
218            return Err(Error::Io(std::io::Error::new(
219                std::io::ErrorKind::InvalidData,
220                "key file has incorrect size",
221            )));
222        }
223        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
224        let kf = KeyFile::deserialize(&key_buf)?;
225
226        let mk = derive_mk(
227            kf.kdf_algorithm,
228            db_passphrase,
229            &kf.argon2_salt,
230            kf.argon2_m_cost,
231            kf.argon2_t_cost,
232            kf.argon2_p_cost,
233        )?;
234        kf.verify_mac(&mk)?;
235
236        let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
237
238        let backup_data = create_key_backup(
239            &rek,
240            backup_passphrase,
241            kf.file_id,
242            kf.cipher_id,
243            kf.kdf_algorithm,
244            kf.argon2_m_cost,
245            kf.argon2_t_cost,
246            kf.argon2_p_cost,
247            kf.current_epoch,
248        )?;
249
250        durable::write_and_sync(dest_path, &backup_data)?;
251
252        #[cfg(feature = "audit-log")]
253        self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
254
255        Ok(())
256    }
257
258    /// Restore a key file from an encrypted backup (static - no `Database` needed).
259    ///
260    /// Unwraps the REK using `backup_passphrase`, then creates a new key file
261    /// protected by `new_db_passphrase`.
262    pub fn restore_key_from_backup(
263        backup_path: &Path,
264        backup_passphrase: &[u8],
265        new_db_passphrase: &[u8],
266        db_path: &Path,
267    ) -> Result<()> {
268        use citadel_core::{
269            KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
270        };
271        use citadel_crypto::kdf::{derive_mk, generate_salt};
272        use citadel_crypto::key_backup::restore_rek_from_backup;
273        use citadel_crypto::key_manager::wrap_rek;
274        use citadel_crypto::key_manager::KeyFile;
275
276        let backup_data = fs::read(backup_path)?;
277        if backup_data.len() != KEY_BACKUP_SIZE {
278            return Err(Error::Io(std::io::Error::new(
279                std::io::ErrorKind::InvalidData,
280                "backup file has incorrect size",
281            )));
282        }
283        let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
284
285        let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
286
287        let new_salt = generate_salt();
288        let new_mk = derive_mk(
289            restored.kdf_algorithm,
290            new_db_passphrase,
291            &new_salt,
292            restored.kdf_param1,
293            restored.kdf_param2,
294            restored.kdf_param3,
295        )?;
296
297        let new_wrapped = wrap_rek(&new_mk, &restored.rek);
298
299        let mut new_kf = KeyFile {
300            magic: KEY_FILE_MAGIC,
301            version: KEY_FILE_VERSION,
302            file_id: restored.file_id,
303            argon2_salt: new_salt,
304            argon2_m_cost: restored.kdf_param1,
305            argon2_t_cost: restored.kdf_param2,
306            argon2_p_cost: restored.kdf_param3,
307            cipher_id: restored.cipher_id,
308            kdf_algorithm: restored.kdf_algorithm,
309            wrapped_rek: new_wrapped,
310            current_epoch: restored.epoch,
311            prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
312            prev_epoch: 0,
313            rotation_active: false,
314            file_mac: [0u8; MAC_SIZE],
315        };
316        new_kf.update_mac(&new_mk);
317
318        let key_path = resolve_key_path_for(db_path);
319        durable::atomic_write(&key_path, &new_kf.serialize())?;
320
321        Ok(())
322    }
323
324    /// Compact the database into a new file. Also copies the key file.
325    #[cfg(not(target_arch = "wasm32"))]
326    pub fn compact(&self, dest_path: &Path) -> Result<()> {
327        let dest_file = OpenOptions::new()
328            .read(true)
329            .write(true)
330            .create_new(true)
331            .open(dest_path)?;
332        let dest_io = MmapPageIO::try_new(dest_file)?;
333        self.manager.compact_to(&dest_io)?;
334
335        let dest_key_path = resolve_key_path_for(dest_path);
336        fs::copy(&self.key_path, &dest_key_path)?;
337
338        #[cfg(feature = "audit-log")]
339        self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
340
341        Ok(())
342    }
343}
344
345impl Database {
346    #[doc(hidden)]
347    pub fn manager(&self) -> &TxnManager {
348        &self.manager
349    }
350
351    /// Path to the audit log file, if audit logging is enabled.
352    #[cfg(feature = "audit-log")]
353    pub fn audit_log_path(&self) -> Option<PathBuf> {
354        if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
355            Some(crate::audit::resolve_audit_path(&self.data_path))
356        } else {
357            None
358        }
359    }
360
361    /// Verify the audit log's HMAC chain integrity.
362    #[cfg(feature = "audit-log")]
363    pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
364        let audit = self
365            .audit_log
366            .as_ref()
367            .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
368        let guard = audit.lock();
369        let path = crate::audit::resolve_audit_path(&self.data_path);
370        crate::audit::verify_audit_log(&path, guard.audit_key())
371    }
372
373    #[cfg(feature = "audit-log")]
374    pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
375        if let Some(ref mutex) = self.audit_log {
376            let _ = mutex.lock().log(event_type, detail);
377        }
378    }
379
380    #[cfg(feature = "audit-log")]
381    fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
382        let path_str = path.to_string_lossy();
383        let path_bytes = path_str.as_bytes();
384        let len = (path_bytes.len() as u16).to_le_bytes();
385        let mut detail = Vec::with_capacity(2 + path_bytes.len());
386        detail.extend_from_slice(&len);
387        detail.extend_from_slice(path_bytes);
388        self.log_audit(event_type, &detail);
389    }
390}
391
392use citadel_sync::transport::SyncTransport;
393
394/// Outcome of a sync operation.
395#[derive(Debug, Clone)]
396pub struct SyncOutcome {
397    /// Per-table results: `(table_name, entries_applied)`.
398    pub tables_synced: Vec<(Vec<u8>, u64)>,
399    /// Default tree sync result (if performed).
400    pub default_tree: Option<citadel_sync::SyncOutcome>,
401}
402
403const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
404
405impl Database {
406    /// Get or create a persistent NodeId for this database.
407    pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
408        let mut rtx = self.manager.begin_read();
409        if let Some(data) = rtx.get(NODE_ID_KEY)? {
410            if data.len() == 8 {
411                return Ok(citadel_sync::NodeId::from_bytes(
412                    data[..8].try_into().unwrap(),
413                ));
414            }
415        }
416        drop(rtx);
417
418        let node_id = citadel_sync::NodeId::random();
419        let mut wtx = self.manager.begin_write()?;
420        wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
421        wtx.commit()?;
422        Ok(node_id)
423    }
424
425    /// Push local named tables to a remote peer.
426    pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
427        let node_id = self.node_id()?;
428        let transport =
429            citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
430        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
431            node_id,
432            direction: citadel_sync::SyncDirection::Push,
433            crdt_aware: false,
434        });
435
436        let results = session
437            .sync_tables_as_initiator(&self.manager, &transport)
438            .map_err(sync_err_to_core)?;
439
440        transport.close().map_err(sync_err_to_core)?;
441
442        Ok(SyncOutcome {
443            tables_synced: results
444                .into_iter()
445                .map(|(name, r)| (name, r.entries_applied))
446                .collect(),
447            default_tree: None,
448        })
449    }
450
451    /// Handle an incoming sync session from a remote peer.
452    pub fn handle_sync(
453        &self,
454        stream: std::net::TcpStream,
455        sync_key: &citadel_sync::SyncKey,
456    ) -> Result<SyncOutcome> {
457        let node_id = self.node_id()?;
458        let transport =
459            citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
460        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
461            node_id,
462            direction: citadel_sync::SyncDirection::Push,
463            crdt_aware: false,
464        });
465
466        let results = session
467            .handle_table_sync_as_responder(&self.manager, &transport)
468            .map_err(sync_err_to_core)?;
469
470        transport.close().map_err(sync_err_to_core)?;
471
472        Ok(SyncOutcome {
473            tables_synced: results
474                .into_iter()
475                .map(|(name, r)| (name, r.entries_applied))
476                .collect(),
477            default_tree: None,
478        })
479    }
480}
481
482fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
483    match e {
484        citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
485        other => Error::Sync(other.to_string()),
486    }
487}
488
489#[cfg(feature = "audit-log")]
490impl Drop for Database {
491    fn drop(&mut self) {
492        self.log_audit(AuditEventType::DatabaseClosed, &[]);
493    }
494}
495
496/// `{data_path}.citadel-keys`
497fn resolve_key_path_for(data_path: &Path) -> PathBuf {
498    let mut name = data_path.as_os_str().to_os_string();
499    name.push(".citadel-keys");
500    PathBuf::from(name)
501}