1use crate::access::{AccessQuery, AccessRecord, RESERVED_TYPE_PREFIX};
2use crate::checkpoint::{Checkpoint, CheckpointLog};
3use crate::crypto::{self, KeyRing, KeyVersion, KEYLESS};
4use crate::error::{Error, Result};
5use crate::id::Uid;
6use crate::merkle::Hash;
7use crate::merkle_log::{ConsistencyProof, InclusionProof};
8use crate::model::{AuditLog, Content, StoredValue, TargetRelation, Value};
9use crate::query::{LogQuery, LogView, Order, QueryPage, TargetFilter, TargetSnapshot};
10use crate::registry::{EntityInput, FieldTokens, RegistryIndex, RegistryRecord, TypeRegistry};
11use crate::retention::RetentionPolicy;
12use crate::schema::{CustomColumnDef, FieldIndex, TypeSchema};
13use crate::storage::{rewrite_table, Position, PositionedScan, SegmentSlice, Table};
14use serde::{Deserialize, Serialize};
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::path::PathBuf;
17use std::sync::Arc;
18
/// Shared, thread-safe callback that is handed a reference to a [`Checkpoint`].
///
/// Stored in `StoreConfig::anchor`; the store calls it right after a freshly signed
/// checkpoint has been appended to the checkpoint log.
pub type AnchorHook = Arc<dyn Fn(&Checkpoint) + Send + Sync>;
22
/// How eagerly the append path syncs the registries, logs and relations tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncPolicy {
    /// Sync after every append.
    Always,
    /// Sync once the number of appends since the last sync reaches the given count;
    /// appends in between only flush the logs and relations tables.
    EveryN(u32),
    /// Never sync from the append path; only flush the logs and relations tables.
    OsManaged,
}
33
/// Configuration consumed by `AuditStore::open`.
#[derive(Clone)]
pub struct StoreConfig {
    /// Directory that holds the lock file, all tables and the registries.
    pub root: PathBuf,
    /// Segment size limit handed to every table and registry when it is opened.
    pub max_segment_bytes: u64,
    /// Sync behaviour applied after each append.
    pub sync_policy: SyncPolicy,
    /// Retention policy applied to the logs and relations tables.
    pub retention: RetentionPolicy,
    /// Key ring used for encryption, signing and signature verification.
    pub keys: KeyRing,
    /// Flag forwarded to every `TypeRegistry::open` call.
    pub plaintext_cache: bool,
    /// Optional hook called with each checkpoint after it has been written.
    pub anchor: Option<AnchorHook>,
    /// When `true`, the store opens the `access` table and records queries in it.
    pub access_log: bool,
    /// Retention policy whose time cutoff is applied to the access table.
    pub access_retention: RetentionPolicy,
}
65
66impl StoreConfig {
67 pub fn new(root: impl Into<PathBuf>) -> Self {
68 Self {
69 root: root.into(),
70 max_segment_bytes: 64 * 1024 * 1024,
71 sync_policy: SyncPolicy::EveryN(64),
72 retention: RetentionPolicy::keep_forever(),
73 keys: KeyRing::new(),
74 plaintext_cache: false,
75 anchor: None,
76 access_log: false,
77 access_retention: RetentionPolicy::keep_forever(),
78 }
79 }
80
81 pub fn access_log(mut self, enabled: bool) -> Self {
84 self.access_log = enabled;
85 self
86 }
87
88 pub fn access_retention(mut self, r: RetentionPolicy) -> Self {
91 self.access_retention = r;
92 self
93 }
94
95 pub fn anchor(mut self, hook: impl Fn(&Checkpoint) + Send + Sync + 'static) -> Self {
97 self.anchor = Some(Arc::new(hook));
98 self
99 }
100
101 pub fn plaintext_cache(mut self, enabled: bool) -> Self {
104 self.plaintext_cache = enabled;
105 self
106 }
107
108 pub fn retention(mut self, r: RetentionPolicy) -> Self {
109 self.retention = r;
110 self
111 }
112
113 pub fn sync_policy(mut self, p: SyncPolicy) -> Self {
114 self.sync_policy = p;
115 self
116 }
117
118 pub fn keys(mut self, k: KeyRing) -> Self {
119 self.keys = k;
120 self
121 }
122
123 pub fn max_segment_bytes(mut self, n: u64) -> Self {
124 self.max_segment_bytes = n;
125 self
126 }
127}
128
/// Event stored in the `meta` table; the store replays these on open.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MetaEvent {
    /// A type schema was defined (or redefined).
    TypeDefined(TypeSchema),
    /// A custom column was defined.
    CustomColumnDefined(CustomColumnDef),
    /// The registries were re-keyed; carries the signed summary.
    Rekeyed(RekeyEvent),
}
140
/// Fixed prefix placed at the start of the byte string that is signed for a re-key event.
const REKEY_SIGNING_DOMAIN: &[u8] = b"quipu-rekey-v2\0";
144
/// Signed record of one re-key run, as stored in the `meta` table.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RekeyEvent {
    /// Timestamp taken from `crate::time::now_micros()` after all tables were rewritten.
    pub occurred_at: u64,
    /// Active RSA key version of the key ring at re-key time.
    pub rsa_version: KeyVersion,
    /// Active HMAC key version at re-key time, or `KEYLESS` when the ring had none.
    pub hmac_version: KeyVersion,
    /// One entry per rewritten registry, in ascending type-name order.
    pub tables: Vec<RekeyedTable>,
    /// Version of the key that produced `signature`.
    pub signing_key_version: KeyVersion,
    /// Signature over the canonical bytes built from the fields above.
    pub signature: Vec<u8>,
}
169
/// Per-registry summary included in a [`RekeyEvent`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RekeyedTable {
    /// Name of the type whose registry was rewritten.
    pub type_name: String,
    /// Record count reported by the rewrite.
    pub records: u64,
    /// Hex encoding of the root reported for the table before the rewrite.
    pub old_root: String,
    /// Hex encoding of the root reported for the table after the rewrite.
    pub new_root: String,
}
184
185fn rekey_signing_bytes(
186 occurred_at: u64,
187 rsa_version: KeyVersion,
188 hmac_version: KeyVersion,
189 tables: &[RekeyedTable],
190) -> Vec<u8> {
191 let mut out = Vec::with_capacity(REKEY_SIGNING_DOMAIN.len() + 16 + tables.len() * 160);
192 out.extend_from_slice(REKEY_SIGNING_DOMAIN);
193 out.extend_from_slice(&occurred_at.to_le_bytes());
194 out.extend_from_slice(&rsa_version.to_le_bytes());
195 out.extend_from_slice(&hmac_version.to_le_bytes());
196 for t in tables {
197 out.extend_from_slice(t.type_name.as_bytes());
198 out.push(0);
199 out.extend_from_slice(&t.records.to_le_bytes());
200 out.extend_from_slice(t.old_root.as_bytes());
201 out.push(0);
202 out.extend_from_slice(t.new_root.as_bytes());
203 out.push(0);
204 }
205 out
206}
207
/// The audit store: the tables, the per-type registries and the checkpoint log that all
/// live under one root directory.
pub struct AuditStore {
    /// Configuration the store was opened with.
    cfg: StoreConfig,
    /// Schema, custom-column and re-key events.
    meta: Table<MetaEvent>,
    /// The audit log records themselves.
    logs: Table<AuditLog>,
    /// Log-to-target links, one row per (log, target) pair.
    relations: Table<TargetRelation>,
    /// Access log table; `None` unless `cfg.access_log` was set when opening.
    access: Option<Table<AccessRecord>>,
    /// Open registry per defined type, keyed by type name.
    registries: HashMap<String, TypeRegistry>,
    /// Registered custom column definitions, keyed by column name.
    custom_columns: HashMap<String, CustomColumnDef>,
    /// Log of signed checkpoints.
    checkpoints: CheckpointLog,
    /// Appends counted since the last sync; used by `SyncPolicy::EveryN`.
    appends_since_sync: u32,
    /// Locked `LOCK` file; kept only so the handle stays open while the store exists.
    _lock: std::fs::File,
}
236
237impl AuditStore {
238 pub fn open(cfg: StoreConfig) -> Result<Self> {
239 std::fs::create_dir_all(&cfg.root)?;
240 let lock = std::fs::OpenOptions::new()
241 .create(true)
242 .truncate(false)
243 .write(true)
244 .open(cfg.root.join("LOCK"))?;
245 lock.try_lock().map_err(|e| match e {
246 std::fs::TryLockError::WouldBlock => Error::Locked(cfg.root.display().to_string()),
247 std::fs::TryLockError::Error(e) => Error::Io(e),
248 })?;
249 let mut meta: Table<MetaEvent> =
250 Table::open(&cfg.root.join("meta"), cfg.max_segment_bytes)?;
251 let logs = Table::open(&cfg.root.join("logs"), cfg.max_segment_bytes)?;
252 let relations = Table::open(&cfg.root.join("relations"), cfg.max_segment_bytes)?;
253 let access = if cfg.access_log {
254 Some(Table::open(
255 &cfg.root.join("access"),
256 cfg.max_segment_bytes,
257 )?)
258 } else {
259 None
260 };
261
262 let mut registries = HashMap::new();
263 let mut custom_columns = HashMap::new();
264 let events: Vec<MetaEvent> = meta.scan()?.collect::<Result<Vec<_>>>()?;
265 for ev in events {
266 match ev {
267 MetaEvent::TypeDefined(schema) => {
268 let dir = cfg.root.join("registry").join(&schema.type_name);
270 let reg = TypeRegistry::open(
271 &dir,
272 schema.clone(),
273 cfg.max_segment_bytes,
274 cfg.plaintext_cache,
275 )?;
276 registries.insert(schema.type_name.clone(), reg);
277 }
278 MetaEvent::CustomColumnDefined(def) => {
279 custom_columns.insert(def.name.clone(), def);
280 }
281 MetaEvent::Rekeyed(_) => {}
284 }
285 }
286 let checkpoints = CheckpointLog::new(&cfg.root);
287 Ok(Self {
288 cfg,
289 meta,
290 logs,
291 relations,
292 access,
293 registries,
294 custom_columns,
295 checkpoints,
296 appends_since_sync: 0,
297 _lock: lock,
298 })
299 }
300
301 pub fn define_type(&mut self, schema: TypeSchema) -> Result<()> {
312 if schema.type_name.starts_with(RESERVED_TYPE_PREFIX) {
313 return Err(Error::Schema(format!(
314 "type name '{}' uses the reserved prefix '{RESERVED_TYPE_PREFIX}' — that \
315 namespace belongs to quipu's internal record kinds (e.g. the \
316 '{}' meta-audit type)",
317 schema.type_name,
318 crate::access::ACCESS_TYPE
319 )));
320 }
321 if let Some(existing) = self.registries.get(&schema.type_name) {
322 for old in &existing.schema().fields {
323 match schema.field(&old.name) {
324 None => {
325 return Err(Error::Schema(format!(
326 "type '{}': field '{}' cannot be removed — recorded data still \
327 references it",
328 schema.type_name, old.name
329 )));
330 }
331 Some(new) if new.kind != old.kind || new.protection != old.protection => {
332 return Err(Error::Schema(format!(
333 "type '{}': field '{}' cannot change kind/protection — existing \
334 values would become unsearchable or unreadable",
335 schema.type_name, old.name
336 )));
337 }
338 Some(new) if new.search != old.search => {
339 return Err(Error::Schema(format!(
340 "type '{}': field '{}' cannot change its FieldIndex — records \
341 written under the old index carry no tokens for the new one \
342 and would silently stop matching",
343 schema.type_name, old.name
344 )));
345 }
346 _ => {}
347 }
348 }
349 }
350 let dir = self.cfg.root.join("registry").join(&schema.type_name);
351 let reg = TypeRegistry::open(
352 &dir,
353 schema.clone(),
354 self.cfg.max_segment_bytes,
355 self.cfg.plaintext_cache,
356 )?;
357 self.meta.append(
358 &MetaEvent::TypeDefined(schema.clone()),
359 crate::time::now_micros(),
360 )?;
361 self.meta.sync()?;
362 self.registries.insert(schema.type_name.clone(), reg);
363 Ok(())
364 }
365
    /// Returns `true` when a registry is currently open for `type_name`.
    pub fn has_type(&self, type_name: &str) -> bool {
        self.registries.contains_key(type_name)
    }
369
370 pub fn define_custom_column(&mut self, mut def: CustomColumnDef) -> Result<()> {
374 if def.required && def.required_since.is_none() {
375 def.required_since = Some(crate::time::now_micros());
376 }
377 self.meta.append(
378 &MetaEvent::CustomColumnDefined(def.clone()),
379 crate::time::now_micros(),
380 )?;
381 self.meta.sync()?;
382 self.custom_columns.insert(def.name.clone(), def);
383 Ok(())
384 }
385
    /// Iterates over all registered custom column definitions (in hash-map order).
    pub fn custom_columns(&self) -> impl Iterator<Item = &CustomColumnDef> {
        self.custom_columns.values()
    }
389
390 fn registry(&mut self, type_name: &str) -> Result<&mut TypeRegistry> {
393 self.registries.get_mut(type_name).ok_or_else(|| {
394 Error::Schema(format!(
395 "type '{type_name}' has no registry table — call define_type() first"
396 ))
397 })
398 }
399
400 pub fn register_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
402 let keys = self.cfg.keys.clone();
403 self.registry(type_name)?.upsert(input, &keys)
404 }
405
    /// Alias for [`Self::register_entity`]; both go through the same upsert.
    pub fn update_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
        self.register_entity(type_name, input)
    }
