1use std::fs;
2use std::fs::OpenOptions;
3use std::path::{Path, PathBuf};
4
5use citadel_core::{Error, Result, KEY_FILE_SIZE, MERKLE_HASH_SIZE};
6use citadel_io::durable;
7use citadel_io::sync_io::SyncPageIO;
8use citadel_txn::integrity::IntegrityReport;
9use citadel_txn::manager::TxnManager;
10use citadel_txn::read_txn::ReadTxn;
11use citadel_txn::write_txn::WriteTxn;
12
13#[cfg(feature = "audit-log")]
14use crate::audit::{AuditEventType, AuditLog};
15
16#[derive(Debug, Clone)]
18pub struct DbStats {
19 pub tree_depth: u16,
20 pub entry_count: u64,
21 pub total_pages: u32,
22 pub high_water_mark: u32,
23 pub merkle_root: [u8; MERKLE_HASH_SIZE],
24}
25
26pub struct Database {
30 manager: TxnManager,
31 data_path: PathBuf,
32 key_path: PathBuf,
33 #[cfg(feature = "audit-log")]
34 audit_log: Option<parking_lot::Mutex<AuditLog>>,
35}
36
37impl std::fmt::Debug for Database {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("Database")
40 .field("data_path", &self.data_path)
41 .field("key_path", &self.key_path)
42 .finish()
43 }
44}
45
46unsafe impl Send for Database {}
48unsafe impl Sync for Database {}
49
50impl Database {
51 #[cfg(feature = "audit-log")]
52 pub(crate) fn new(
53 manager: TxnManager,
54 data_path: PathBuf,
55 key_path: PathBuf,
56 audit_log: Option<AuditLog>,
57 ) -> Self {
58 Self {
59 manager,
60 data_path,
61 key_path,
62 audit_log: audit_log.map(parking_lot::Mutex::new),
63 }
64 }
65
66 #[cfg(not(feature = "audit-log"))]
67 pub(crate) fn new(manager: TxnManager, data_path: PathBuf, key_path: PathBuf) -> Self {
68 Self {
69 manager,
70 data_path,
71 key_path,
72 }
73 }
74
75 pub fn begin_read(&self) -> ReadTxn<'_> {
77 self.manager.begin_read()
78 }
79
80 pub fn begin_write(&self) -> Result<WriteTxn<'_>> {
82 self.manager.begin_write()
83 }
84
85 pub fn stats(&self) -> DbStats {
87 let slot = self.manager.current_slot();
88 DbStats {
89 tree_depth: slot.tree_depth,
90 entry_count: slot.tree_entries,
91 total_pages: slot.total_pages,
92 high_water_mark: slot.high_water_mark,
93 merkle_root: slot.merkle_root,
94 }
95 }
96
97 pub fn data_path(&self) -> &Path {
99 &self.data_path
100 }
101
102 pub fn key_path(&self) -> &Path {
104 &self.key_path
105 }
106
107 pub fn reader_count(&self) -> usize {
109 self.manager.reader_count()
110 }
111
112 pub fn change_passphrase(&self, old_passphrase: &[u8], new_passphrase: &[u8]) -> Result<()> {
114 use citadel_crypto::kdf::{derive_mk, generate_salt};
115 use citadel_crypto::key_manager::{unwrap_rek, wrap_rek, KeyFile};
116
117 let key_data = fs::read(&self.key_path)?;
118 if key_data.len() != KEY_FILE_SIZE {
119 return Err(Error::Io(std::io::Error::new(
120 std::io::ErrorKind::InvalidData,
121 "key file has incorrect size",
122 )));
123 }
124 let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
125 let kf = KeyFile::deserialize(&key_buf)?;
126
127 let old_mk = derive_mk(
128 kf.kdf_algorithm,
129 old_passphrase,
130 &kf.argon2_salt,
131 kf.argon2_m_cost,
132 kf.argon2_t_cost,
133 kf.argon2_p_cost,
134 )?;
135 kf.verify_mac(&old_mk)?;
136
137 let rek = unwrap_rek(&old_mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
138
139 let new_salt = generate_salt();
140 let new_mk = derive_mk(
141 kf.kdf_algorithm,
142 new_passphrase,
143 &new_salt,
144 kf.argon2_m_cost,
145 kf.argon2_t_cost,
146 kf.argon2_p_cost,
147 )?;
148
149 let new_wrapped = wrap_rek(&new_mk, &rek);
150
151 let mut new_kf = kf.clone();
152 new_kf.argon2_salt = new_salt;
153 new_kf.wrapped_rek = new_wrapped;
154 new_kf.update_mac(&new_mk);
155
156 durable::atomic_write(&self.key_path, &new_kf.serialize())?;
157
158 #[cfg(feature = "audit-log")]
159 self.log_audit(AuditEventType::PassphraseChanged, &[]);
160
161 Ok(())
162 }
163
164 pub fn integrity_check(&self) -> Result<IntegrityReport> {
166 let report = self.manager.integrity_check()?;
167
168 #[cfg(feature = "audit-log")]
169 {
170 let error_count = report.errors.len() as u32;
171 self.log_audit(
172 AuditEventType::IntegrityCheckPerformed,
173 &error_count.to_le_bytes(),
174 );
175 }
176
177 Ok(report)
178 }
179
180 pub fn backup(&self, dest_path: &Path) -> Result<()> {
182 let dest_file = OpenOptions::new()
183 .read(true)
184 .write(true)
185 .create_new(true)
186 .open(dest_path)?;
187 let dest_io = SyncPageIO::new(dest_file);
188 self.manager.backup_to(&dest_io)?;
189
190 let dest_key_path = resolve_key_path_for(dest_path);
191 fs::copy(&self.key_path, &dest_key_path)?;
192
193 #[cfg(feature = "audit-log")]
194 self.log_audit_with_path(AuditEventType::BackupCreated, dest_path);
195
196 Ok(())
197 }
198
199 pub fn export_key_backup(
204 &self,
205 db_passphrase: &[u8],
206 backup_passphrase: &[u8],
207 dest_path: &Path,
208 ) -> Result<()> {
209 use citadel_crypto::kdf::derive_mk;
210 use citadel_crypto::key_backup::create_key_backup;
211 use citadel_crypto::key_manager::{unwrap_rek, KeyFile};
212
213 let key_data = fs::read(&self.key_path)?;
214 if key_data.len() != KEY_FILE_SIZE {
215 return Err(Error::Io(std::io::Error::new(
216 std::io::ErrorKind::InvalidData,
217 "key file has incorrect size",
218 )));
219 }
220 let key_buf: [u8; KEY_FILE_SIZE] = key_data.try_into().unwrap();
221 let kf = KeyFile::deserialize(&key_buf)?;
222
223 let mk = derive_mk(
224 kf.kdf_algorithm,
225 db_passphrase,
226 &kf.argon2_salt,
227 kf.argon2_m_cost,
228 kf.argon2_t_cost,
229 kf.argon2_p_cost,
230 )?;
231 kf.verify_mac(&mk)?;
232
233 let rek = unwrap_rek(&mk, &kf.wrapped_rek).map_err(|_| Error::BadPassphrase)?;
234
235 let backup_data = create_key_backup(
236 &rek,
237 backup_passphrase,
238 kf.file_id,
239 kf.cipher_id,
240 kf.kdf_algorithm,
241 kf.argon2_m_cost,
242 kf.argon2_t_cost,
243 kf.argon2_p_cost,
244 kf.current_epoch,
245 )?;
246
247 durable::write_and_sync(dest_path, &backup_data)?;
248
249 #[cfg(feature = "audit-log")]
250 self.log_audit_with_path(AuditEventType::KeyBackupExported, dest_path);
251
252 Ok(())
253 }
254
255 pub fn restore_key_from_backup(
260 backup_path: &Path,
261 backup_passphrase: &[u8],
262 new_db_passphrase: &[u8],
263 db_path: &Path,
264 ) -> Result<()> {
265 use citadel_core::{
266 KEY_BACKUP_SIZE, KEY_FILE_MAGIC, KEY_FILE_VERSION, MAC_SIZE, WRAPPED_KEY_SIZE,
267 };
268 use citadel_crypto::kdf::{derive_mk, generate_salt};
269 use citadel_crypto::key_backup::restore_rek_from_backup;
270 use citadel_crypto::key_manager::wrap_rek;
271 use citadel_crypto::key_manager::KeyFile;
272
273 let backup_data = fs::read(backup_path)?;
274 if backup_data.len() != KEY_BACKUP_SIZE {
275 return Err(Error::Io(std::io::Error::new(
276 std::io::ErrorKind::InvalidData,
277 "backup file has incorrect size",
278 )));
279 }
280 let backup_buf: [u8; KEY_BACKUP_SIZE] = backup_data.try_into().unwrap();
281
282 let restored = restore_rek_from_backup(&backup_buf, backup_passphrase)?;
283
284 let new_salt = generate_salt();
285 let new_mk = derive_mk(
286 restored.kdf_algorithm,
287 new_db_passphrase,
288 &new_salt,
289 restored.kdf_param1,
290 restored.kdf_param2,
291 restored.kdf_param3,
292 )?;
293
294 let new_wrapped = wrap_rek(&new_mk, &restored.rek);
295
296 let mut new_kf = KeyFile {
297 magic: KEY_FILE_MAGIC,
298 version: KEY_FILE_VERSION,
299 file_id: restored.file_id,
300 argon2_salt: new_salt,
301 argon2_m_cost: restored.kdf_param1,
302 argon2_t_cost: restored.kdf_param2,
303 argon2_p_cost: restored.kdf_param3,
304 cipher_id: restored.cipher_id,
305 kdf_algorithm: restored.kdf_algorithm,
306 wrapped_rek: new_wrapped,
307 current_epoch: restored.epoch,
308 prev_wrapped_rek: [0u8; WRAPPED_KEY_SIZE],
309 prev_epoch: 0,
310 rotation_active: false,
311 file_mac: [0u8; MAC_SIZE],
312 };
313 new_kf.update_mac(&new_mk);
314
315 let key_path = resolve_key_path_for(db_path);
316 durable::atomic_write(&key_path, &new_kf.serialize())?;
317
318 Ok(())
319 }
320
321 pub fn compact(&self, dest_path: &Path) -> Result<()> {
323 let dest_file = OpenOptions::new()
324 .read(true)
325 .write(true)
326 .create_new(true)
327 .open(dest_path)?;
328 let dest_io = SyncPageIO::new(dest_file);
329 self.manager.compact_to(&dest_io)?;
330
331 let dest_key_path = resolve_key_path_for(dest_path);
332 fs::copy(&self.key_path, &dest_key_path)?;
333
334 #[cfg(feature = "audit-log")]
335 self.log_audit_with_path(AuditEventType::CompactionPerformed, dest_path);
336
337 Ok(())
338 }
339}
340
341impl Database {
342 #[doc(hidden)]
343 pub fn manager(&self) -> &TxnManager {
344 &self.manager
345 }
346
347 #[cfg(feature = "audit-log")]
349 pub fn audit_log_path(&self) -> Option<PathBuf> {
350 if self.audit_log.is_some() && !self.data_path.as_os_str().is_empty() {
351 Some(crate::audit::resolve_audit_path(&self.data_path))
352 } else {
353 None
354 }
355 }
356
357 #[cfg(feature = "audit-log")]
359 pub fn verify_audit_log(&self) -> Result<crate::audit::AuditVerifyResult> {
360 let audit = self
361 .audit_log
362 .as_ref()
363 .ok_or_else(|| Error::Io(std::io::Error::other("audit logging is not enabled")))?;
364 let guard = audit.lock();
365 let path = crate::audit::resolve_audit_path(&self.data_path);
366 crate::audit::verify_audit_log(&path, guard.audit_key())
367 }
368
369 #[cfg(feature = "audit-log")]
370 pub(crate) fn log_audit(&self, event_type: AuditEventType, detail: &[u8]) {
371 if let Some(ref mutex) = self.audit_log {
372 let _ = mutex.lock().log(event_type, detail);
373 }
374 }
375
376 #[cfg(feature = "audit-log")]
377 fn log_audit_with_path(&self, event_type: AuditEventType, path: &Path) {
378 let path_str = path.to_string_lossy();
379 let path_bytes = path_str.as_bytes();
380 let len = (path_bytes.len() as u16).to_le_bytes();
381 let mut detail = Vec::with_capacity(2 + path_bytes.len());
382 detail.extend_from_slice(&len);
383 detail.extend_from_slice(path_bytes);
384 self.log_audit(event_type, &detail);
385 }
386}
387
388use citadel_sync::transport::SyncTransport;
391
392#[derive(Debug, Clone)]
394pub struct SyncOutcome {
395 pub tables_synced: Vec<(Vec<u8>, u64)>,
397 pub default_tree: Option<citadel_sync::SyncOutcome>,
399}
400
401const NODE_ID_KEY: &[u8] = b"__citadel_node_id";
402
403impl Database {
404 pub fn node_id(&self) -> Result<citadel_sync::NodeId> {
406 let mut rtx = self.manager.begin_read();
407 if let Some(data) = rtx.get(NODE_ID_KEY)? {
408 if data.len() == 8 {
409 return Ok(citadel_sync::NodeId::from_bytes(
410 data[..8].try_into().unwrap(),
411 ));
412 }
413 }
414 drop(rtx);
415
416 let node_id = citadel_sync::NodeId::random();
417 let mut wtx = self.manager.begin_write()?;
418 wtx.insert(NODE_ID_KEY, &node_id.to_bytes())?;
419 wtx.commit()?;
420 Ok(node_id)
421 }
422
423 pub fn sync_to(&self, addr: &str, sync_key: &citadel_sync::SyncKey) -> Result<SyncOutcome> {
425 let node_id = self.node_id()?;
426 let transport =
427 citadel_sync::NoiseTransport::connect(addr, sync_key).map_err(sync_err_to_core)?;
428 let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
429 node_id,
430 direction: citadel_sync::SyncDirection::Push,
431 crdt_aware: false,
432 });
433
434 let results = session
435 .sync_tables_as_initiator(&self.manager, &transport)
436 .map_err(sync_err_to_core)?;
437
438 transport.close().map_err(sync_err_to_core)?;
439
440 Ok(SyncOutcome {
441 tables_synced: results
442 .into_iter()
443 .map(|(name, r)| (name, r.entries_applied))
444 .collect(),
445 default_tree: None,
446 })
447 }
448
449 pub fn handle_sync(
451 &self,
452 stream: std::net::TcpStream,
453 sync_key: &citadel_sync::SyncKey,
454 ) -> Result<SyncOutcome> {
455 let node_id = self.node_id()?;
456 let transport =
457 citadel_sync::NoiseTransport::accept(stream, sync_key).map_err(sync_err_to_core)?;
458 let session = citadel_sync::SyncSession::new(citadel_sync::SyncConfig {
459 node_id,
460 direction: citadel_sync::SyncDirection::Push,
461 crdt_aware: false,
462 });
463
464 let results = session
465 .handle_table_sync_as_responder(&self.manager, &transport)
466 .map_err(sync_err_to_core)?;
467
468 transport.close().map_err(sync_err_to_core)?;
469
470 Ok(SyncOutcome {
471 tables_synced: results
472 .into_iter()
473 .map(|(name, r)| (name, r.entries_applied))
474 .collect(),
475 default_tree: None,
476 })
477 }
478}
479
480fn sync_err_to_core(e: citadel_sync::transport::SyncError) -> Error {
481 match e {
482 citadel_sync::transport::SyncError::Io(io) => Error::Io(io),
483 other => Error::Sync(other.to_string()),
484 }
485}
486
487#[cfg(feature = "audit-log")]
488impl Drop for Database {
489 fn drop(&mut self) {
490 self.log_audit(AuditEventType::DatabaseClosed, &[]);
491 }
492}
493
494fn resolve_key_path_for(data_path: &Path) -> PathBuf {
496 let mut name = data_path.as_os_str().to_os_string();
497 name.push(".citadel-keys");
498 PathBuf::from(name)
499}