Skip to main content

citadel/
database.rs

1use std::any::Any;
2use std::fs;
3#[cfg(not(target_arch = "wasm32"))]
4use std::fs::OpenOptions;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use citadel_core::{Error, Result, KEY_FILE_SIZE, KEY_SIZE, MERKLE_HASH_SIZE, WRAPPED_KEY_SIZE};
9use citadel_crypto::hkdf_utils::RegionWrapKeys;
10use citadel_io::durable;
11#[cfg(not(target_arch = "wasm32"))]
12use citadel_io::mmap_io::MmapPageIO;
13use citadel_txn::integrity::IntegrityReport;
14use citadel_txn::manager::TxnManager;
15use citadel_txn::read_txn::ReadTxn;
16use citadel_txn::write_txn::WriteTxn;
17use parking_lot::Mutex;
18use rustc_hash::FxHashMap;
19
20use crate::atom_store::AtomKeyStore;
21#[cfg(feature = "audit-log")]
22use crate::audit::{AuditEventType, AuditLog};
23use crate::key_codec::SlotRecord;
24use crate::region_store::RegionKeyStore;
25
26/// Type-erased cache of `Arc<T>` entries shared across connections to one DB.
27pub type SharedCache = Mutex<FxHashMap<String, Arc<dyn Any + Send + Sync>>>;
28
29/// Cloneable handle to the per-Database shared cache.
30pub type SqlCacheHandle = Arc<SharedCache>;
31
32/// Database statistics read from the current commit slot.
33#[derive(Debug, Clone)]
34pub struct DbStats {
35    pub tree_depth: u16,
36    pub entry_count: u64,
37    pub total_pages: u32,
38    pub high_water_mark: u32,
39    pub merkle_root: [u8; MERKLE_HASH_SIZE],
40}
41
42/// An open Citadel database (`Send + Sync`).
43///
44/// Exclusively locks the database file for its lifetime.
45pub struct Database {
46    manager: TxnManager,
47    data_path: PathBuf,
48    key_path: PathBuf,
49    /// Database file_id (from the file header), binding the region key store.
50    file_id: u64,
51    #[cfg(feature = "audit-log")]
52    audit_log: Option<Mutex<AuditLog>>,
53    /// Shared cache for higher-level crates (e.g. citadel-sql ANN indexes).
54    /// Held here so it spans all connections without a dependency cycle.
55    sql_caches: Arc<SharedCache>,
56    /// Region wrap keys for per-region cryptographic erasure (citadel-mem).
57    /// `Some` only when the builder enabled region keys; derived from the REK
58    /// and zeroized on drop. The raw REK is never retained here.
59    region_keys: Option<RegionWrapKeys>,
60    /// Sidecar region key store (lazy); shared by every `MemoryEngine` over this db.
61    region_store: Mutex<Option<RegionKeyStore>>,
62    /// Sidecar per-atom key store (lazy); holds each atom's wrapped ACK.
63    atom_store: Mutex<Option<AtomKeyStore>>,
64}
65
66impl std::fmt::Debug for Database {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("Database")
69            .field("data_path", &self.data_path)
70            .field("key_path", &self.key_path)
71            .finish()
72    }
73}
74
75// TxnManager is internally synchronized (Mutex + Atomic)
76unsafe impl Send for Database {}
77unsafe impl Sync for Database {}
78
79impl Database {
80    #[cfg(feature = "audit-log")]
81    pub(crate) fn new(
82        manager: TxnManager,
83        data_path: PathBuf,
84        key_path: PathBuf,
85        file_id: u64,
86        region_keys: Option<RegionWrapKeys>,
87        audit_log: Option<AuditLog>,
88    ) -> Self {
89        Self {
90            manager,
91            data_path,
92            key_path,
93            file_id,
94            audit_log: audit_log.map(Mutex::new),
95            sql_caches: Arc::new(Mutex::new(FxHashMap::default())),
96            region_keys,
97            region_store: Mutex::new(None),
98            atom_store: Mutex::new(None),
99        }
100    }
101
102    #[cfg(not(feature = "audit-log"))]
103    pub(crate) fn new(
104        manager: TxnManager,
105        data_path: PathBuf,
106        key_path: PathBuf,
107        file_id: u64,
108        region_keys: Option<RegionWrapKeys>,
109    ) -> Self {
110        Self {
111            manager,
112            data_path,
113            key_path,
114            file_id,
115            sql_caches: Arc::new(Mutex::new(FxHashMap::default())),
116            region_keys,
117            region_store: Mutex::new(None),
118            atom_store: Mutex::new(None),
119        }
120    }
121
122    /// Fetch a typed entry from the shared SQL cache.
123    /// Returns `None` if the key is missing or stored under a different type.
124    pub fn sql_cache_get<T: Any + Send + Sync>(&self, key: &str) -> Option<Arc<T>> {
125        let guard = self.sql_caches.lock();
126        let entry = guard.get(key)?;
127        Arc::clone(entry).downcast::<T>().ok()
128    }
129
130    /// Insert (or overwrite) a typed entry in the shared SQL cache.
131    pub fn sql_cache_insert<T: Any + Send + Sync>(&self, key: String, value: Arc<T>) {
132        self.sql_caches.lock().insert(key, value);
133    }
134
135    /// Remove every entry whose key starts with `prefix`.
136    /// Returns the number of entries removed.
137    pub fn sql_cache_invalidate_prefix(&self, prefix: &str) -> usize {
138        let mut guard = self.sql_caches.lock();
139        let before = guard.len();
140        guard.retain(|k, _| !k.starts_with(prefix));
141        before - guard.len()
142    }
143
144    /// Total number of cache entries (test/diagnostics helper).
145    pub fn sql_cache_len(&self) -> usize {
146        self.sql_caches.lock().len()
147    }
148
149    /// Cloneable handle to the shared cache.
150    pub fn sql_cache_handle(&self) -> SqlCacheHandle {
151        Arc::clone(&self.sql_caches)
152    }
153
154    /// Begin a read-only transaction with snapshot isolation.
155    pub fn begin_read(&self) -> ReadTxn<'_> {
156        self.manager.begin_read()
157    }
158
159    /// Begin a read-write transaction. Only one can be active at a time.
160    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
161        self.manager.begin_write()
162    }
163
164    /// Get database statistics from the current commit slot.
165    pub fn stats(&self) -> DbStats {
166        let slot = self.manager.current_slot();
167        DbStats {
168            tree_depth: slot.tree_depth,
169            entry_count: slot.tree_entries,
170            total_pages: slot.total_pages,
171            high_water_mark: slot.high_water_mark,
172            merkle_root: slot.merkle_root,
173        }
174    }
175
176    pub fn data_path(&self) -> &Path {
177        &self.data_path
178    }
179
180    pub fn key_path(&self) -> &Path {
181        &self.key_path
182    }
183
184    /// Database file identifier from the file header. citadel-mem binds the
185    /// region key store to this value so a mismatched sidecar is rejected.
186    pub fn file_id(&self) -> u64 {
187        self.file_id
188    }
189
190    /// Whether per-region cryptographic erasure keys are available.
191    /// `true` only when the database was opened with `enable_region_keys(true)`.
192    pub fn region_keys_enabled(&self) -> bool {
193        self.region_keys.is_some()
194    }
195
196    /// Wrap a region's random content key (RCK) under the region KEK (AES-256-KW).
197    /// The 40-byte result is the sole copy of the RCK; citadel-mem stores it in the
198    /// sidecar key store and overwrites it in place to erase the region.
199    pub fn wrap_region_key(&self, rck: &[u8; KEY_SIZE]) -> Result<[u8; WRAPPED_KEY_SIZE]> {
200        self.region_keys
201            .as_ref()
202            .map(|rk| rk.wrap_region_key(rck))
203            .ok_or(Error::RegionKeysDisabled)
204    }
205
206    /// Unwrap a region content key. Fails if the slot was erased (zeroed wrap).
207    pub fn unwrap_region_key(&self, wrapped: &[u8; WRAPPED_KEY_SIZE]) -> Result<[u8; KEY_SIZE]> {
208        self.region_keys
209            .as_ref()
210            .ok_or(Error::RegionKeysDisabled)?
211            .unwrap_region_key(wrapped)
212    }
213
214    /// HMAC key authenticating the region key store's header and slots
215    /// (torn-write detection only; RCK secrecy is protected by AES-KW).
216    pub fn region_store_mac_key(&self) -> Result<[u8; KEY_SIZE]> {
217        self.region_keys
218            .as_ref()
219            .map(|rk| rk.store_mac_key)
220            .ok_or(Error::RegionKeysDisabled)
221    }
222
223    /// Path to the sidecar region key store, `{key_path}` with the
224    /// `citadel-regions` extension. Pure path math; valid even when region keys
225    /// are disabled (the file only exists once an encrypted region is created).
226    pub fn region_store_path(&self) -> PathBuf {
227        region_store_path_for(&self.key_path)
228    }
229
230    /// Run `f` against the lazily-opened sidecar store under its lock.
231    fn with_region_store<T>(&self, f: impl FnOnce(&mut RegionKeyStore) -> Result<T>) -> Result<T> {
232        let mut guard = self.region_store.lock();
233        if guard.is_none() {
234            let mac_key = self.region_store_mac_key()?;
235            *guard = Some(RegionKeyStore::create_or_open(
236                &self.region_store_path(),
237                self.file_id,
238                mac_key,
239            )?);
240        }
241        f(guard.as_mut().expect("region store initialized above"))
242    }
243
244    /// Allocate a slot and store the wrapped RCK (fsync'd); returns `(slot, gen)`.
245    pub fn region_store_allocate_write(
246        &self,
247        region_id: u64,
248        wrapped: &[u8; WRAPPED_KEY_SIZE],
249    ) -> Result<(u32, u64)> {
250        self.with_region_store(|s| {
251            let slot = s.allocate_slot()?;
252            let gen = s.write_live(slot, region_id, wrapped)?;
253            Ok((slot, gen))
254        })
255    }
256
257    /// The authoritative record of region key `slot`.
258    pub fn region_store_slot(&self, slot: u32) -> Result<SlotRecord> {
259        self.with_region_store(|s| s.read_slot(slot))
260    }
261
262    /// Cryptographically erase region key `slot` (no-op if already erased).
263    pub fn region_store_tombstone(&self, slot: u32, region_id: u64) -> Result<()> {
264        self.with_region_store(|s| s.tombstone(slot, region_id))
265    }
266
267    /// `(slot, region_id)` for every LIVE region key slot.
268    pub fn region_store_live_owners(&self) -> Result<Vec<(u32, u64)>> {
269        self.with_region_store(|s| s.live_owners())
270    }
271
272    /// Path to the sidecar per-atom key store, `{key_path}` with the `citadel-atomkeys`
273    /// extension. Pure path math; the file only exists once an encrypted atom is written.
274    pub fn atom_store_path(&self) -> PathBuf {
275        atom_store_path_for(&self.key_path)
276    }
277
278    /// Run `f` against the lazily-opened atom key store under its lock.
279    fn with_atom_store<T>(&self, f: impl FnOnce(&mut AtomKeyStore) -> Result<T>) -> Result<T> {
280        let mut guard = self.atom_store.lock();
281        if guard.is_none() {
282            let mac_key = self.region_store_mac_key()?;
283            *guard = Some(AtomKeyStore::create_or_open(
284                &self.atom_store_path(),
285                self.file_id,
286                mac_key,
287            )?);
288        }
289        f(guard.as_mut().expect("atom store initialized above"))
290    }
291
292    /// Allocate a slot and store one atom's wrapped ACK (fsync'd); returns `(slot, gen)`.
293    pub fn atom_store_allocate_write(
294        &self,
295        atom_id: u64,
296        wrapped: &[u8; WRAPPED_KEY_SIZE],
297    ) -> Result<(u32, u64)> {
298        self.with_atom_store(|s| {
299            let slot = s.allocate_slot()?;
300            let gen = s.write_live(slot, atom_id, wrapped)?;
301            Ok((slot, gen))
302        })
303    }
304
305    /// Allocate and durably write a batch of `(atom_id, wrapped)` ACKs with ONE fsync;
306    /// returns `(slot, gen)` per item in order.
307    pub fn atom_store_allocate_batch(
308        &self,
309        items: &[(u64, [u8; WRAPPED_KEY_SIZE])],
310    ) -> Result<Vec<(u32, u64)>> {
311        if items.is_empty() {
312            return Ok(Vec::new());
313        }
314        self.with_atom_store(|s| {
315            let slots = s.allocate_batch(items.len())?;
316            let writes: Vec<(u32, u64, [u8; WRAPPED_KEY_SIZE])> = slots
317                .iter()
318                .zip(items)
319                .map(|(&slot, (atom_id, wrapped))| (slot, *atom_id, *wrapped))
320                .collect();
321            let gens = s.write_live_batch(&writes)?;
322            Ok(slots.into_iter().zip(gens).collect())
323        })
324    }
325
326    /// The authoritative record of atom key `slot` (its wrapped ACK and state).
327    pub fn atom_store_slot(&self, slot: u32) -> Result<SlotRecord> {
328        self.with_atom_store(|s| s.read_slot(slot))
329    }
330
331    /// Cryptographically erase atom key `slot` (no-op if already erased).
332    pub fn atom_store_tombstone(&self, slot: u32, atom_id: u64) -> Result<()> {
333        self.with_atom_store(|s| s.tombstone(slot, atom_id))
334    }
335
336    /// Erase a batch of atom key slots with two fsyncs total (not 2N). Items are `(slot, atom_id)`.
337    pub fn atom_store_tombstone_batch(&self, items: &[(u32, u64)]) -> Result<()> {
338        self.with_atom_store(|s| s.tombstone_batch(items))
339    }
340
341    /// Every LIVE atom key's `atom_id -> wrapped ACK`, in one whole-file pass.
342    pub fn atom_store_live_wrapped(&self) -> Result<FxHashMap<u64, [u8; WRAPPED_KEY_SIZE]>> {
343        self.with_atom_store(|s| s.live_wrapped())
344    }
345
346    /// `(slot, atom_id)` for every LIVE atom key slot.
347    pub fn atom_store_live_owners(&self) -> Result<Vec<(u32, u64)>> {
348        self.with_atom_store(|s| s.live_owners())
349    }
350
351    /// Number of currently active readers.
352    pub fn reader_count(&self) -> usize {
353        self.manager.reader_count()
354    }
355
356    /// Change the database passphrase (re-wraps REK, no page re-encryption).
357    pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
358        use citadel_crypto::kdf::{derive_mk, generate_salt};
359        use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
360
361        let key_data = fs::read(&self.key_path)?;
362        if key_data.len() != KEY_FILE_SIZE {
363            return Err(Error::Io(std::io::Error::new(
364                std::io::ErrorKind::InvalidData,
365                "key file has incorrect size",
366            )));
367        }
368        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
369        let kf = KeyFile::deserialize(&key_buf)?;
370
371        let old_mk = derive_mk(
372            kf.kdf_algorithm,
373            old_passphrase,
374            &kf.argon2_salt,
375            kf.argon2_m_cost,
376            kf.argon2_t_cost,
377            kf.argon2_p_cost,
378        )?;
379        kf.verify_mac(&old_mk)?;
380
381        let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
382
383        let new_salt = generate_salt();
384        let new_mk = derive_mk(
385            kf.kdf_algorithm,
386            new_passphrase,
387            &new_salt,
388            kf.argon2_m_cost,
389            kf.argon2_t_cost,
390            kf.argon2_p_cost,
391        )?;
392
393        let new_wrapped = wrap_rek(&new_mk, &rek);
394
395        let mut new_kf = kf.clone();
396        new_kf.argon2_salt = new_salt;
397        new_kf.wrapped_rek = new_wrapped;
398        new_kf.update_mac(&new_mk);
399
400        durable::atomic_write(&self.key_path, &new_kf.serialize())?;
401
402        #[cfg(feature = "audit-log")]
403        self.log_audit(AuditEventType::PassphraseChanged, &[]);
404
405        Ok(())
406    }
407
408    pub fn integrity_check(&self) -> Result<IntegrityReport> {
409        let report = self.manager.integrity_check()?;
410
411        #[cfg(feature = "audit-log")]
412        {
413            let error_count = report.errors.len() as u32;
414            self.log_audit(
415                AuditEventType::IntegrityCheckPerformed,
416                &error_count.to_le_bytes(),
417            );
418        }
419
420        Ok(report)
421    }
422
423    /// Create a hot backup via MVCC snapshot. Also copies the key file.
424    #[cfg(not(target_arch = "wasm32"))]
425    pub fn backup(&self, dest_path: &Path) -> Result<()> {
426        let dest_file = OpenOptions::new()
427            .read(true)
428            .write(true)
429            .create_new(true)
430            .open(dest_path)?;
431        let dest_io = MmapPageIO::try_new(dest_file)?;
432        self.manager.backup_to(&dest_io)?;
433
434        let dest_key_path = resolve_key_path_for(dest_path);
435        fs::copy(&self.key_path, &dest_key_path)?;
436        self.copy_region_store_to(&dest_key_path)?;
437
438        #[cfg(feature = "audit-log")]
439        self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
440
441        Ok(())
442    }
443
444    /// Export an encrypted key backup for disaster recovery.
445    ///
446    /// Requires the current database passphrase. The backup can later restore
447    /// access via `restore_key_from_backup` if the database passphrase is lost.
448    pub fn export_key_backup(
449        &self,
450        db_passphrase: &[u8],
451        backup_passphrase: &[u8],
452        dest_path: &Path,
453    ) -> Result<()> {
454        use citadel_crypto::kdf::derive_mk;
455        use citadel_crypto::key_backup::create_key_backup;
456        use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
457
458        let key_data = fs::read(&self.key_path)?;
459        if key_data.len() != KEY_FILE_SIZE {
460            return Err(Error::Io(std::io::Error::new(
461                std::io::ErrorKind::InvalidData,
462                "key file has incorrect size",
463            )));
464        }
465        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
466        let kf = KeyFile::deserialize(&key_buf)?;
467
468        let mk = derive_mk(
469            kf.kdf_algorithm,
470            db_passphrase,
471            &kf.argon2_salt,
472            kf.argon2_m_cost,
473            kf.argon2_t_cost,
474            kf.argon2_p_cost,
475        )?;
476        kf.verify_mac(&mk)?;
477
478        let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
479
480        let backup_data = create_key_backup(
481            &rek,
482            backup_passphrase,
483            kf.file_id,
484            kf.cipher_id,
485            kf.kdf_algorithm,
486            kf.argon2_m_cost,
487            kf.argon2_t_cost,
488            kf.argon2_p_cost,
489            kf.current_epoch,
490        )?;
491
492        durable::write_and_sync(dest_path, &backup_data)?;
493
494        #[cfg(feature = "audit-log")]
495        self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
496
497        Ok(())
498    }
499
500    /// Restore a key file from an encrypted backup (static - no `Database` needed).
501    ///
502    /// Unwraps the REK using `backup_passphrase`, then creates a new key file
503    /// protected by `new_db_passphrase`.
504    pub fn restore_key_from_backup(
505        backup_path: &Path,
506        backup_passphrase: &[u8],
507        new_db_passphrase: &[u8],
508        db_path: &Path,
509    ) -> Result<()> {
510        use citadel_core::{
511            KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
512        };
513        use citadel_crypto::kdf::{derive_mk, generate_salt};
514        use citadel_crypto::key_backup::restore_rek_from_backup;
515        use citadel_crypto::key_manager::wrap_rek;
516        use citadel_crypto::key_manager::KeyFile;
517
518        let backup_data = fs::read(backup_path)?;
519        if backup_data.len() != KEY_BACKUP_SIZE {
520            return Err(Error::Io(std::io::Error::new(
521                std::io::ErrorKind::InvalidData,
522                "backup file has incorrect size",
523            )));
524        }
525        let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
526
527        let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
528
529        let new_salt = generate_salt();
530        let new_mk = derive_mk(
531            restored.kdf_algorithm,
532            new_db_passphrase,
533            &new_salt,
534            restored.kdf_param1,
535            restored.kdf_param2,
536            restored.kdf_param3,
537        )?;
538
539        let new_wrapped = wrap_rek(&new_mk, &restored.rek);
540
541        let mut new_kf = KeyFile {
542            magic: KEY_FILE_MAGIC,
543            version: KEY_FILE_VERSION,
544            file_id: restored.file_id,
545            argon2_salt: new_salt,
546            argon2_m_cost: restored.kdf_param1,
547            argon2_t_cost: restored.kdf_param2,
548            argon2_p_cost: restored.kdf_param3,
549            cipher_id: restored.cipher_id,
550            kdf_algorithm: restored.kdf_algorithm,
551            wrapped_rek: new_wrapped,
552            current_epoch: restored.epoch,
553            prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
554            prev_epoch: 0,
555            rotation_active: false,
556            file_mac: [0u8; MAC_SIZE],
557        };
558        new_kf.update_mac(&new_mk);
559
560        let key_path = resolve_key_path_for(db_path);
561        durable::atomic_write(&key_path, &new_kf.serialize())?;
562
563        Ok(())
564    }
565
566    /// Compact the database into a new file. Also copies the key file.
567    #[cfg(not(target_arch = "wasm32"))]
568    pub fn compact(&self, dest_path: &Path) -> Result<()> {
569        let dest_file = OpenOptions::new()
570            .read(true)
571            .write(true)
572            .create_new(true)
573            .open(dest_path)?;
574        let dest_io = MmapPageIO::try_new(dest_file)?;
575        self.manager.compact_to(&dest_io)?;
576
577        let dest_key_path = resolve_key_path_for(dest_path);
578        fs::copy(&self.key_path, &dest_key_path)?;
579        self.copy_region_store_to(&dest_key_path)?;
580
581        #[cfg(feature = "audit-log")]
582        self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
583
584        Ok(())
585    }
586
587    /// Copy the sidecar region key store next to `dest_key_path`, if it exists.
588    ///
589    /// A backup/compaction must carry the wrapped region keys so encrypted
590    /// regions remain openable from the copy. Note: a backup taken while a
591    /// region is live retains a recoverable key; `forget` cannot reach it, so
592    /// backup retention is the operator's responsibility (see `region_store_path`).
593    #[cfg(not(target_arch = "wasm32"))]
594    fn copy_region_store_to(&self, dest_key_path: &Path) -> Result<()> {
595        let src = self.region_store_path();
596        if src.exists() {
597            let dest = region_store_path_for(dest_key_path);
598            fs::copy(&src, &dest)?;
599        }
600        let atom_src = self.atom_store_path();
601        if atom_src.exists() {
602            fs::copy(&atom_src, atom_store_path_for(dest_key_path))?;
603        }
604        Ok(())
605    }
606}
607
608impl Database {
609    #[doc(hidden)]
610    pub fn manager(&self) -> &TxnManager {
611        &self.manager
612    }
613
614    /// Path to the audit log file, if audit logging is enabled.
615    #[cfg(feature = "audit-log")]
616    pub fn audit_log_path(&self) -> Option<PathBuf> {
617        if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
618            Some(crate::audit::resolve_audit_path(&self.data_path))
619        } else {
620            None
621        }
622    }
623
624    /// Verify the audit log's HMAC chain integrity.
625    #[cfg(feature = "audit-log")]
626    pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
627        let audit = self
628            .audit_log
629            .as_ref()
630            .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
631        let guard = audit.lock();
632        let path = crate::audit::resolve_audit_path(&self.data_path);
633        crate::audit::verify_audit_log(&path, guard.audit_key())
634    }
635
636    #[cfg(feature = "audit-log")]
637    pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
638        if let Some(ref mutex) = self.audit_log {
639            let _ = mutex.lock().log(event_type, detail);
640        }
641    }
642
643    #[cfg(feature = "audit-log")]
644    fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
645        let path_str = path.to_string_lossy();
646        let path_bytes = path_str.as_bytes();
647        let len = (path_bytes.len() as u16).to_le_bytes();
648        let mut detail = Vec::with_capacity(2 + path_bytes.len());
649        detail.extend_from_slice(&len);
650        detail.extend_from_slice(path_bytes);
651        self.log_audit(event_type, &detail);
652    }
653}
654
655use citadel_sync::transport::SyncTransport;
656
657/// Outcome of a sync operation.
658#[derive(Debug, Clone)]
659pub struct SyncOutcome {
660    /// Per-table results: `(table_name, entries_applied)`.
661    pub tables_synced: Vec<(Vec<u8>, u64)>,
662    /// Default tree sync result (if performed).
663    pub default_tree: Option<citadel_sync::SyncOutcome>,
664}
665
/// Key under which this database's persistent node id is stored.
const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
667
668impl Database {
669    /// Get or create a persistent NodeId for this database.
670    pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
671        let mut rtx = self.manager.begin_read();
672        if let Some(data) = rtx.get(NODE_ID_KEY)? {
673            if data.len() == 8 {
674                return Ok(citadel_sync::NodeId::from_bytes(
675                    data[..8].try_into().unwrap(),
676                ));
677            }
678        }
679        drop(rtx);
680
681        let node_id = citadel_sync::NodeId::random();
682        let mut wtx = self.manager.begin_write()?;
683        wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
684        wtx.commit()?;
685        Ok(node_id)
686    }
687
688    /// Push local named tables to a remote peer.
689    pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
690        let node_id = self.node_id()?;
691        let transport =
692            citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
693        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
694            node_id,
695            direction: citadel_sync::SyncDirection::Push,
696            crdt_aware: false,
697        });
698
699        let results = session
700            .sync_tables_as_initiator(&self.manager, &transport)
701            .map_err(sync_err_to_core)?;
702
703        transport.close().map_err(sync_err_to_core)?;
704
705        Ok(SyncOutcome {
706            tables_synced: results
707                .into_iter()
708                .map(|(name, r)| (name, r.entries_applied))
709                .collect(),
710            default_tree: None,
711        })
712    }
713
714    /// Handle an incoming sync session from a remote peer.
715    pub fn handle_sync(
716        &self,
717        stream: std::net::TcpStream,
718        sync_key: &citadel_sync::SyncKey,
719    ) -> Result<SyncOutcome> {
720        let node_id = self.node_id()?;
721        let transport =
722            citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
723        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
724            node_id,
725            direction: citadel_sync::SyncDirection::Push,
726            crdt_aware: false,
727        });
728
729        let results = session
730            .handle_table_sync_as_responder(&self.manager, &transport)
731            .map_err(sync_err_to_core)?;
732
733        transport.close().map_err(sync_err_to_core)?;
734
735        Ok(SyncOutcome {
736            tables_synced: results
737                .into_iter()
738                .map(|(name, r)| (name, r.entries_applied))
739                .collect(),
740            default_tree: None,
741        })
742    }
743}
744
745fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
746    match e {
747        citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
748        other => Error::Sync(other.to_string()),
749    }
750}
751
/// On drop, record a `DatabaseClosed` event in the audit log (if one is attached).
#[cfg(feature = "audit-log")]
impl Drop for Database {
    fn drop(&mut self) {
        let no_detail: &[u8] = &[];
        self.log_audit(AuditEventType::DatabaseClosed, no_detail);
    }
}
758
/// Key file path for a data file: the data path with `.citadel-keys` appended,
/// i.e. `{data_path}.citadel-keys`.
fn resolve_key_path_for(data_path: &Path) -> PathBuf {
    let mut key_path = data_path.as_os_str().to_owned();
    key_path.push(".citadel-keys");
    key_path.into()
}
765
/// Path of the sidecar region key store for a given key file: `key_path` with its
/// extension replaced by `citadel-regions`,
/// e.g. `mydb.citadel.citadel-keys` -> `mydb.citadel.citadel-regions`.
fn region_store_path_for(key_path: &Path) -> PathBuf {
    let mut sidecar = key_path.to_path_buf();
    sidecar.set_extension("citadel-regions");
    sidecar
}
771
/// Path of the sidecar atom key store for a given key file: `key_path` with its
/// extension replaced by `citadel-atomkeys`.
fn atom_store_path_for(key_path: &Path) -> PathBuf {
    let mut sidecar = key_path.to_path_buf();
    sidecar.set_extension("citadel-atomkeys");
    sidecar
}
776
/// Tests for the shared SQL cache accessors on `Database`.
#[cfg(test)]
mod sql_cache_tests {
    use super::*;
    use crate::builder::DatabaseBuilder;
    use citadel_core::types::Argon2Profile;