411
    /// Asks the registry of `type_name` to delete `entity_id` and returns the uid it
    /// reports. Fails with `Error::Schema` when the type has no registry.
    pub fn delete_entity(&mut self, type_name: &str, entity_id: &str) -> Result<Uid> {
        self.registry(type_name)?.delete(entity_id)
    }
415
    /// Returns the registry's latest record for `entity_id`, or `None` when the type has
    /// no registry or the registry has no such entity.
    pub fn entity_latest(&self, type_name: &str, entity_id: &str) -> Option<&RegistryRecord> {
        self.registries.get(type_name)?.latest(entity_id)
    }
419
    /// Appends an audit log stamped with the current time.
    ///
    /// Thin wrapper over [`Self::append_at`] with `crate::time::now_micros()` as the
    /// timestamp; see that method for what is registered and validated.
    #[allow(clippy::too_many_arguments)]
    pub fn append(
        &mut self,
        actor_type: &str,
        actor: &EntityInput,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, EntityInput)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.append_at(
            crate::time::now_micros(),
            actor_type,
            actor,
            method,
            url,
            content,
            targets,
            custom,
        )
    }
451
452 #[allow(clippy::too_many_arguments)]
457 pub fn append_at(
458 &mut self,
459 occurred_at: u64,
460 actor_type: &str,
461 actor: &EntityInput,
462 method: &str,
463 url: &str,
464 content: Content,
465 targets: &[(String, EntityInput)],
466 custom: BTreeMap<String, Value>,
467 ) -> Result<Uid> {
468 self.validate_custom(&custom, occurred_at)?;
469 let actor_uid = self.register_entity(actor_type, actor)?;
470 let mut target_refs = Vec::with_capacity(targets.len());
471 for (t_type, t_input) in targets {
472 let uid = self.register_entity(t_type, t_input)?;
473 target_refs.push((t_type.clone(), uid));
474 }
475 self.append_resolved_at(
476 occurred_at,
477 actor_type,
478 actor_uid,
479 method,
480 url,
481 content,
482 &target_refs,
483 custom,
484 )
485 }
486
    /// Appends an audit log stamped with the current time for an actor and targets that
    /// are already resolved to registry uids.
    ///
    /// Thin wrapper over [`Self::append_resolved_at`].
    #[allow(clippy::too_many_arguments)]
    pub fn append_resolved(
        &mut self,
        actor_type: &str,
        actor_uid: Uid,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, Uid)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.append_resolved_at(
            crate::time::now_micros(),
            actor_type,
            actor_uid,
            method,
            url,
            content,
            targets,
            custom,
        )
    }
