Skip to main content

citadel/
database.rs

1use std::any::Any;
2use std::fs;
3#[cfg(not(target_arch = "wasm32"))]
4use std::fs::OpenOptions;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7
8use citadel_core::{Error, Result, KEY_FILE_SIZE, KEY_SIZE, MERKLE_HASH_SIZE, WRAPPED_KEY_SIZE};
9use citadel_crypto::hkdf_utils::RegionWrapKeys;
10use citadel_io::durable;
11#[cfg(not(target_arch = "wasm32"))]
12use citadel_io::mmap_io::MmapPageIO;
13use citadel_txn::integrity::IntegrityReport;
14use citadel_txn::manager::TxnManager;
15use citadel_txn::read_txn::ReadTxn;
16use citadel_txn::write_txn::WriteTxn;
17use parking_lot::Mutex;
18use rustc_hash::FxHashMap;
19
20use crate::atom_store::AtomKeyStore;
21#[cfg(feature = "audit-log")]
22use crate::audit::{AuditEventType, AuditLog};
23use crate::key_codec::SlotRecord;
24use crate::region_store::RegionKeyStore;
25
/// Type-erased cache of `Arc<T>` entries shared across connections to one DB.
///
/// Keys are strings; values are stored as `Arc<dyn Any + Send + Sync>` and are
/// recovered by downcasting to the concrete type the caller asks for.
pub type SharedCache = Mutex<FxHashMap<String, Arc<dyn Any + Send + Sync>>>;
28
/// Cloneable handle to the per-Database shared cache.
///
/// This is an `Arc` around [`SharedCache`], so every clone refers to the same map.
pub type SqlCacheHandle = Arc<SharedCache>;
31
/// Database statistics read from the current commit slot.
///
/// Produced by `Database::stats`, which copies each field from the transaction
/// manager's current slot.
#[derive(Debug, Clone)]
pub struct DbStats {
    /// Copied from the slot's `tree_depth`.
    pub tree_depth: u16,
    /// Copied from the slot's `tree_entries`.
    pub entry_count: u64,
    /// Copied from the slot's `total_pages`.
    pub total_pages: u32,
    /// Copied from the slot's `high_water_mark`.
    pub high_water_mark: u32,
    /// Copied from the slot's `merkle_root` (`MERKLE_HASH_SIZE` bytes).
    pub merkle_root: [u8; MERKLE_HASH_SIZE],
}
41
/// An open Citadel database (`Send + Sync`).
///
/// Exclusively locks the database file for its lifetime.
pub struct Database {
    /// Transaction manager; transactions, stats, integrity checks, backup and
    /// compaction are all delegated to it.
    manager: TxnManager,
    /// Path of the data file, as returned by `data_path()`; also the base for
    /// the audit log path.
    data_path: PathBuf,
    /// Path of the key file; read by the passphrase/key-backup operations and
    /// used as the base for the sidecar key store paths.
    key_path: PathBuf,
    /// Database file_id (from the file header), binding the region key store.
    file_id: u64,
    /// Audit log, if one was supplied at construction. Wrapped in a mutex so
    /// events can be appended through `&self`.
    #[cfg(feature = "audit-log")]
    audit_log: Option<Mutex<AuditLog>>,
    /// Shared cache for higher-level crates (e.g. citadel-sql ANN indexes).
    /// Held here so it spans all connections without a dependency cycle.
    sql_caches: Arc<SharedCache>,
    /// Region wrap keys for per-region cryptographic erasure (citadel-mem).
    /// `Some` only when the builder enabled region keys; derived from the REK
    /// and zeroized on drop. The raw REK is never retained here.
    region_keys: Option<RegionWrapKeys>,
    /// Sidecar region key store (lazy); shared by every `MemoryEngine` over this db.
    /// `None` until the first region-store operation opens it.
    region_store: Mutex<Option<RegionKeyStore>>,
    /// Sidecar per-atom key store (lazy); holds each atom's wrapped ACK.
    /// `None` until the first atom-store operation opens it.
    atom_store: Mutex<Option<AtomKeyStore>>,
}
65
66impl std::fmt::Debug for Database {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("Database")
69            .field("data_path", &self.data_path)
70            .field("key_path", &self.key_path)
71            .finish()
72    }
73}
74
// TxnManager is internally synchronized (Mutex + Atomic)
//
// SAFETY: NOTE(review): these impls vouch for *every* field of `Database`, not
// only `TxnManager`. The caches and sidecar stores are behind mutexes, and
// `region_keys` is only read through `&self` in this file, but the thread-safety
// of `TxnManager`, `RegionWrapKeys`, `RegionKeyStore`, `AtomKeyStore` and
// `AuditLog` cannot be confirmed from this file — verify before relying on it.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
78
79impl Database {
80    #[cfg(feature = "audit-log")]
81    pub(crate) fn new(
82        manager: TxnManager,
83        data_path: PathBuf,
84        key_path: PathBuf,
85        file_id: u64,
86        region_keys: Option<RegionWrapKeys>,
87        audit_log: Option<AuditLog>,
88    ) -> Self {
89        Self {
90            manager,
91            data_path,
92            key_path,
93            file_id,
94            audit_log: audit_log.map(Mutex::new),
95            sql_caches: Arc::new(Mutex::new(FxHashMap::default())),
96            region_keys,
97            region_store: Mutex::new(None),
98            atom_store: Mutex::new(None),
99        }
100    }
101
102    #[cfg(not(feature = "audit-log"))]
103    pub(crate) fn new(
104        manager: TxnManager,
105        data_path: PathBuf,
106        key_path: PathBuf,
107        file_id: u64,
108        region_keys: Option<RegionWrapKeys>,
109    ) -> Self {
110        Self {
111            manager,
112            data_path,
113            key_path,
114            file_id,
115            sql_caches: Arc::new(Mutex::new(FxHashMap::default())),
116            region_keys,
117            region_store: Mutex::new(None),
118            atom_store: Mutex::new(None),
119        }
120    }
121
122    /// Fetch a typed entry from the shared SQL cache.
123    /// Returns `None` if the key is missing or stored under a different type.
124    pub fn sql_cache_get<T: Any + Send + Sync>(&self, key: &str) -> Option<Arc<T>> {
125        let guard = self.sql_caches.lock();
126        let entry = guard.get(key)?;
127        Arc::clone(entry).downcast::<T>().ok()
128    }
129
130    /// Insert (or overwrite) a typed entry in the shared SQL cache.
131    pub fn sql_cache_insert<T: Any + Send + Sync>(&self, key: String, value: Arc<T>) {
132        self.sql_caches.lock().insert(key, value);
133    }
134
135    /// Remove every entry whose key starts with `prefix`.
136    /// Returns the number of entries removed.
137    pub fn sql_cache_invalidate_prefix(&self, prefix: &str) -> usize {
138        let mut guard = self.sql_caches.lock();
139        let before = guard.len();
140        guard.retain(|k, _| !k.starts_with(prefix));
141        before - guard.len()
142    }
143
    /// Total number of cache entries (test/diagnostics helper).
    ///
    /// Takes the cache lock for the duration of the count.
    pub fn sql_cache_len(&self) -> usize {
        self.sql_caches.lock().len()
    }
148
    /// Cloneable handle to the shared cache.
    ///
    /// The handle is an `Arc` clone of this database's cache, so entries
    /// inserted through either side are visible to both.
    pub fn sql_cache_handle(&self) -> SqlCacheHandle {
        Arc::clone(&self.sql_caches)
    }
153
    /// Begin a read-only transaction with snapshot isolation.
    ///
    /// Delegates to the transaction manager; the transaction borrows `self`.
    pub fn begin_read(&self) -> ReadTxn<'_> {
        self.manager.begin_read()
    }