    /// Create a throwaway database named `test.db` inside `dir`.
    fn open_db(dir: &Path) -> Database {
        DatabaseBuilder::new(dir.join("test.db"))
            .passphrase(b"x")
            .argon2_profile(Argon2Profile::Iot)
            .create()
            .unwrap()
    }

    /// Simple payload type used to tell cache entries apart.
    #[derive(Debug, PartialEq)]
    struct Marker(u32);

    /// Insert `Marker(n)` under `key`.
    fn put(db: &Database, key: &str, n: u32) {
        db.sql_cache_insert(key.to_string(), Arc::new(Marker(n)));
    }

    #[test]
    fn insert_then_get_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 42);
        let fetched = db.sql_cache_get::<Marker>("k").unwrap();
        assert_eq!(*fetched, Marker(42));
    }

    #[test]
    fn get_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        let lookup = db.sql_cache_get::<Marker>("missing");
        assert!(lookup.is_none());
    }

    #[test]
    fn get_wrong_type_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 1);
        let lookup = db.sql_cache_get::<String>("k");
        assert!(lookup.is_none());
    }

    #[test]
    fn insert_overwrites_existing_entry() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 1);
        put(&db, "k", 2);
        let fetched = db.sql_cache_get::<Marker>("k").unwrap();
        assert_eq!(*fetched, Marker(2));
    }

    #[test]
    fn invalidate_prefix_removes_matching_keys() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        for (key, n) in [
            ("ann:t1:ix_v", 1),
            ("ann:t1:ix_w", 2),
            ("ann:t2:ix_v", 3),
            ("other:x", 4),
        ] {
            put(&db, key, n);
        }

        let removed = db.sql_cache_invalidate_prefix("ann:t1:");
        assert_eq!(removed, 2);
        for gone in ["ann:t1:ix_v", "ann:t1:ix_w"] {
            assert!(db.sql_cache_get::<Marker>(gone).is_none());
        }
        for kept in ["ann:t2:ix_v", "other:x"] {
            assert!(db.sql_cache_get::<Marker>(kept).is_some());
        }
    }

    #[test]
    fn invalidate_prefix_no_match_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "a:1", 1);
        assert_eq!(db.sql_cache_invalidate_prefix("z:"), 0);
        assert_eq!(db.sql_cache_len(), 1);
    }

    #[test]
    fn shared_arc_observed_by_two_borrows() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        let original = Arc::new(Marker(7));
        db.sql_cache_insert("k".to_string(), Arc::clone(&original));
        let first = db.sql_cache_get::<Marker>("k").unwrap();
        let second = db.sql_cache_get::<Marker>("k").unwrap();
        assert!(Arc::ptr_eq(&first, &second));
        assert!(Arc::ptr_eq(&first, &original));
    }
}
864}