510
511 #[allow(clippy::too_many_arguments)]
513 pub fn append_resolved_at(
514 &mut self,
515 occurred_at: u64,
516 actor_type: &str,
517 actor_uid: Uid,
518 method: &str,
519 url: &str,
520 content: Content,
521 targets: &[(String, Uid)],
522 custom: BTreeMap<String, Value>,
523 ) -> Result<Uid> {
524 self.validate_custom(&custom, occurred_at)?;
525 let log = AuditLog {
526 log_id: Uid::generate(),
527 timestamp: occurred_at,
528 actor: actor_uid,
529 actor_type: actor_type.to_string(),
530 method: method.to_string(),
531 url: url.to_string(),
532 content,
533 custom,
534 };
535 let seq_before = self.logs.active_seq();
536 self.logs.append(&log, log.timestamp)?;
537 let sealed_a_segment = self.logs.active_seq() != seq_before;
538 for (entity_type, uid) in targets {
539 let rel = TargetRelation {
540 log_id: log.log_id,
541 entity_registry_uid: *uid,
542 entity_type: entity_type.clone(),
543 };
544 self.relations.append(&rel, log.timestamp)?;
545 }
546 self.apply_sync_policy()?;
547 if sealed_a_segment {
548 self.write_checkpoint()?;
553 }
554 Ok(log.log_id)
555 }
556
557 fn validate_custom(&self, custom: &BTreeMap<String, Value>, occurred_at: u64) -> Result<()> {
558 for (name, value) in custom {
559 let def = self.custom_columns.get(name).ok_or_else(|| {
560 Error::Schema(format!(
561 "custom column '{name}' is not registered — call define_custom_column()"
562 ))
563 })?;
564 if value.kind() != def.kind {
565 return Err(Error::Schema(format!(
566 "custom column '{name}' expects {:?}, got {:?}",
567 def.kind,
568 value.kind()
569 )));
570 }
571 }
572 for def in self.custom_columns.values() {
573 let in_force = def.required_since.is_none_or(|since| occurred_at >= since);
574 if def.required && in_force && !custom.contains_key(&def.name) {
575 return Err(Error::Schema(format!(
576 "missing required custom column '{}'",
577 def.name
578 )));
579 }
580 }
581 Ok(())
582 }
583
584 fn apply_sync_policy(&mut self) -> Result<()> {
585 match self.cfg.sync_policy {
586 SyncPolicy::Always => self.sync_all()?,
587 SyncPolicy::EveryN(n) => {
588 self.appends_since_sync += 1;
589 if self.appends_since_sync >= n {
590 self.sync_all()?;
591 } else {
592 self.logs.flush()?;
593 self.relations.flush()?;
594 }
595 }
596 SyncPolicy::OsManaged => {
597 self.logs.flush()?;
598 self.relations.flush()?;
599 }
600 }
601 Ok(())
602 }
603
604 fn sync_all(&mut self) -> Result<()> {
608 for reg in self.registries.values_mut() {
609 reg.sync()?;
610 }
611 self.logs.sync()?;
612 self.relations.sync()?;
613 self.appends_since_sync = 0;
614 Ok(())
615 }
616
617 pub fn sync(&mut self) -> Result<()> {
619 self.sync_all()?;
620 self.meta.sync()?;
621 if let Some(access) = self.access.as_mut() {
622 access.sync()?;
623 }
624 Ok(())
625 }
626
627 pub fn verify_integrity(&mut self) -> Result<()> {
638 self.logs.verify()?;
639 self.relations.verify()?;
640 self.meta.verify()?;
641 if let Some(access) = self.access.as_mut() {
642 access.verify()?;
643 }
644 for reg in self.registries.values_mut() {
645 reg.verify()?;
646 }
647 self.verify_checkpoints()?;
648 self.verify_rekey_events()
649 }
650
651 pub fn rekey(&mut self) -> Result<RekeyEvent> {
675 let keys = self.cfg.keys.clone();
676 let rsa_version = keys.active_rsa_version().ok_or_else(|| {
677 Error::Crypto("re-key: no RSA key in the ring — nothing to re-wrap to".into())
678 })?;
679 if !keys.can_sign() {
680 return Err(Error::Crypto(
681 "re-key requires the active RSA private key (the re-key event is signed)".into(),
682 ));
683 }
684 let hmac_version = keys.active_hmac_version().unwrap_or(KEYLESS);
685
686 let mut names: Vec<String> = self.registries.keys().cloned().collect();
687 names.sort();
688 let mut tables = Vec::with_capacity(names.len());
689 for name in names {
690 let reg = self.registries.remove(&name).expect("name was just listed");
693 let schema = reg.schema().clone();
694 drop(reg);
695 let dir = self.cfg.root.join("registry").join(&name);
696 let stats = rewrite_table::<RegistryRecord>(&dir, self.cfg.max_segment_bytes, |rec| {
697 rekey_record(rec, &schema, &keys)
698 })?;
699 let reopened = TypeRegistry::open(
700 &dir,
701 schema,
702 self.cfg.max_segment_bytes,
703 self.cfg.plaintext_cache,
704 )?;
705 self.registries.insert(name.clone(), reopened);
706 tables.push(RekeyedTable {
707 type_name: name,
708 records: stats.records,
709 old_root: crypto::hex(&stats.old_root),
710 new_root: crypto::hex(&stats.new_root),
711 });
712 }
713
714 let occurred_at = crate::time::now_micros();
715 let (signing_key_version, signature) = keys.sign(&rekey_signing_bytes(
716 occurred_at,
717 rsa_version,
718 hmac_version,
719 &tables,
720 ))?;
721 let event = RekeyEvent {
722 occurred_at,
723 rsa_version,
724 hmac_version,
725 tables,
726 signing_key_version,
727 signature,
728 };
729 self.meta
730 .append(&MetaEvent::Rekeyed(event.clone()), occurred_at)?;
731 self.meta.sync()?;
732 Ok(event)
733 }
734
735 pub fn rekey_events(&mut self) -> Result<Vec<RekeyEvent>> {
737 let mut out = Vec::new();
738 for ev in self.meta.scan()? {
739 if let MetaEvent::Rekeyed(r) = ev? {
740 out.push(r);
741 }
742 }
743 Ok(out)
744 }
745
746 fn verify_rekey_events(&mut self) -> Result<()> {
753 let events = self.rekey_events()?;
754 let mut latest: HashMap<String, (u64, Hash)> = HashMap::new();
756 for ev in &events {
757 self.cfg
758 .keys
759 .verify_signature(
760 ev.signing_key_version,
761 &rekey_signing_bytes(
762 ev.occurred_at,
763 ev.rsa_version,
764 ev.hmac_version,
765 &ev.tables,
766 ),
767 &ev.signature,
768 )
769 .map_err(|e| Error::Crypto(format!("re-key event signature invalid: {e}")))?;
770 for t in &ev.tables {
771 let root: Hash = crypto::hex_decode(&t.new_root)
772 .and_then(|v| v.try_into().ok())
773 .ok_or_else(|| Error::Corrupt {
774 segment: format!("meta (re-key event for '{}')", t.type_name),
775 offset: 0,
776 reason: "malformed merkle root in re-key event".into(),
777 })?;
778 latest.insert(t.type_name.clone(), (t.records, root));
779 }
780 }
781 for (name, (signed_size, signed_root)) in latest {
782 let Some(reg) = self.registries.get_mut(&name) else {
783 continue; };
785 let current_size = reg.spine_size();
786 let current_root = reg.root();
787 let consistent = if signed_size == current_size {
788 signed_root == current_root
789 } else if signed_size < current_size {
790 let proof = reg.prove_consistency(signed_size)?;
791 crate::merkle::verify_consistency(
792 signed_size as usize,
793 current_size as usize,
794 &signed_root,
795 ¤t_root,
796 &proof.path,
797 )
798 } else {
799 false };
801 if !consistent {
802 return Err(Error::Corrupt {
803 segment: format!("registry/{name}"),
804 offset: 0,
805 reason: "registry tree is not consistent with the root signed by the latest \
806 re-key event — the registry was rewritten outside an audited re-key"
807 .into(),
808 });
809 }
810 }
811 Ok(())
812 }
813
    /// Writes a checkpoint now. Returns `Ok(None)` when the key ring cannot sign.
    pub fn checkpoint(&mut self) -> Result<Option<Checkpoint>> {
        self.write_checkpoint()
    }