158
    /// Begin a read-write transaction. Only one can be active at a time.
    ///
    /// # Errors
    /// Returns whatever error the transaction manager reports when the write
    /// transaction cannot be started.
    pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
        self.manager.begin_write()
    }
163
164    /// Get database statistics from the current commit slot.
165    pub fn stats(&self) -> DbStats {
166        let slot = self.manager.current_slot();
167        DbStats {
168            tree_depth: slot.tree_depth,
169            entry_count: slot.tree_entries,
170            total_pages: slot.total_pages,
171            high_water_mark: slot.high_water_mark,
172            merkle_root: slot.merkle_root,
173        }
174    }
175
    /// Path of the database data file this handle was constructed with.
    pub fn data_path(&self) -> &Path {
        &self.data_path
    }
179
    /// Path of the key file this handle was constructed with.
    pub fn key_path(&self) -> &Path {
        &self.key_path
    }
183
    /// Database file identifier from the file header. citadel-mem binds the
    /// region key store to this value so a mismatched sidecar is rejected.
    ///
    /// The same value is passed to both sidecar key stores when they are opened.
    pub fn file_id(&self) -> u64 {
        self.file_id
    }
189
    /// Whether per-region cryptographic erasure keys are available.
    /// `true` only when the database was opened with `enable_region_keys(true)`.
    ///
    /// When this is `false`, the key wrap/unwrap and sidecar-store methods
    /// fail with `Error::RegionKeysDisabled`.
    pub fn region_keys_enabled(&self) -> bool {
        self.region_keys.is_some()
    }