824
    /// Reads every checkpoint from the checkpoint log.
    pub fn checkpoints(&self) -> Result<Vec<Checkpoint>> {
        self.checkpoints.read_all()
    }
829
    /// Returns the root currently reported by the logs table.
    pub fn merkle_root(&self) -> Hash {
        self.logs.root()
    }
838
    /// Returns the spine size currently reported by the logs table.
    pub fn tree_size(&self) -> u64 {
        self.logs.spine_size()
    }
844
845 pub fn prove_inclusion(&mut self, log_id: Uid) -> Result<InclusionProof> {
852 let base = self.logs.purged_count();
853 let slices = self.logs.slices()?;
854 let mut offset = 0u64;
855 for slice in slices {
856 let mut reader = crate::storage::SegmentReader::open_bounded(&slice.path, slice.bound)?;
857 while let Some((_, payload)) = reader.next_record()? {
858 let log: AuditLog = bincode::deserialize(&payload)?;
859 if log.log_id == log_id {
860 return self.logs.prove_inclusion(base + offset);
861 }
862 offset += 1;
863 }
864 }
865 Err(Error::NotFound(format!(
866 "log {log_id} not found among retained records — cannot prove inclusion"
867 )))
868 }
869
    /// Asks the logs table for a consistency proof starting from tree size `first_size`.
    pub fn prove_consistency(&mut self, first_size: u64) -> Result<ConsistencyProof> {
        self.logs.prove_consistency(first_size)
    }
877
878 fn write_checkpoint(&mut self) -> Result<Option<Checkpoint>> {
879 if !self.cfg.keys.can_sign() {
880 return Ok(None);
881 }
882 self.logs.sync()?;
886 let cp = Checkpoint::sign(
887 &self.cfg.keys,
888 crate::time::now_micros(),
889 self.logs.active_seq(),
890 self.logs.record_count(),
891 self.logs.spine_size(),
892 self.logs.root(),
893 )?;
894 self.checkpoints.append(&cp)?;
895 if let Some(hook) = &self.cfg.anchor {
896 let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| hook(&cp)));
898 }
899 Ok(Some(cp))
900 }
901
902 fn verify_checkpoints(&mut self) -> Result<()> {
903 let cps = self.checkpoints.read_all()?;
904 let Some(latest) = cps.last() else {
905 return Ok(()); };
907 for cp in &cps {
908 cp.verify(&self.cfg.keys)?;
909 }
910 let current_size = self.logs.spine_size();
917 let current_root = self.logs.root();
918 let consistent = if latest.tree_size == current_size {
919 latest.merkle_root == current_root
920 } else if latest.tree_size < current_size {
921 let proof = self.logs.prove_consistency(latest.tree_size)?;
922 crate::merkle::verify_consistency(
923 latest.tree_size as usize,
924 current_size as usize,
925 &latest.merkle_root,
926 ¤t_root,
927 &proof.path,
928 )
929 } else {
930 false };
932 if !consistent {
933 return Err(Error::Corrupt {
934 segment: self.checkpoints.path().display().to_string(),
935 offset: 0,
936 reason: "latest checkpoint's merkle root is not consistent with the current \
937 tree — the log was rewritten or truncated after the checkpoint was signed"
938 .into(),
939 });
940 }
941 Ok(())
942 }
943
944 pub fn apply_retention(&mut self) -> Result<usize> {
950 let now = crate::time::now_micros();
951 let mut dropped_main = 0;
952 if let Some(cutoff) = self.cfg.retention.cutoff_micros(now) {
953 dropped_main += self.logs.purge_older_than(cutoff)?;
954 dropped_main += self.relations.purge_older_than(cutoff)?;
955 }
956 if let Some(budget) = self.cfg.retention.max_bytes {
957 dropped_main += self.purge_to_byte_budget(budget)?;
958 }
959 if dropped_main > 0 {
960 self.write_checkpoint()?;
964 }
965 let mut dropped = dropped_main;
968 if let Some(access) = self.access.as_mut() {
969 if let Some(cutoff) = self.cfg.access_retention.cutoff_micros(now) {
970 dropped += access.purge_older_than(cutoff)?;
971 }
972 }
973 Ok(dropped)
974 }
975
976 fn purge_to_byte_budget(&mut self, budget: u64) -> Result<usize> {
982 let mut total = self.logs.total_bytes()? + self.relations.total_bytes()?;
983 let mut dropped = 0usize;
984 while total > budget {
985 let from_logs = match (
986 self.logs.oldest_sealed_max_ts(),
987 self.relations.oldest_sealed_max_ts(),
988 ) {
989 (Some(l), Some(r)) => l <= r,
990 (Some(_), None) => true,
991 (None, Some(_)) => false,
992 (None, None) => break, };
994 let freed = if from_logs {
995 self.logs.purge_oldest_sealed()?
996 } else {
997 self.relations.purge_oldest_sealed()?
998 };
999 match freed {
1000 Some(bytes) => {
1001 total = total.saturating_sub(bytes);
1002 dropped += 1;
1003 }
1004 None => break,
1005 }
1006 }
1007 Ok(dropped)
1008 }
1009
    /// Returns the combined byte total of the logs and relations tables.
    pub fn retained_bytes(&mut self) -> Result<u64> {
        Ok(self.logs.total_bytes()? + self.relations.total_bytes()?)
    }