195
196    /// Wrap a region's random content key (RCK) under the region KEK (AES-256-KW).
197    /// The 40-byte result is the sole copy of the RCK; citadel-mem stores it in the
198    /// sidecar key store and overwrites it in place to erase the region.
199    pub fn wrap_region_key(&self, rck: &[u8; KEY_SIZE]) -> Result<[u8; WRAPPED_KEY_SIZE]> {
200        self.region_keys
201            .as_ref()
202            .map(|rk| rk.wrap_region_key(rck))
203            .ok_or(Error::RegionKeysDisabled)
204    }
205
206    /// Unwrap a region content key. Fails if the slot was erased (zeroed wrap).
207    pub fn unwrap_region_key(&self, wrapped: &[u8; WRAPPED_KEY_SIZE]) -> Result<[u8; KEY_SIZE]> {
208        self.region_keys
209            .as_ref()
210            .ok_or(Error::RegionKeysDisabled)?
211            .unwrap_region_key(wrapped)
212    }
213
214    /// HMAC key authenticating the region key store's header and slots
215    /// (torn-write detection only; RCK secrecy is protected by AES-KW).
216    pub fn region_store_mac_key(&self) -> Result<[u8; KEY_SIZE]> {
217        self.region_keys
218            .as_ref()
219            .map(|rk| rk.store_mac_key)
220            .ok_or(Error::RegionKeysDisabled)
221    }
222
    /// Path to the sidecar region key store, `{key_path}` with the
    /// `citadel-regions` extension. Pure path math; valid even when region keys
    /// are disabled (the file only exists once an encrypted region is created).
    ///
    /// Computed by `region_store_path_for` from this database's key path.
    pub fn region_store_path(&self) -> PathBuf {
        region_store_path_for(&self.key_path)
    }
229
230    /// Run `f` against the lazily-opened sidecar store under its lock.
231    fn with_region_store<T>(&self, f: impl FnOnce(&mut RegionKeyStore) -> Result<T>) -> Result<T> {
232        let mut guard = self.region_store.lock();
233        if guard.is_none() {
234            let mac_key = self.region_store_mac_key()?;
235            *guard = Some(RegionKeyStore::create_or_open(
236                &self.region_store_path(),
237                self.file_id,
238                mac_key,
239            )?);
240        }
241        f(guard.as_mut().expect("region store initialized above"))
242    }
243
244    /// Allocate a slot and store the wrapped RCK (fsync'd); returns `(slot, gen)`.
245    pub fn region_store_allocate_write(
246        &self,
247        region_id: u64,
248        wrapped: &[u8; WRAPPED_KEY_SIZE],
249    ) -> Result<(u32, u64)> {
250        self.with_region_store(|s| {
251            let slot = s.allocate_slot()?;
252            let gen = s.write_live(slot, region_id, wrapped)?;
253            Ok((slot, gen))
254        })
255    }
256
    /// The authoritative record of region key `slot`.
    ///
    /// Opens the sidecar store on first use; fails with
    /// `Error::RegionKeysDisabled` if region keys are not enabled.
    pub fn region_store_slot(&self, slot: u32) -> Result<SlotRecord> {
        self.with_region_store(|s| s.read_slot(slot))
    }
261
    /// Cryptographically erase region key `slot` (no-op if already erased).
    ///
    /// `region_id` is forwarded to the store together with the slot.
    pub fn region_store_tombstone(&self, slot: u32, region_id: u64) -> Result<()> {
        self.with_region_store(|s| s.tombstone(slot, region_id))
    }
266
    /// `(slot, region_id)` for every LIVE region key slot.
    ///
    /// Opens the sidecar store on first use.
    pub fn region_store_live_owners(&self) -> Result<Vec<(u32, u64)>> {
        self.with_region_store(|s| s.live_owners())
    }
271
    /// Path to the sidecar per-atom key store, `{key_path}` with the `citadel-atomkeys`
    /// extension. Pure path math; the file only exists once an encrypted atom is written.
    ///
    /// Computed by `atom_store_path_for` from this database's key path.
    pub fn atom_store_path(&self) -> PathBuf {
        atom_store_path_for(&self.key_path)
    }