1016
1017 pub fn type_schemas(&self) -> Vec<TypeSchema> {
1021 let mut out: Vec<TypeSchema> = self
1022 .registries
1023 .values()
1024 .map(|r| r.schema().clone())
1025 .collect();
1026 out.sort_by(|a, b| a.type_name.cmp(&b.type_name));
1027 out
1028 }
1029
1030 pub fn list_entities(
1032 &self,
1033 type_name: &str,
1034 include_deleted: bool,
1035 ) -> Result<Vec<TargetSnapshot>> {
1036 let reg = self
1037 .registries
1038 .get(type_name)
1039 .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
1040 let mut ids: Vec<&String> = reg.idx.entity_ids().collect();
1041 ids.sort();
1042 Ok(ids
1043 .into_iter()
1044 .filter_map(|id| reg.latest(id))
1045 .filter(|rec| include_deleted || !rec.deleted)
1046 .map(snapshot_of)
1047 .collect())
1048 }
1049
1050 pub fn entity_history(&self, type_name: &str, entity_id: &str) -> Result<Vec<TargetSnapshot>> {
1053 let reg = self
1054 .registries
1055 .get(type_name)
1056 .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
1057 let uids = reg.all_version_uids(entity_id);
1058 if uids.is_empty() {
1059 return Err(Error::NotFound(format!(
1060 "entity '{entity_id}' of type '{type_name}'"
1061 )));
1062 }
1063 Ok(uids
1064 .iter()
1065 .map(|uid| match reg.version(uid) {
1066 Some(rec) => snapshot_of(rec),
1067 None => missing_snapshot(type_name, uid),
1068 })
1069 .collect())
1070 }
1071
1072 pub fn snapshot(&mut self) -> Result<ReadSnapshot> {
1079 Ok(ReadSnapshot {
1080 keys: self.cfg.keys.clone(),
1081 registries: self
1082 .registries
1083 .iter()
1084 .map(|(k, v)| (k.clone(), v.idx.clone()))
1085 .collect(),
1086 logs: self.logs.slices()?,
1087 relations: self.relations.slices()?,
1088 })
1089 }
1090
    /// Runs `q` on behalf of the actor name `"local"`; see [`Self::query_as`].
    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
        self.query_as("local", q)
    }
1104
1105 pub fn query_as(&mut self, actor: &str, q: &LogQuery) -> Result<Vec<LogView>> {
1107 let hits = self.snapshot()?.query(q)?;
1108 if self.access.is_some() {
1109 self.record_access(AccessRecord::new(
1110 actor,
1111 "query_logs",
1112 crate::access::summarize_log_query(q),
1113 Some(hits.len() as u64),
1114 ))?;
1115 }
1116 Ok(hits)
1117 }
1118
    /// Returns `true` when the store was opened with the access log enabled.
    pub fn access_enabled(&self) -> bool {
        self.access.is_some()
    }