277
278    /// Run `f` against the lazily-opened atom key store under its lock.
279    fn with_atom_store<T>(&self, f: impl FnOnce(&mut AtomKeyStore) -> Result<T>) -> Result<T> {
280        let mut guard = self.atom_store.lock();
281        if guard.is_none() {
282            let mac_key = self.region_store_mac_key()?;
283            *guard = Some(AtomKeyStore::create_or_open(
284                &self.atom_store_path(),
285                self.file_id,
286                mac_key,
287            )?);
288        }
289        f(guard.as_mut().expect("atom store initialized above"))
290    }
291
292    /// Allocate a slot and store one atom's wrapped ACK (fsync'd); returns `(slot, gen)`.
293    pub fn atom_store_allocate_write(
294        &self,
295        atom_id: u64,
296        wrapped: &[u8; WRAPPED_KEY_SIZE],
297    ) -> Result<(u32, u64)> {
298        self.with_atom_store(|s| {
299            let slot = s.allocate_slot()?;
300            let gen = s.write_live(slot, atom_id, wrapped)?;
301            Ok((slot, gen))
302        })
303    }
304
305    /// Allocate and durably write a batch of `(atom_id, wrapped)` ACKs with ONE fsync;
306    /// returns `(slot, gen)` per item in order.
307    pub fn atom_store_allocate_batch(
308        &self,
309        items: &[(u64, [u8; WRAPPED_KEY_SIZE])],
310    ) -> Result<Vec<(u32, u64)>> {
311        if items.is_empty() {
312            return Ok(Vec::new());
313        }
314        self.with_atom_store(|s| {
315            let slots = s.allocate_batch(items.len())?;
316            let writes: Vec<(u32, u64, [u8; WRAPPED_KEY_SIZE])> = slots
317                .iter()
318                .zip(items)
319                .map(|(&slot, (atom_id, wrapped))| (slot, *atom_id, *wrapped))
320                .collect();
321            let gens = s.write_live_batch(&writes)?;
322            Ok(slots.into_iter().zip(gens).collect())
323        })
324    }
325
    /// The authoritative record of atom key `slot` (its wrapped ACK and state).
    ///
    /// Opens the sidecar atom store on first use.
    pub fn atom_store_slot(&self, slot: u32) -> Result<SlotRecord> {
        self.with_atom_store(|s| s.read_slot(slot))
    }
330
    /// Cryptographically erase atom key `slot` (no-op if already erased).
    ///
    /// `atom_id` is forwarded to the store together with the slot.
    pub fn atom_store_tombstone(&self, slot: u32, atom_id: u64) -> Result<()> {
        self.with_atom_store(|s| s.tombstone(slot, atom_id))
    }
335
    /// Erase a batch of atom key slots with two fsyncs total (not 2N). Items are `(slot, atom_id)`.
    /// Returns the slots actually erased as `(slot, atom_id, old_gen, new_gen)`, confirmed
    /// through the key store's read-back gate, for building a verifiable erasure receipt.
    ///
    /// NOTE(review): unlike `atom_store_allocate_batch`, there is no early return for an
    /// empty `items`, so an empty batch still opens (and may create) the sidecar store —
    /// confirm whether that is intended.
    pub fn atom_store_tombstone_batch(
        &self,
        items: &[(u32, u64)],
    ) -> Result<Vec<(u32, u64, u64, u64)>> {
        self.with_atom_store(|s| s.tombstone_batch(items))
    }
345
    /// Every LIVE atom key's `atom_id -> wrapped ACK`, in one whole-file pass.
    ///
    /// Opens the sidecar atom store on first use.
    pub fn atom_store_live_wrapped(&self) -> Result<FxHashMap<u64, [u8; WRAPPED_KEY_SIZE]>> {
        self.with_atom_store(|s| s.live_wrapped())
    }
350
    /// `(slot, atom_id)` for every LIVE atom key slot.
    ///
    /// Opens the sidecar atom store on first use.
    pub fn atom_store_live_owners(&self) -> Result<Vec<(u32, u64)>> {
        self.with_atom_store(|s| s.live_owners())
    }
355
    /// Number of currently active readers.
    ///
    /// Reported by the transaction manager.
    pub fn reader_count(&self) -> usize {
        self.manager.reader_count()
    }
360
    /// Change the database passphrase (re-wraps REK, no page re-encryption).
    ///
    /// Reads the key file, proves `old_passphrase` by verifying the file MAC and
    /// unwrapping the REK, then writes a new key file whose salt, wrapped REK and
    /// MAC are derived from `new_passphrase`. The KDF algorithm and cost
    /// parameters are carried over unchanged.
    ///
    /// # Errors
    /// - `Error::Io` (`InvalidData`) if the key file is not `KEY_FILE_SIZE` bytes.
    /// - `Error::BadPassphrase` if the REK cannot be unwrapped with the old key.
    /// - Any error from reading/deserializing the key file, key derivation, MAC
    ///   verification, or the final atomic write.
    pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
        use citadel_crypto::kdf::{derive_mk, generate_salt};
        use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};

        // Load the key file and insist on the exact on-disk size.
        let key_data = fs::read(&self.key_path)?;
        if key_data.len() != KEY_FILE_SIZE {
            return Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "key file has incorrect size",
            )));
        }
        // Cannot fail: the length was checked just above.
        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
        let kf = KeyFile::deserialize(&key_buf)?;

        // Derive the old master key with the salt and costs stored in the file.
        let old_mk = derive_mk(
            kf.kdf_algorithm,
            old_passphrase,
            &kf.argon2_salt,
            kf.argon2_m_cost,
            kf.argon2_t_cost,
            kf.argon2_p_cost,
        )?;
        // Verify the file MAC under that key before using the file's contents.
        kf.verify_mac(&old_mk)?;

        // Any unwrap failure is reported as a bad passphrase.
        let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;

        // Fresh salt for the new passphrase; KDF costs stay as they were.
        let new_salt = generate_salt();
        let new_mk = derive_mk(
            kf.kdf_algorithm,
            new_passphrase,
            &new_salt,
            kf.argon2_m_cost,
            kf.argon2_t_cost,
            kf.argon2_p_cost,
        )?;

        let new_wrapped = wrap_rek(&new_mk, &rek);

        // Start from the old key file so every other field is preserved, then
        // recompute the MAC under the new master key.
        let mut new_kf = kf.clone();
        new_kf.argon2_salt = new_salt;
        new_kf.wrapped_rek = new_wrapped;
        new_kf.update_mac(&new_mk);

        // Replace the key file via the durable atomic-write helper.
        durable::atomic_write(&self.key_path, &new_kf.serialize())?;

        // Logged only after the new key file has been written successfully.
        #[cfg(feature = "audit-log")]
        self.log_audit(AuditEventType::PassphraseChanged, &[]);

        Ok(())
    }
412
413    pub fn integrity_check(&self) -> Result<IntegrityReport> {
414        let report = self.manager.integrity_check()?;
415
416        #[cfg(feature = "audit-log")]
417        {
418            let error_count = report.errors.len() as u32;
419            self.log_audit(
420                AuditEventType::IntegrityCheckPerformed,
421                &error_count.to_le_bytes(),
422            );
423        }
424
425        Ok(report)
426    }
427
428    /// Create a hot backup via MVCC snapshot. Also copies the key file.
429    #[cfg(not(target_arch = "wasm32"))]
430    pub fn backup(&self, dest_path: &Path) -> Result<()> {
431        let dest_file = OpenOptions::new()
432            .read(true)
433            .write(true)
434            .create_new(true)
435            .open(dest_path)?;
436        let dest_io = MmapPageIO::try_new(dest_file)?;
437        self.manager.backup_to(&dest_io)?;
438
439        let dest_key_path = resolve_key_path_for(dest_path);
440        fs::copy(&self.key_path, &dest_key_path)?;
441        self.copy_region_store_to(&dest_key_path)?;
442
443        #[cfg(feature = "audit-log")]
444        self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
445
446        Ok(())
447    }
448
    /// Export an encrypted key backup for disaster recovery.
    ///
    /// Requires the current database passphrase. The backup can later restore
    /// access via `restore_key_from_backup` if the database passphrase is lost.
    ///
    /// The backup is built from the unwrapped REK plus the key file's `file_id`,
    /// cipher id, KDF algorithm/costs and current epoch, protected by
    /// `backup_passphrase`, and written to `dest_path`.
    ///
    /// # Errors
    /// - `Error::Io` (`InvalidData`) if the key file is not `KEY_FILE_SIZE` bytes.
    /// - `Error::BadPassphrase` if the REK cannot be unwrapped with `db_passphrase`.
    /// - Any error from reading/deserializing the key file, key derivation, MAC
    ///   verification, backup creation, or writing `dest_path`.
    pub fn export_key_backup(
        &self,
        db_passphrase: &[u8],
        backup_passphrase: &[u8],
        dest_path: &Path,
    ) -> Result<()> {
        use citadel_crypto::kdf::derive_mk;
        use citadel_crypto::key_backup::create_key_backup;
        use citadel_crypto::key_manager::{unwrap_rek, KeyFile};

        // Load the key file and insist on the exact on-disk size.
        let key_data = fs::read(&self.key_path)?;
        if key_data.len() != KEY_FILE_SIZE {
            return Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "key file has incorrect size",
            )));
        }
        // Cannot fail: the length was checked just above.
        let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
        let kf = KeyFile::deserialize(&key_buf)?;

        // Derive the master key with the salt and costs stored in the file.
        let mk = derive_mk(
            kf.kdf_algorithm,
            db_passphrase,
            &kf.argon2_salt,
            kf.argon2_m_cost,
            kf.argon2_t_cost,
            kf.argon2_p_cost,
        )?;
        // Verify the file MAC under that key before using the file's contents.
        kf.verify_mac(&mk)?;

        // Any unwrap failure is reported as a bad passphrase.
        let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;

        let backup_data = create_key_backup(
            &rek,
            backup_passphrase,
            kf.file_id,
            kf.cipher_id,
            kf.kdf_algorithm,
            kf.argon2_m_cost,
            kf.argon2_t_cost,
            kf.argon2_p_cost,
            kf.current_epoch,
        )?;

        durable::write_and_sync(dest_path, &backup_data)?;

        // Logged only after the backup file has been written successfully.
        #[cfg(feature = "audit-log")]
        self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);

        Ok(())
    }