1125
1126 pub fn record_access(&mut self, rec: AccessRecord) -> Result<()> {
1131 let Some(access) = self.access.as_mut() else {
1132 return Ok(());
1133 };
1134 let ts = rec.timestamp;
1135 access.append(&rec, ts)?;
1136 access.flush()?;
1137 Ok(())
1138 }
1139
1140 pub fn access_records(&mut self, q: &AccessQuery) -> Result<Vec<AccessRecord>> {
1145 let Some(access) = self.access.as_mut() else {
1146 return Err(Error::Schema(
1147 "the access log is not enabled — set StoreConfig::access_log".into(),
1148 ));
1149 };
1150 let mut out = Vec::new();
1151 for rec in access.scan()? {
1152 let rec = rec?;
1153 if !q.matches(&rec) {
1154 continue;
1155 }
1156 out.push(rec);
1157 if q.limit.is_some_and(|n| out.len() >= n) {
1158 break;
1159 }
1160 }
1161 Ok(out)
1162 }
1163
    /// Runs `q` against a fresh snapshot and returns the page. Unlike
    /// [`Self::query_as`], this does not write an access record.
    pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
        self.snapshot()?.query_page(q)
    }
1169
    /// Counts the logs matching `q` against a fresh snapshot. No access record is
    /// written.
    pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
        self.snapshot()?.count(q)
    }
1175
    /// Decrypts a stored value with the store's key ring.
    pub fn decrypt(&self, v: &crate::model::StoredValue) -> Result<Vec<u8>> {
        self.cfg.keys.decrypt(v)
    }
1180}
1181
1182fn rekey_record(
1187 mut rec: RegistryRecord,
1188 schema: &TypeSchema,
1189 keys: &KeyRing,
1190) -> Result<RegistryRecord> {
1191 let names: Vec<String> = rec
1192 .fields
1193 .iter()
1194 .filter(|(_, v)| matches!(v, StoredValue::Rsa { .. }))
1195 .map(|(k, _)| k.clone())
1196 .collect();
1197 for name in names {
1198 let rewrapped = keys.rewrap(&rec.fields[&name])?;
1199 if let Some(def) = schema.field(&name) {
1200 if def.search != FieldIndex::None {
1201 let plaintext = keys.decrypt(&rewrapped)?;
1202 let text = String::from_utf8_lossy(&plaintext).into_owned();
1203 let mut key_version = KEYLESS;
1204 let mut digests = Vec::new();
1205 for t in crate::tokens::value_tokens(&text, def.search) {
1206 let (v, d) = keys.index_token_digest(&def.name, def.protection, &t)?;
1207 key_version = v;
1208 digests.push(d);
1209 }
1210 rec.tokens.insert(
1211 name.clone(),
1212 FieldTokens {
1213 key_version,
1214 digests,
1215 },
1216 );
1217 }
1218 }
1219 rec.fields.insert(name, rewrapped);
1220 }
1221 Ok(rec)
1222}
1223
1224fn snapshot_of(rec: &crate::registry::RegistryRecord) -> TargetSnapshot {
1225 TargetSnapshot {
1226 entity_registry_uid: rec.uid,
1227 entity_type: rec.entity_type.clone(),
1228 entity_id: rec.entity_id.clone(),
1229 version: rec.version,
1230 fields: rec.fields.clone(),
1231 deleted: rec.deleted,
1232 missing: false,
1233 }
1234}
1235
1236fn missing_snapshot(entity_type: &str, uid: &Uid) -> TargetSnapshot {
1237 TargetSnapshot {
1238 entity_registry_uid: *uid,
1239 entity_type: entity_type.to_string(),
1240 entity_id: String::new(),
1241 version: 0,
1242 fields: BTreeMap::new(),
1243 deleted: false,
1244 missing: true,
1245 }
1246}
1247
/// Owned copy of the state a query needs, as produced by `AuditStore::snapshot`.
pub struct ReadSnapshot {
    /// Clone of the store's key ring; passed to registry searches.
    keys: KeyRing,
    /// Clone of each registry's index, keyed by type name.
    registries: HashMap<String, RegistryIndex>,
    /// Slices of the logs table at the time the snapshot was taken.
    logs: Vec<SegmentSlice>,
    /// Slices of the relations table at the time the snapshot was taken.
    relations: Vec<SegmentSlice>,
}
1260
/// Target and actor filters of a query, resolved to id sets once before the scan.
struct ResolvedFilters {
    /// Log ids allowed by the target filters; `None` when the query has none.
    allowed_by_target: Option<HashSet<Uid>>,
    /// Actor version uids allowed by the actor filter; `None` when the query has none.
    allowed_actor_uids: Option<HashSet<Uid>>,
}
1268
1269impl ResolvedFilters {
1270 fn matches(&self, q: &LogQuery, log: &AuditLog) -> bool {
1271 if let Some(m) = &q.method {
1274 if !log.method.eq_ignore_ascii_case(m) {
1275 return false;
1276 }
1277 }
1278 if let Some(p) = &q.url_prefix {
1279 if !log.url.starts_with(p.as_str()) {
1280 return false;
1281 }
1282 }
1283 if let Some(allowed) = &self.allowed_by_target {
1284 if !allowed.contains(&log.log_id) {
1285 return false;
1286 }
1287 }
1288 if let Some(uids) = &self.allowed_actor_uids {
1289 if !uids.contains(&log.actor) {
1290 return false;
1291 }
1292 }
1293 q.custom.iter().all(|(k, v)| log.custom.get(k) == Some(v))
1294 }
1295}
1296
1297impl ReadSnapshot {
    /// Runs `q` and returns only the rendered logs of the resulting page.
    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
        Ok(self.query_page(q)?.logs)
    }