504
    /// Restore a key file from an encrypted backup (static - no `Database` needed).
    ///
    /// Unwraps the REK using `backup_passphrase`, then creates a new key file
    /// protected by `new_db_passphrase`.
    ///
    /// The new key file is written to `{db_path}.citadel-keys`. Its `file_id`,
    /// cipher id, KDF algorithm/costs and epoch come from the backup; the salt is
    /// freshly generated, and the previous-key/rotation fields are reset.
    ///
    /// # Errors
    /// - `Error::Io` (`InvalidData`) if the backup is not `KEY_BACKUP_SIZE` bytes.
    /// - Any error from reading the backup, recovering the REK, key derivation,
    ///   or the final atomic write.
    pub fn restore_key_from_backup(
        backup_path: &Path,
        backup_passphrase: &[u8],
        new_db_passphrase: &[u8],
        db_path: &Path,
    ) -> Result<()> {
        use citadel_core::{
            KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
        };
        use citadel_crypto::kdf::{derive_mk, generate_salt};
        use citadel_crypto::key_backup::restore_rek_from_backup;
        use citadel_crypto::key_manager::wrap_rek;
        use citadel_crypto::key_manager::KeyFile;

        // Load the backup and insist on the exact on-disk size.
        let backup_data = fs::read(backup_path)?;
        if backup_data.len() != KEY_BACKUP_SIZE {
            return Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "backup file has incorrect size",
            )));
        }
        // Cannot fail: the length was checked just above.
        let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();

        let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;

        // Fresh salt; KDF algorithm and parameters are taken from the backup.
        let new_salt = generate_salt();
        let new_mk = derive_mk(
            restored.kdf_algorithm,
            new_db_passphrase,
            &new_salt,
            restored.kdf_param1,
            restored.kdf_param2,
            restored.kdf_param3,
        )?;

        let new_wrapped = wrap_rek(&new_mk, &restored.rek);

        // Build the key file from scratch: no previous key, no rotation in
        // progress, and a zero MAC placeholder that `update_mac` fills in below.
        let mut new_kf = KeyFile {
            magic: KEY_FILE_MAGIC,
            version: KEY_FILE_VERSION,
            file_id: restored.file_id,
            argon2_salt: new_salt,
            argon2_m_cost: restored.kdf_param1,
            argon2_t_cost: restored.kdf_param2,
            argon2_p_cost: restored.kdf_param3,
            cipher_id: restored.cipher_id,
            kdf_algorithm: restored.kdf_algorithm,
            wrapped_rek: new_wrapped,
            current_epoch: restored.epoch,
            prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
            prev_epoch: 0,
            rotation_active: false,
            file_mac: [0u8; MAC_SIZE],
        };
        new_kf.update_mac(&new_mk);

        // The key file lives next to the data file.
        let key_path = resolve_key_path_for(db_path);
        durable::atomic_write(&key_path, &new_kf.serialize())?;

        Ok(())
    }
570
571    /// Compact the database into a new file. Also copies the key file.
572    #[cfg(not(target_arch = "wasm32"))]
573    pub fn compact(&self, dest_path: &Path) -> Result<()> {
574        let dest_file = OpenOptions::new()
575            .read(true)
576            .write(true)
577            .create_new(true)
578            .open(dest_path)?;
579        let dest_io = MmapPageIO::try_new(dest_file)?;
580        self.manager.compact_to(&dest_io)?;
581
582        let dest_key_path = resolve_key_path_for(dest_path);
583        fs::copy(&self.key_path, &dest_key_path)?;
584        self.copy_region_store_to(&dest_key_path)?;
585
586        #[cfg(feature = "audit-log")]
587        self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
588
589        Ok(())
590    }
591
592    /// Copy the sidecar region key store next to `dest_key_path`, if it exists.
593    ///
594    /// A backup/compaction must carry the wrapped region keys so encrypted
595    /// regions remain openable from the copy. Note: a backup taken while a
596    /// region is live retains a recoverable key; `forget` cannot reach it, so
597    /// backup retention is the operator's responsibility (see `region_store_path`).
598    #[cfg(not(target_arch = "wasm32"))]
599    fn copy_region_store_to(&self, dest_key_path: &Path) -> Result<()> {
600        let src = self.region_store_path();
601        if src.exists() {
602            let dest = region_store_path_for(dest_key_path);
603            fs::copy(&src, &dest)?;
604        }
605        let atom_src = self.atom_store_path();
606        if atom_src.exists() {
607            fs::copy(&atom_src, atom_store_path_for(dest_key_path))?;
608        }
609        Ok(())
610    }
611}
612
613impl Database {
    /// Direct access to the underlying transaction manager.
    ///
    /// Hidden from the rendered docs; presumably intended for sibling crates
    /// rather than end users — confirm before relying on it.
    #[doc(hidden)]
    pub fn manager(&self) -> &TxnManager {
        &self.manager
    }
618
619    /// Path to the audit log file, if audit logging is enabled.
620    #[cfg(feature = "audit-log")]
621    pub fn audit_log_path(&self) -> Option<PathBuf> {
622        if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
623            Some(crate::audit::resolve_audit_path(&self.data_path))
624        } else {
625            None
626        }
627    }
628
629    /// Verify the audit log's HMAC chain integrity.
630    #[cfg(feature = "audit-log")]
631    pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
632        let audit = self
633            .audit_log
634            .as_ref()
635            .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
636        let guard = audit.lock();
637        let path = crate::audit::resolve_audit_path(&self.data_path);
638        crate::audit::verify_audit_log(&path, guard.audit_key())
639    }
640
641    #[cfg(feature = "audit-log")]
642    pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
643        if let Some(ref mutex) = self.audit_log {
644            let _ = mutex.lock().log(event_type, detail);
645        }
646    }
647
648    #[cfg(feature = "audit-log")]
649    fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
650        let path_str = path.to_string_lossy();
651        let path_bytes = path_str.as_bytes();
652        let len = (path_bytes.len() as u16).to_le_bytes();
653        let mut detail = Vec::with_capacity(2 + path_bytes.len());
654        detail.extend_from_slice(&len);
655        detail.extend_from_slice(path_bytes);
656        self.log_audit(event_type, &detail);
657    }
658}
659
660use citadel_sync::transport::SyncTransport;
661
/// Outcome of a sync operation.
///
/// Returned by `Database::sync_to` and `Database::handle_sync`.
#[derive(Debug, Clone)]
pub struct SyncOutcome {
    /// Per-table results: `(table_name, entries_applied)`.
    pub tables_synced: Vec<(Vec<u8>, u64)>,
    /// Default tree sync result (if performed).
    /// Both sync entry points in this file currently leave it `None`.
    pub default_tree: Option<citadel_sync::SyncOutcome>,
}
670
/// Key under which `Database::node_id` stores this database's node id bytes.
const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
672
673impl Database {
674    /// Get or create a persistent NodeId for this database.
675    pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
676        let mut rtx = self.manager.begin_read();
677        if let Some(data) = rtx.get(NODE_ID_KEY)? {
678            if data.len() == 8 {
679                return Ok(citadel_sync::NodeId::from_bytes(
680                    data[..8].try_into().unwrap(),
681                ));
682            }
683        }
684        drop(rtx);
685
686        let node_id = citadel_sync::NodeId::random();
687        let mut wtx = self.manager.begin_write()?;
688        wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
689        wtx.commit()?;
690        Ok(node_id)
691    }
692
693    /// Push local named tables to a remote peer.
694    pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
695        let node_id = self.node_id()?;
696        let transport =
697            citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
698        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
699            node_id,
700            direction: citadel_sync::SyncDirection::Push,
701            crdt_aware: false,
702        });
703
704        let results = session
705            .sync_tables_as_initiator(&self.manager, &transport)
706            .map_err(sync_err_to_core)?;
707
708        transport.close().map_err(sync_err_to_core)?;
709
710        Ok(SyncOutcome {
711            tables_synced: results
712                .into_iter()
713                .map(|(name, r)| (name, r.entries_applied))
714                .collect(),
715            default_tree: None,
716        })
717    }
718
719    /// Handle an incoming sync session from a remote peer.
720    pub fn handle_sync(
721        &self,
722        stream: std::net::TcpStream,
723        sync_key: &citadel_sync::SyncKey,
724    ) -> Result<SyncOutcome> {
725        let node_id = self.node_id()?;
726        let transport =
727            citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
728        let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
729            node_id,
730            direction: citadel_sync::SyncDirection::Push,
731            crdt_aware: false,
732        });
733
734        let results = session
735            .handle_table_sync_as_responder(&self.manager, &transport)
736            .map_err(sync_err_to_core)?;
737
738        transport.close().map_err(sync_err_to_core)?;
739
740        Ok(SyncOutcome {
741            tables_synced: results
742                .into_iter()
743                .map(|(name, r)| (name, r.entries_applied))
744                .collect(),
745            default_tree: None,
746        })
747    }
748}
749
750fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
751    match e {
752        citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
753        other => Error::Sync(other.to_string()),
754    }
755}
756
/// With the `audit-log` feature, dropping a `Database` records a
/// `DatabaseClosed` audit event (nothing is logged when no audit log is attached).
#[cfg(feature = "audit-log")]
impl Drop for Database {
    fn drop(&mut self) {
        // Empty detail payload; logging is best-effort.
        self.log_audit(AuditEventType::DatabaseClosed, &[]);
    }
}
763
/// Key file path for a data file: the data path with `.citadel-keys` appended
/// to its full name (nothing is replaced), i.e. `{data_path}.citadel-keys`.
fn resolve_key_path_for(data_path: &Path) -> PathBuf {
    let mut key_name = data_path.as_os_str().to_owned();
    key_name.push(".citadel-keys");
    key_name.into()
}
770
/// Sidecar region key store path: `key_path` with its extension replaced by
/// `citadel-regions`, e.g. `mydb.citadel.citadel-keys` -> `mydb.citadel.citadel-regions`.
/// A key path without an extension simply gains one.
fn region_store_path_for(key_path: &Path) -> PathBuf {
    let mut store_path = key_path.to_path_buf();
    store_path.set_extension("citadel-regions");
    store_path
}
776
/// Sidecar atom key store path: `key_path` with its extension replaced by
/// `citadel-atomkeys`. A key path without an extension simply gains one.
fn atom_store_path_for(key_path: &Path) -> PathBuf {
    let mut store_path = key_path.to_path_buf();
    store_path.set_extension("citadel-atomkeys");
    store_path
}
781
/// Tests for the typed accessors over the shared SQL cache.
#[cfg(test)]
mod sql_cache_tests {
    use super::*;
    use crate::builder::DatabaseBuilder;
    use citadel_core::types::Argon2Profile;