1304
1305 pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
1315 let filters = self.resolve_filters(q)?;
1316 let mut scan = self.log_scan(q)?;
1317 let mut hits: Vec<(Position, AuditLog)> = Vec::new();
1318 let mut more = false;
1319 while let Some((pos, log)) = scan.next_row()? {
1320 if !filters.matches(q, &log) {
1321 continue;
1322 }
1323 if q.limit.is_some_and(|limit| hits.len() >= limit) {
1324 more = true;
1326 break;
1327 }
1328 hits.push((pos, log));
1329 }
1330 let next_cursor = match (more, hits.last()) {
1331 (true, Some((pos, _))) => Some(crate::query::encode_cursor(q.order, *pos)),
1332 _ => None,
1333 };
1334
1335 let wanted: HashSet<Uid> = hits.iter().map(|(_, log)| log.log_id).collect();
1338 let mut rels_by_log: HashMap<Uid, Vec<TargetRelation>> = HashMap::new();
1339 if !wanted.is_empty() {
1340 let mut rels: PositionedScan<TargetRelation> = PositionedScan::new(
1341 self.relations.clone(),
1342 false,
1343 q.from_micros,
1344 q.to_micros,
1345 None,
1346 );
1347 while let Some((_, rel)) = rels.next_row()? {
1348 if wanted.contains(&rel.log_id) {
1349 rels_by_log.entry(rel.log_id).or_default().push(rel);
1350 }
1351 }
1352 }
1353 Ok(QueryPage {
1354 logs: hits
1355 .into_iter()
1356 .map(|(_, log)| self.render(log, &rels_by_log))
1357 .collect(),
1358 next_cursor,
1359 segments_scanned: scan.segments_opened(),
1360 })
1361 }
1362
1363 pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
1368 let filters = self.resolve_filters(q)?;
1369 let mut q = q.clone();
1370 q.cursor = None;
1371 q.order = Order::Asc; let mut scan = self.log_scan(&q)?;
1373 let mut n = 0u64;
1374 while let Some((_, log)) = scan.next_row()? {
1375 if filters.matches(&q, &log) {
1376 n += 1;
1377 }
1378 }
1379 Ok(n)
1380 }
1381
1382 fn resolve_filters(&mut self, q: &LogQuery) -> Result<ResolvedFilters> {
1385 let mut allowed_by_target: Option<HashSet<Uid>> = None;
1386 for f in &q.targets {
1387 let ids = self.log_ids_for_filter(f, q.from_micros, q.to_micros)?;
1388 allowed_by_target = Some(match allowed_by_target {
1389 Some(prev) => prev.intersection(&ids).copied().collect(),
1390 None => ids,
1391 });
1392 }
1393 let allowed_actor_uids: Option<HashSet<Uid>> = match &q.actor {
1394 Some(f) => Some(self.version_uids_for_filter(f)?),
1395 None => None,
1396 };
1397 Ok(ResolvedFilters {
1398 allowed_by_target,
1399 allowed_actor_uids,
1400 })
1401 }
1402
1403 fn log_scan(&self, q: &LogQuery) -> Result<PositionedScan<AuditLog>> {
1405 let after = match &q.cursor {
1406 Some(c) => Some(crate::query::decode_cursor(c, q.order)?),
1407 None => None,
1408 };
1409 Ok(PositionedScan::new(
1410 self.logs.clone(),
1411 q.order == Order::Desc,
1412 q.from_micros,
1413 q.to_micros,
1414 after,
1415 ))
1416 }
1417
1418 fn log_ids_for_filter(
1424 &mut self,
1425 f: &TargetFilter,
1426 from: Option<u64>,
1427 to: Option<u64>,
1428 ) -> Result<HashSet<Uid>> {
1429 let version_uids = self.version_uids_for_filter(f)?;
1430 let mut out = HashSet::new();
1431 let mut rels: PositionedScan<TargetRelation> =
1432 PositionedScan::new(self.relations.clone(), false, from, to, None);
1433 while let Some((_, rel)) = rels.next_row()? {
1434 if version_uids.contains(&rel.entity_registry_uid) {
1435 out.insert(rel.log_id);
1436 }
1437 }
1438 Ok(out)
1439 }
1440
1441 fn version_uids_for_filter(&mut self, f: &TargetFilter) -> Result<HashSet<Uid>> {
1443 let reg = self.registries.get_mut(&f.entity_type).ok_or_else(|| {
1444 Error::Schema(format!("type '{}' has no registry table", f.entity_type))
1445 })?;
1446 let entity_ids = reg.search(&f.field, &f.value, f.include_past, f.mode, &self.keys)?;
1447 let mut uids = HashSet::new();
1448 for id in entity_ids {
1449 uids.extend(reg.all_version_uids(&id).iter().copied());
1450 }
1451 Ok(uids)
1452 }
1453
1454 fn render(&self, log: AuditLog, rels_by_log: &HashMap<Uid, Vec<TargetRelation>>) -> LogView {
1455 let actor = self.snapshot(&log.actor_type, &log.actor);
1456 let mut targets = Vec::new();
1457 if let Some(rels) = rels_by_log.get(&log.log_id) {
1458 for rel in rels {
1459 targets.push(self.snapshot(&rel.entity_type, &rel.entity_registry_uid));
1460 }
1461 }
1462 LogView {
1463 log_id: log.log_id,
1464 timestamp_micros: log.timestamp,
1465 timestamp: crate::time::format_rfc3339(log.timestamp),
1466 actor,
1467 method: log.method,
1468 url: log.url,
1469 content: log.content,
1470 targets,
1471 custom: log.custom,
1472 }
1473 }
1474
1475 fn snapshot(&self, entity_type: &str, uid: &Uid) -> TargetSnapshot {
1479 match self
1480 .registries
1481 .get(entity_type)
1482 .and_then(|r| r.version(uid))
1483 {
1484 Some(rec) => snapshot_of(rec),
1485 None => missing_snapshot(entity_type, uid),
1486 }
1487 }
1488}