    /// Simple payload type stored in the cache by these tests.
    #[derive(Debug, PartialEq)]
    struct Marker(u32);

    /// Create a fresh database named `test.db` inside `dir`.
    fn open_db(dir: &Path) -> Database {
        DatabaseBuilder::new(dir.join("test.db"))
            .passphrase(b"x")
            .argon2_profile(Argon2Profile::Iot)
            .create()
            .unwrap()
    }

    /// Insert `Marker(n)` under `key`.
    fn put(db: &Database, key: &str, n: u32) {
        db.sql_cache_insert(key.to_string(), Arc::new(Marker(n)));
    }

    /// Fetch the `Marker` stored under `key`, if any.
    fn peek(db: &Database, key: &str) -> Option<Arc<Marker>> {
        db.sql_cache_get::<Marker>(key)
    }

    #[test]
    fn insert_then_get_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 42);
        let fetched = peek(&db, "k").unwrap();
        assert_eq!(*fetched, Marker(42));
    }

    #[test]
    fn get_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        assert!(peek(&db, "missing").is_none());
    }

    #[test]
    fn get_wrong_type_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 1);
        // Same key, different requested type: the downcast must fail.
        assert!(db.sql_cache_get::<String>("k").is_none());
    }

    #[test]
    fn insert_overwrites_existing_entry() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "k", 1);
        put(&db, "k", 2);
        assert_eq!(*peek(&db, "k").unwrap(), Marker(2));
    }

    #[test]
    fn invalidate_prefix_removes_matching_keys() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        let seeded = [
            ("ann:t1:ix_v", 1),
            ("ann:t1:ix_w", 2),
            ("ann:t2:ix_v", 3),
            ("other:x", 4),
        ];
        for (key, n) in seeded {
            put(&db, key, n);
        }

        let removed = db.sql_cache_invalidate_prefix("ann:t1:");
        assert_eq!(removed, 2);
        for gone in ["ann:t1:ix_v", "ann:t1:ix_w"] {
            assert!(peek(&db, gone).is_none());
        }
        for kept in ["ann:t2:ix_v", "other:x"] {
            assert!(peek(&db, kept).is_some());
        }
    }

    #[test]
    fn invalidate_prefix_no_match_returns_zero() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        put(&db, "a:1", 1);
        assert_eq!(db.sql_cache_invalidate_prefix("z:"), 0);
        assert_eq!(db.sql_cache_len(), 1);
    }

    #[test]
    fn shared_arc_observed_by_two_borrows() {
        let tmp = tempfile::tempdir().unwrap();
        let db = open_db(tmp.path());
        let original = Arc::new(Marker(7));
        db.sql_cache_insert("k".to_string(), Arc::clone(&original));
        let first = peek(&db, "k").unwrap();
        let second = peek(&db, "k").unwrap();
        assert!(Arc::ptr_eq(&first, &second));
        assert!(Arc::ptr_eq(&first, &original));
    }
}