Skip to main content

quipu_core/
store.rs

1use crate::access::{AccessQuery, AccessRecord, RESERVED_TYPE_PREFIX};
2use crate::checkpoint::{Checkpoint, CheckpointLog};
3use crate::crypto::{self, KeyRing, KeyVersion, KEYLESS};
4use crate::error::{Error, Result};
5use crate::id::Uid;
6use crate::merkle::Hash;
7use crate::merkle_log::{ConsistencyProof, InclusionProof};
8use crate::model::{AuditLog, Content, StoredValue, TargetRelation, Value};
9use crate::query::{LogQuery, LogView, Order, QueryPage, TargetFilter, TargetSnapshot};
10use crate::registry::{EntityInput, FieldTokens, RegistryIndex, RegistryRecord, TypeRegistry};
11use crate::retention::RetentionPolicy;
12use crate::schema::{CustomColumnDef, FieldIndex, TypeSchema};
13use crate::storage::{rewrite_table, Position, PositionedScan, SegmentSlice, Table};
14use serde::{Deserialize, Serialize};
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::path::PathBuf;
17use std::sync::Arc;
18
19/// External-anchor callback — receives every checkpoint right after it is
20/// persisted. `Arc` (not `Box`) because [`StoreConfig`] is `Clone`.
21pub type AnchorHook = Arc<dyn Fn(&Checkpoint) + Send + Sync>;
22
/// How eagerly log appends are forced to stable storage — the
/// durability/throughput dial of the write path.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncPolicy {
    /// Strongest durability, lowest throughput: fsync on every append.
    Always,
    /// Middle ground: fsync once every N appends; the appends in between are
    /// only flushed to the OS cache.
    EveryN(u32),
    /// Highest throughput: no explicit fsync at all — write-back is left to
    /// the OS.
    OsManaged,
}
33
34#[derive(Clone)]
35pub struct StoreConfig {
36    pub root: PathBuf,
37    pub max_segment_bytes: u64,
38    pub sync_policy: SyncPolicy,
39    pub retention: RetentionPolicy,
40    pub keys: KeyRing,
41    /// Opt-in for LIKE ([`crate::MatchMode::Contains`]) search on *protected*
42    /// fields: plaintexts of protected values written by this process are
43    /// held in memory (never persisted), and RSA values are decrypted and
44    /// cached per immutable version on first Contains search. Off by
45    /// default — keeping plaintexts of protected fields in memory is a
46    /// deliberate trade-off the operator must choose; when off, Contains on
47    /// protected fields is rejected (plain fields are always LIKE-searchable).
48    pub plaintext_cache: bool,
49    /// Called with each integrity checkpoint after it is persisted, so its
50    /// chain head can be exported to an external trust domain (another host,
51    /// a ticket, a transparency log) that a disk-level insider cannot rewrite.
52    /// Errors and panics inside the hook are swallowed: availability of the
53    /// write path outranks anchoring — export delivery is the hook's job.
54    pub anchor: Option<AnchorHook>,
55    /// Opt-in meta-audit: record reads and admin actions against the audit
56    /// store itself in a dedicated access-log table (`root/access/`) — see
57    /// [`crate::access`]. Off by default.
58    pub access_log: bool,
59    /// Retention for the access-log table, independent of the main `retention`
60    /// window. Access records are often kept *shorter* than the audit data
61    /// they describe; the split is possible because the access log lives in
62    /// its own table (retention drops whole segments per table).
63    pub access_retention: RetentionPolicy,
64}
65
66impl StoreConfig {
67    pub fn new(root: impl Into<PathBuf>) -> Self {
68        Self {
69            root: root.into(),
70            max_segment_bytes: 64 * 1024 * 1024,
71            sync_policy: SyncPolicy::EveryN(64),
72            retention: RetentionPolicy::keep_forever(),
73            keys: KeyRing::new(),
74            plaintext_cache: false,
75            anchor: None,
76            access_log: false,
77            access_retention: RetentionPolicy::keep_forever(),
78        }
79    }
80
81    /// Enable the meta-audit access log — see
82    /// [`access_log`](Self::access_log).
83    pub fn access_log(mut self, enabled: bool) -> Self {
84        self.access_log = enabled;
85        self
86    }
87
88    /// Retention window for the access-log table — see
89    /// [`access_retention`](Self::access_retention).
90    pub fn access_retention(mut self, r: RetentionPolicy) -> Self {
91        self.access_retention = r;
92        self
93    }
94
95    /// Register the external-anchor hook — see [`anchor`](Self::anchor).
96    pub fn anchor(mut self, hook: impl Fn(&Checkpoint) + Send + Sync + 'static) -> Self {
97        self.anchor = Some(Arc::new(hook));
98        self
99    }
100
101    /// Enable the in-memory plaintext cache — see
102    /// [`plaintext_cache`](Self::plaintext_cache).
103    pub fn plaintext_cache(mut self, enabled: bool) -> Self {
104        self.plaintext_cache = enabled;
105        self
106    }
107
108    pub fn retention(mut self, r: RetentionPolicy) -> Self {
109        self.retention = r;
110        self
111    }
112
113    pub fn sync_policy(mut self, p: SyncPolicy) -> Self {
114        self.sync_policy = p;
115        self
116    }
117
118    pub fn keys(mut self, k: KeyRing) -> Self {
119        self.keys = k;
120        self
121    }
122
123    pub fn max_segment_bytes(mut self, n: u64) -> Self {
124        self.max_segment_bytes = n;
125        self
126    }
127}
128
129/// Persisted meta events: schema and custom-column definitions are themselves
130/// stored in an append-only table and replayed on open.
131///
132/// bincode encodes the variant *index*, so new variants may only be appended
133/// at the end.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135enum MetaEvent {
136    TypeDefined(TypeSchema),
137    CustomColumnDefined(CustomColumnDef),
138    Rekeyed(RekeyEvent),
139}
140
/// Domain-separation prefix placed in front of every re-key event's signed
/// bytes, so a re-key signature can never be mistaken for a checkpoint
/// signature produced by the same key.
const REKEY_SIGNING_DOMAIN: &[u8] = b"quipu-rekey-v2\0";
144
145/// The signed, persisted record of one [`AuditStore::rekey`] pass.
146///
147/// Re-keying rewrites registry tables, which necessarily produces a fresh
148/// Merkle spine — exactly what tampering looks like. This event is what makes
149/// the rewrite *audited* instead: it is appended to the meta table and signs the
150/// old-root → new-root transition of every rewritten registry with the active
151/// RSA key. [`AuditStore::verify_integrity`] then checks that each registry's
152/// current tree is consistent with the root the latest re-key event signed —
153/// a registry rewritten outside this path contradicts the signature.
154#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
155pub struct RekeyEvent {
156    /// UTC micros at re-key time.
157    pub occurred_at: u64,
158    /// RSA key version values were re-wrapped to (the active one).
159    pub rsa_version: KeyVersion,
160    /// HMAC key version recomputed index tokens were digested with
161    /// ([`KEYLESS`] when the ring holds no HMAC key).
162    pub hmac_version: KeyVersion,
163    pub tables: Vec<RekeyedTable>,
164    /// Version of the RSA key that signed this event.
165    pub signing_key_version: KeyVersion,
166    /// RSA PKCS#1 v1.5 / SHA-256 signature over the fields above.
167    pub signature: Vec<u8>,
168}
169
170/// One registry table rewritten by a re-key pass: the signed Merkle-root
171/// transition.
172#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
173pub struct RekeyedTable {
174    pub type_name: String,
175    /// Tree size of the rewritten registry (records carried over). The new root
176    /// is taken over exactly this many leaves, so verification proves the
177    /// post-rewrite root is a prefix of the (possibly extended) current tree.
178    pub records: u64,
179    /// Hex Merkle root before the rewrite (the tree this event retires).
180    pub old_root: String,
181    /// Hex Merkle root after the rewrite (the tree this event vouches for).
182    pub new_root: String,
183}
184
185fn rekey_signing_bytes(
186    occurred_at: u64,
187    rsa_version: KeyVersion,
188    hmac_version: KeyVersion,
189    tables: &[RekeyedTable],
190) -> Vec<u8> {
191    let mut out = Vec::with_capacity(REKEY_SIGNING_DOMAIN.len() + 16 + tables.len() * 160);
192    out.extend_from_slice(REKEY_SIGNING_DOMAIN);
193    out.extend_from_slice(&occurred_at.to_le_bytes());
194    out.extend_from_slice(&rsa_version.to_le_bytes());
195    out.extend_from_slice(&hmac_version.to_le_bytes());
196    for t in tables {
197        out.extend_from_slice(t.type_name.as_bytes());
198        out.push(0);
199        out.extend_from_slice(&t.records.to_le_bytes());
200        out.extend_from_slice(t.old_root.as_bytes());
201        out.push(0);
202        out.extend_from_slice(t.new_root.as_bytes());
203        out.push(0);
204    }
205    out
206}
207
208/// The embedded audit store. Owns every table under one root directory:
209///
210/// ```text
211/// root/
212///   meta/         type schemas + custom column registry (replayed on open)
213///   logs/         AuditLog rows
214///   relations/    log -> target-entity-version relations
215///   registry/<t>/ one versioned registry table per entity/actor type
216/// ```
217pub struct AuditStore {
218    cfg: StoreConfig,
219    meta: Table<MetaEvent>,
220    logs: Table<AuditLog>,
221    relations: Table<TargetRelation>,
222    /// Meta-audit table (`Some` iff [`StoreConfig::access_log`]) — see
223    /// [`crate::access`]. Kept apart from `logs` on purpose: independent
224    /// retention, and an access append can never recurse into a query.
225    access: Option<Table<AccessRecord>>,
226    registries: HashMap<String, TypeRegistry>,
227    custom_columns: HashMap<String, CustomColumnDef>,
228    checkpoints: CheckpointLog,
229    appends_since_sync: u32,
230    /// Advisory OS lock on `root/LOCK`, held for the store's lifetime. The
231    /// store is single-process by design; without this, a second process
232    /// opening the same root would silently corrupt in-memory indexes and
233    /// interleave segment writes. Released automatically on drop/crash.
234    _lock: std::fs::File,
235}
236
237impl AuditStore {
238    pub fn open(cfg: StoreConfig) -> Result<Self> {
239        std::fs::create_dir_all(&cfg.root)?;
240        let lock = std::fs::OpenOptions::new()
241            .create(true)
242            .truncate(false)
243            .write(true)
244            .open(cfg.root.join("LOCK"))?;
245        lock.try_lock().map_err(|e| match e {
246            std::fs::TryLockError::WouldBlock => Error::Locked(cfg.root.display().to_string()),
247            std::fs::TryLockError::Error(e) => Error::Io(e),
248        })?;
249        let mut meta: Table<MetaEvent> =
250            Table::open(&cfg.root.join("meta"), cfg.max_segment_bytes)?;
251        let logs = Table::open(&cfg.root.join("logs"), cfg.max_segment_bytes)?;
252        let relations = Table::open(&cfg.root.join("relations"), cfg.max_segment_bytes)?;
253        let access = if cfg.access_log {
254            Some(Table::open(
255                &cfg.root.join("access"),
256                cfg.max_segment_bytes,
257            )?)
258        } else {
259            None
260        };
261
262        let mut registries = HashMap::new();
263        let mut custom_columns = HashMap::new();
264        let events: Vec<MetaEvent> = meta.scan()?.collect::<Result<Vec<_>>>()?;
265        for ev in events {
266            match ev {
267                MetaEvent::TypeDefined(schema) => {
268                    // last definition wins (allows additive schema evolution)
269                    let dir = cfg.root.join("registry").join(&schema.type_name);
270                    let reg = TypeRegistry::open(
271                        &dir,
272                        schema.clone(),
273                        cfg.max_segment_bytes,
274                        cfg.plaintext_cache,
275                    )?;
276                    registries.insert(schema.type_name.clone(), reg);
277                }
278                MetaEvent::CustomColumnDefined(def) => {
279                    custom_columns.insert(def.name.clone(), def);
280                }
281                // re-key events are audit records, not state to replay; they
282                // are read back during verify_integrity()
283                MetaEvent::Rekeyed(_) => {}
284            }
285        }
286        let checkpoints = CheckpointLog::new(&cfg.root);
287        Ok(Self {
288            cfg,
289            meta,
290            logs,
291            relations,
292            access,
293            registries,
294            custom_columns,
295            checkpoints,
296            appends_since_sync: 0,
297            _lock: lock,
298        })
299    }
300
301    // ---- schema management -------------------------------------------------
302
303    /// Create (or redefine) the registry table for an entity/actor type.
304    /// Must be called before entities of that type are registered — this is
305    /// the "create the registry table first" step of the write protocol.
306    ///
307    /// Redefinition is additive only: new fields may be added, but existing
308    /// fields cannot be removed or change kind/protection. A protection
309    /// change would split the search index keys (old values silently stop
310    /// matching probes), breaking the "past values stay searchable" promise.
311    pub fn define_type(&mut self, schema: TypeSchema) -> Result<()> {
312        if schema.type_name.starts_with(RESERVED_TYPE_PREFIX) {
313            return Err(Error::Schema(format!(
314                "type name '{}' uses the reserved prefix '{RESERVED_TYPE_PREFIX}' — that \
315                 namespace belongs to quipu's internal record kinds (e.g. the \
316                 '{}' meta-audit type)",
317                schema.type_name,
318                crate::access::ACCESS_TYPE
319            )));
320        }
321        if let Some(existing) = self.registries.get(&schema.type_name) {
322            for old in &existing.schema().fields {
323                match schema.field(&old.name) {
324                    None => {
325                        return Err(Error::Schema(format!(
326                            "type '{}': field '{}' cannot be removed — recorded data still \
327                             references it",
328                            schema.type_name, old.name
329                        )));
330                    }
331                    Some(new) if new.kind != old.kind || new.protection != old.protection => {
332                        return Err(Error::Schema(format!(
333                            "type '{}': field '{}' cannot change kind/protection — existing \
334                             values would become unsearchable or unreadable",
335                            schema.type_name, old.name
336                        )));
337                    }
338                    Some(new) if new.search != old.search => {
339                        return Err(Error::Schema(format!(
340                            "type '{}': field '{}' cannot change its FieldIndex — records \
341                             written under the old index carry no tokens for the new one \
342                             and would silently stop matching",
343                            schema.type_name, old.name
344                        )));
345                    }
346                    _ => {}
347                }
348            }
349        }
350        let dir = self.cfg.root.join("registry").join(&schema.type_name);
351        let reg = TypeRegistry::open(
352            &dir,
353            schema.clone(),
354            self.cfg.max_segment_bytes,
355            self.cfg.plaintext_cache,
356        )?;
357        self.meta.append(
358            &MetaEvent::TypeDefined(schema.clone()),
359            crate::time::now_micros(),
360        )?;
361        self.meta.sync()?;
362        self.registries.insert(schema.type_name.clone(), reg);
363        Ok(())
364    }
365
366    pub fn has_type(&self, type_name: &str) -> bool {
367        self.registries.contains_key(type_name)
368    }
369
370    /// Register an extra audit-log column (text / number / json). For
371    /// `required` columns, the requirement only applies to events that occur
372    /// from now on — see [`CustomColumnDef::required_since`].
373    pub fn define_custom_column(&mut self, mut def: CustomColumnDef) -> Result<()> {
374        if def.required && def.required_since.is_none() {
375            def.required_since = Some(crate::time::now_micros());
376        }
377        self.meta.append(
378            &MetaEvent::CustomColumnDefined(def.clone()),
379            crate::time::now_micros(),
380        )?;
381        self.meta.sync()?;
382        self.custom_columns.insert(def.name.clone(), def);
383        Ok(())
384    }
385
386    pub fn custom_columns(&self) -> impl Iterator<Item = &CustomColumnDef> {
387        self.custom_columns.values()
388    }
389
390    // ---- registry operations -----------------------------------------------
391
392    fn registry(&mut self, type_name: &str) -> Result<&mut TypeRegistry> {
393        self.registries.get_mut(type_name).ok_or_else(|| {
394            Error::Schema(format!(
395                "type '{type_name}' has no registry table — call define_type() first"
396            ))
397        })
398    }
399
400    /// Register/update an entity and get the uid of its current version.
401    pub fn register_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
402        let keys = self.cfg.keys.clone();
403        self.registry(type_name)?.upsert(input, &keys)
404    }
405
406    /// Update is the same operation as register: changed fields produce a new
407    /// version, old versions stay queryable (old-name search keeps working).
408    pub fn update_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
409        self.register_entity(type_name, input)
410    }
411
412    pub fn delete_entity(&mut self, type_name: &str, entity_id: &str) -> Result<Uid> {
413        self.registry(type_name)?.delete(entity_id)
414    }
415
416    pub fn entity_latest(&self, type_name: &str, entity_id: &str) -> Option<&RegistryRecord> {
417        self.registries.get(type_name)?.latest(entity_id)
418    }
419
420    // ---- writing logs --------------------------------------------------------
421
422    /// Append one audit log following the full write protocol:
423    /// 1. the actor is upserted into its type registry,
424    /// 2. every target is upserted into its type registry,
425    /// 3. the log row is appended with the actor's version uid,
426    /// 4. one relation row per target binds the log to the exact entity versions.
427    ///
428    /// Registry tables for every referenced type must already exist.
429    #[allow(clippy::too_many_arguments)]
430    pub fn append(
431        &mut self,
432        actor_type: &str,
433        actor: &EntityInput,
434        method: &str,
435        url: &str,
436        content: Content,
437        targets: &[(String, EntityInput)],
438        custom: BTreeMap<String, Value>,
439    ) -> Result<Uid> {
440        self.append_at(
441            crate::time::now_micros(),
442            actor_type,
443            actor,
444            method,
445            url,
446            content,
447            targets,
448            custom,
449        )
450    }
451
452    /// [`append`](Self::append) with an explicit event time — for events that
453    /// occurred earlier than they are persisted (async pipelines, DLQ
454    /// redrive). Validation of required custom columns uses this time, so a
455    /// column made required *after* the event happened does not reject it.
456    #[allow(clippy::too_many_arguments)]
457    pub fn append_at(
458        &mut self,
459        occurred_at: u64,
460        actor_type: &str,
461        actor: &EntityInput,
462        method: &str,
463        url: &str,
464        content: Content,
465        targets: &[(String, EntityInput)],
466        custom: BTreeMap<String, Value>,
467    ) -> Result<Uid> {
468        self.validate_custom(&custom, occurred_at)?;
469        let actor_uid = self.register_entity(actor_type, actor)?;
470        let mut target_refs = Vec::with_capacity(targets.len());
471        for (t_type, t_input) in targets {
472            let uid = self.register_entity(t_type, t_input)?;
473            target_refs.push((t_type.clone(), uid));
474        }
475        self.append_resolved_at(
476            occurred_at,
477            actor_type,
478            actor_uid,
479            method,
480            url,
481            content,
482            &target_refs,
483            custom,
484        )
485    }
486
487    /// Lower-level append for callers that already hold registry version uids.
488    #[allow(clippy::too_many_arguments)]
489    pub fn append_resolved(
490        &mut self,
491        actor_type: &str,
492        actor_uid: Uid,
493        method: &str,
494        url: &str,
495        content: Content,
496        targets: &[(String, Uid)],
497        custom: BTreeMap<String, Value>,
498    ) -> Result<Uid> {
499        self.append_resolved_at(
500            crate::time::now_micros(),
501            actor_type,
502            actor_uid,
503            method,
504            url,
505            content,
506            targets,
507            custom,
508        )
509    }
510
511    /// See [`append_at`](Self::append_at).
512    #[allow(clippy::too_many_arguments)]
513    pub fn append_resolved_at(
514        &mut self,
515        occurred_at: u64,
516        actor_type: &str,
517        actor_uid: Uid,
518        method: &str,
519        url: &str,
520        content: Content,
521        targets: &[(String, Uid)],
522        custom: BTreeMap<String, Value>,
523    ) -> Result<Uid> {
524        self.validate_custom(&custom, occurred_at)?;
525        let log = AuditLog {
526            log_id: Uid::generate(),
527            timestamp: occurred_at,
528            actor: actor_uid,
529            actor_type: actor_type.to_string(),
530            method: method.to_string(),
531            url: url.to_string(),
532            content,
533            custom,
534        };
535        let seq_before = self.logs.active_seq();
536        self.logs.append(&log, log.timestamp)?;
537        let sealed_a_segment = self.logs.active_seq() != seq_before;
538        for (entity_type, uid) in targets {
539            let rel = TargetRelation {
540                log_id: log.log_id,
541                entity_registry_uid: *uid,
542                entity_type: entity_type.clone(),
543            };
544            self.relations.append(&rel, log.timestamp)?;
545        }
546        self.apply_sync_policy()?;
547        if sealed_a_segment {
548            // checkpoint on segment seal, not on flush/sync: a seal is when a
549            // chain prefix becomes immutable, and its frequency is bounded by
550            // segment size instead of putting an RSA signing operation on the
551            // every-N-appends sync path
552            self.write_checkpoint()?;
553        }
554        Ok(log.log_id)
555    }
556
557    fn validate_custom(&self, custom: &BTreeMap<String, Value>, occurred_at: u64) -> Result<()> {
558        for (name, value) in custom {
559            let def = self.custom_columns.get(name).ok_or_else(|| {
560                Error::Schema(format!(
561                    "custom column '{name}' is not registered — call define_custom_column()"
562                ))
563            })?;
564            if value.kind() != def.kind {
565                return Err(Error::Schema(format!(
566                    "custom column '{name}' expects {:?}, got {:?}",
567                    def.kind,
568                    value.kind()
569                )));
570            }
571        }
572        for def in self.custom_columns.values() {
573            let in_force = def.required_since.is_none_or(|since| occurred_at >= since);
574            if def.required && in_force && !custom.contains_key(&def.name) {
575                return Err(Error::Schema(format!(
576                    "missing required custom column '{}'",
577                    def.name
578                )));
579            }
580        }
581        Ok(())
582    }
583
584    fn apply_sync_policy(&mut self) -> Result<()> {
585        match self.cfg.sync_policy {
586            SyncPolicy::Always => self.sync_all()?,
587            SyncPolicy::EveryN(n) => {
588                self.appends_since_sync += 1;
589                if self.appends_since_sync >= n {
590                    self.sync_all()?;
591                } else {
592                    self.logs.flush()?;
593                    self.relations.flush()?;
594                }
595            }
596            SyncPolicy::OsManaged => {
597                self.logs.flush()?;
598                self.relations.flush()?;
599            }
600        }
601        Ok(())
602    }
603
604    /// fsync in dependency order: a log row must never be durable while the
605    /// registry version it references is not — a crash would otherwise leave
606    /// logs pointing at registry records that evaporated.
607    fn sync_all(&mut self) -> Result<()> {
608        for reg in self.registries.values_mut() {
609            reg.sync()?;
610        }
611        self.logs.sync()?;
612        self.relations.sync()?;
613        self.appends_since_sync = 0;
614        Ok(())
615    }
616
617    /// Force everything to durable storage.
618    pub fn sync(&mut self) -> Result<()> {
619        self.sync_all()?;
620        self.meta.sync()?;
621        if let Some(access) = self.access.as_mut() {
622            access.sync()?;
623        }
624        Ok(())
625    }
626
627    /// Verify the tamper-evidence hash chains of every table (logs, relations,
628    /// meta, all registries). Returns the first chain break found — a record
629    /// that was modified in place, or a segment that was removed/replaced.
630    ///
631    /// If signed checkpoints exist (see [`checkpoint`](Self::checkpoint)),
632    /// they are verified too: every checkpoint signature must check out
633    /// against the public key, and the latest checkpoint's chain head must
634    /// still exist in the log chain. That extends detection to attacks the
635    /// chain alone cannot see — deleting everything and rewriting a
636    /// self-consistent chain from scratch, or truncating the newest records.
637    pub fn verify_integrity(&mut self) -> Result<()> {
638        self.logs.verify()?;
639        self.relations.verify()?;
640        self.meta.verify()?;
641        if let Some(access) = self.access.as_mut() {
642            access.verify()?;
643        }
644        for reg in self.registries.values_mut() {
645            reg.verify()?;
646        }
647        self.verify_checkpoints()?;
648        self.verify_rekey_events()
649    }
650
651    // ---- key rotation / re-key ----------------------------------------------
652
653    /// Re-wrap every RSA-protected registry value under the *active* RSA key
654    /// and re-digest the blind-index tokens of RSA fields under the *active*
655    /// HMAC key — the offline tail of a key rotation, so retired private keys
656    /// can actually be destroyed.
657    ///
658    /// Requirements: the ring must hold the private key of every RSA version
659    /// still referenced by stored values (to unwrap the old data keys) *and*
660    /// the active version's private key (this pass is signed). The store
661    /// must be otherwise idle — run it from the maintenance CLI, not a
662    /// serving process.
663    ///
664    /// What it cannot do: HMAC digests are one-way and the plaintext is not
665    /// on disk, so HMAC-protected fields keep their recorded key version —
666    /// retain old HMAC keys (read-side) for search, and treat a leaked HMAC
667    /// key as having exposed the digests written under it.
668    ///
669    /// Each registry table is rewritten into a fresh hash chain; the
670    /// old-head → new-head transition of every table is recorded and signed
671    /// in a [`RekeyEvent`] on the meta table, which
672    /// [`verify_integrity`](Self::verify_integrity) checks from then on —
673    /// that is what keeps an audited re-key distinguishable from tampering.
674    pub fn rekey(&mut self) -> Result<RekeyEvent> {
675        let keys = self.cfg.keys.clone();
676        let rsa_version = keys.active_rsa_version().ok_or_else(|| {
677            Error::Crypto("re-key: no RSA key in the ring — nothing to re-wrap to".into())
678        })?;
679        if !keys.can_sign() {
680            return Err(Error::Crypto(
681                "re-key requires the active RSA private key (the re-key event is signed)".into(),
682            ));
683        }
684        let hmac_version = keys.active_hmac_version().unwrap_or(KEYLESS);
685
686        let mut names: Vec<String> = self.registries.keys().cloned().collect();
687        names.sort();
688        let mut tables = Vec::with_capacity(names.len());
689        for name in names {
690            // drop the open registry (its table holds open file handles)
691            // before rewriting its directory, then reopen on the new chain
692            let reg = self.registries.remove(&name).expect("name was just listed");
693            let schema = reg.schema().clone();
694            drop(reg);
695            let dir = self.cfg.root.join("registry").join(&name);
696            let stats = rewrite_table::<RegistryRecord>(&dir, self.cfg.max_segment_bytes, |rec| {
697                rekey_record(rec, &schema, &keys)
698            })?;
699            let reopened = TypeRegistry::open(
700                &dir,
701                schema,
702                self.cfg.max_segment_bytes,
703                self.cfg.plaintext_cache,
704            )?;
705            self.registries.insert(name.clone(), reopened);
706            tables.push(RekeyedTable {
707                type_name: name,
708                records: stats.records,
709                old_root: crypto::hex(&stats.old_root),
710                new_root: crypto::hex(&stats.new_root),
711            });
712        }
713
714        let occurred_at = crate::time::now_micros();
715        let (signing_key_version, signature) = keys.sign(&rekey_signing_bytes(
716            occurred_at,
717            rsa_version,
718            hmac_version,
719            &tables,
720        ))?;
721        let event = RekeyEvent {
722            occurred_at,
723            rsa_version,
724            hmac_version,
725            tables,
726            signing_key_version,
727            signature,
728        };
729        self.meta
730            .append(&MetaEvent::Rekeyed(event.clone()), occurred_at)?;
731        self.meta.sync()?;
732        Ok(event)
733    }
734
735    /// Every re-key event recorded so far, oldest first.
736    pub fn rekey_events(&mut self) -> Result<Vec<RekeyEvent>> {
737        let mut out = Vec::new();
738        for ev in self.meta.scan()? {
739            if let MetaEvent::Rekeyed(r) = ev? {
740                out.push(r);
741            }
742        }
743        Ok(out)
744    }
745
746    /// Verify every recorded re-key event's signature, and that each registry's
747    /// current tree is *consistent with* the root the latest re-key event signed
748    /// for it — i.e. that signed root (over `records` leaves) is a genuine prefix
749    /// of the registry's present tree (later upserts only append; a rewrite
750    /// outside an audited re-key would break the consistency proof). Older
751    /// events' roots were legitimately retired by the next re-key.
752    fn verify_rekey_events(&mut self) -> Result<()> {
753        let events = self.rekey_events()?;
754        // type -> (signed tree size, signed root) from the latest event for it
755        let mut latest: HashMap<String, (u64, Hash)> = HashMap::new();
756        for ev in &events {
757            self.cfg
758                .keys
759                .verify_signature(
760                    ev.signing_key_version,
761                    &rekey_signing_bytes(
762                        ev.occurred_at,
763                        ev.rsa_version,
764                        ev.hmac_version,
765                        &ev.tables,
766                    ),
767                    &ev.signature,
768                )
769                .map_err(|e| Error::Crypto(format!("re-key event signature invalid: {e}")))?;
770            for t in &ev.tables {
771                let root: Hash = crypto::hex_decode(&t.new_root)
772                    .and_then(|v| v.try_into().ok())
773                    .ok_or_else(|| Error::Corrupt {
774                        segment: format!("meta (re-key event for '{}')", t.type_name),
775                        offset: 0,
776                        reason: "malformed merkle root in re-key event".into(),
777                    })?;
778                latest.insert(t.type_name.clone(), (t.records, root));
779            }
780        }
781        for (name, (signed_size, signed_root)) in latest {
782            let Some(reg) = self.registries.get_mut(&name) else {
783                continue; // type defined again later under a fresh table
784            };
785            let current_size = reg.spine_size();
786            let current_root = reg.root();
787            let consistent = if signed_size == current_size {
788                signed_root == current_root
789            } else if signed_size < current_size {
790                let proof = reg.prove_consistency(signed_size)?;
791                crate::merkle::verify_consistency(
792                    signed_size as usize,
793                    current_size as usize,
794                    &signed_root,
795                    &current_root,
796                    &proof.path,
797                )
798            } else {
799                false // current tree smaller than the signed one — records vanished
800            };
801            if !consistent {
802                return Err(Error::Corrupt {
803                    segment: format!("registry/{name}"),
804                    offset: 0,
805                    reason: "registry tree is not consistent with the root signed by the latest \
806                             re-key event — the registry was rewritten outside an audited re-key"
807                        .into(),
808                });
809            }
810        }
811        Ok(())
812    }
813
814    // ---- integrity checkpoints --------------------------------------------
815
816    /// Sign and persist an integrity checkpoint of the log chain now, on top
817    /// of the automatic ones (segment seals and retention runs). Returns
818    /// `Ok(None)` when the [`KeyRing`] holds no RSA private key: write-only
819    /// deployments cannot sign, so checkpointing is silently disabled rather
820    /// than an error — appends must keep working there.
821    pub fn checkpoint(&mut self) -> Result<Option<Checkpoint>> {
822        self.write_checkpoint()
823    }
824
825    /// Every checkpoint recorded so far, oldest first.
826    pub fn checkpoints(&self) -> Result<Vec<Checkpoint>> {
827        self.checkpoints.read_all()
828    }
829
830    // ---- merkle proofs ----------------------------------------------------
831
832    /// Current Merkle root over the whole log history (every record ever
833    /// appended, retained or purged). This is the value a checkpoint signs and
834    /// an anchor exports — a third party verifies proofs against it.
835    pub fn merkle_root(&self) -> Hash {
836        self.logs.root()
837    }
838
839    /// Total number of log records ever appended — the current Merkle tree size.
840    /// Inclusion/consistency proofs are issued against this size.
841    pub fn tree_size(&self) -> u64 {
842        self.logs.spine_size()
843    }
844
845    /// Prove that the log with `log_id` is committed to the current Merkle root:
846    /// an O(log n) audit path a third party verifies with
847    /// [`crate::merkle::verify_inclusion`] against [`merkle_root`](Self::merkle_root),
848    /// without the rest of the log and without trusting the operator. Fails if
849    /// the record has been purged by retention (its payload is gone) or is not
850    /// found.
851    pub fn prove_inclusion(&mut self, log_id: Uid) -> Result<InclusionProof> {
852        let base = self.logs.purged_count();
853        let slices = self.logs.slices()?;
854        let mut offset = 0u64;
855        for slice in slices {
856            let mut reader = crate::storage::SegmentReader::open_bounded(&slice.path, slice.bound)?;
857            while let Some((_, payload)) = reader.next_record()? {
858                let log: AuditLog = bincode::deserialize(&payload)?;
859                if log.log_id == log_id {
860                    return self.logs.prove_inclusion(base + offset);
861                }
862                offset += 1;
863            }
864        }
865        Err(Error::NotFound(format!(
866            "log {log_id} not found among retained records — cannot prove inclusion"
867        )))
868    }
869
870    /// Prove that the log tree of size `first_size` is a prefix of the current
871    /// tree — i.e. the history between the two sizes is append-only, nothing was
872    /// edited or removed. Verified with [`crate::merkle::verify_consistency`].
873    /// `first_size` is typically a [`Checkpoint::tree_size`] an auditor holds.
874    pub fn prove_consistency(&mut self, first_size: u64) -> Result<ConsistencyProof> {
875        self.logs.prove_consistency(first_size)
876    }
877
878    fn write_checkpoint(&mut self) -> Result<Option<Checkpoint>> {
879        if !self.cfg.keys.can_sign() {
880            return Ok(None);
881        }
882        // a checkpoint must never claim a head that is less durable than the
883        // checkpoint file itself — fsync the logs before signing (cheap: this
884        // runs per segment seal / retention run, not per append)
885        self.logs.sync()?;
886        let cp = Checkpoint::sign(
887            &self.cfg.keys,
888            crate::time::now_micros(),
889            self.logs.active_seq(),
890            self.logs.record_count(),
891            self.logs.spine_size(),
892            self.logs.root(),
893        )?;
894        self.checkpoints.append(&cp)?;
895        if let Some(hook) = &self.cfg.anchor {
896            // a broken anchor must not take down the write path
897            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| hook(&cp)));
898        }
899        Ok(Some(cp))
900    }
901
902    fn verify_checkpoints(&mut self) -> Result<()> {
903        let cps = self.checkpoints.read_all()?;
904        let Some(latest) = cps.last() else {
905            return Ok(()); // never checkpointed (e.g. write-only deployment)
906        };
907        for cp in &cps {
908            cp.verify(&self.cfg.keys)?;
909        }
910        // The latest checkpoint's root must be a genuine prefix of the current
911        // Merkle tree: same size ⇒ identical root; smaller size ⇒ a consistency
912        // proof must verify. The spine is never purged, so the current tree is
913        // always an extension of any honest past checkpoint — a rewrite or a
914        // truncation of the newest records breaks this. Older checkpoints are
915        // implied by the latest (their roots are prefixes of it).
916        let current_size = self.logs.spine_size();
917        let current_root = self.logs.root();
918        let consistent = if latest.tree_size == current_size {
919            latest.merkle_root == current_root
920        } else if latest.tree_size < current_size {
921            let proof = self.logs.prove_consistency(latest.tree_size)?;
922            crate::merkle::verify_consistency(
923                latest.tree_size as usize,
924                current_size as usize,
925                &latest.merkle_root,
926                &current_root,
927                &proof.path,
928            )
929        } else {
930            false // current tree smaller than the checkpoint — records vanished
931        };
932        if !consistent {
933            return Err(Error::Corrupt {
934                segment: self.checkpoints.path().display().to_string(),
935                offset: 0,
936                reason: "latest checkpoint's merkle root is not consistent with the current \
937                         tree — the log was rewritten or truncated after the checkpoint was signed"
938                    .into(),
939            });
940        }
941        Ok(())
942    }
943
944    // ---- retention -----------------------------------------------------------
945
946    /// Enforce the configured retention limits now (age window, byte budget,
947    /// or both — they combine as OR; see [`RetentionPolicy`]). Returns
948    /// segments dropped.
949    pub fn apply_retention(&mut self) -> Result<usize> {
950        let now = crate::time::now_micros();
951        let mut dropped_main = 0;
952        if let Some(cutoff) = self.cfg.retention.cutoff_micros(now) {
953            dropped_main += self.logs.purge_older_than(cutoff)?;
954            dropped_main += self.relations.purge_older_than(cutoff)?;
955        }
956        if let Some(budget) = self.cfg.retention.max_bytes {
957            dropped_main += self.purge_to_byte_budget(budget)?;
958        }
959        if dropped_main > 0 {
960            // re-anchor after the unlink: the previous latest checkpoint may
961            // point into a purged segment, and verification must not depend
962            // on records that retention legitimately removed
963            self.write_checkpoint()?;
964        }
965        // the access log has its own, independent window (often shorter than
966        // the main one). It is not covered by checkpoints, so no re-anchor.
967        let mut dropped = dropped_main;
968        if let Some(access) = self.access.as_mut() {
969            if let Some(cutoff) = self.cfg.access_retention.cutoff_micros(now) {
970                dropped += access.purge_older_than(cutoff)?;
971            }
972        }
973        Ok(dropped)
974    }
975
976    /// While logs + relations exceed `budget` bytes, drop the globally oldest
977    /// sealed segment (by its max record timestamp, across both tables) —
978    /// the same "whole old segments first" shape as the age window, so the
979    /// hash chain and checkpoint verification stay intact. Stops when only
980    /// active segments remain: the budget is a target, not a hard ceiling.
981    fn purge_to_byte_budget(&mut self, budget: u64) -> Result<usize> {
982        let mut total = self.logs.total_bytes()? + self.relations.total_bytes()?;
983        let mut dropped = 0usize;
984        while total > budget {
985            let from_logs = match (
986                self.logs.oldest_sealed_max_ts(),
987                self.relations.oldest_sealed_max_ts(),
988            ) {
989                (Some(l), Some(r)) => l <= r,
990                (Some(_), None) => true,
991                (None, Some(_)) => false,
992                (None, None) => break, // only active segments left
993            };
994            let freed = if from_logs {
995                self.logs.purge_oldest_sealed()?
996            } else {
997                self.relations.purge_oldest_sealed()?
998            };
999            match freed {
1000                Some(bytes) => {
1001                    total = total.saturating_sub(bytes);
1002                    dropped += 1;
1003                }
1004                None => break,
1005            }
1006        }
1007        Ok(dropped)
1008    }
1009
1010    /// Bytes currently on disk in the retention-governed tables
1011    /// (logs + relations) — the number [`RetentionPolicy::max_bytes`] is
1012    /// compared against. Registries and meta are not included.
1013    pub fn retained_bytes(&mut self) -> Result<u64> {
1014        Ok(self.logs.total_bytes()? + self.relations.total_bytes()?)
1015    }
1016
1017    // ---- registry browsing -----------------------------------------------------
1018
1019    /// Schemas of every defined entity/actor type, sorted by type name.
1020    pub fn type_schemas(&self) -> Vec<TypeSchema> {
1021        let mut out: Vec<TypeSchema> = self
1022            .registries
1023            .values()
1024            .map(|r| r.schema().clone())
1025            .collect();
1026        out.sort_by(|a, b| a.type_name.cmp(&b.type_name));
1027        out
1028    }
1029
1030    /// Latest version of every entity of a type, sorted by entity_id.
1031    pub fn list_entities(
1032        &self,
1033        type_name: &str,
1034        include_deleted: bool,
1035    ) -> Result<Vec<TargetSnapshot>> {
1036        let reg = self
1037            .registries
1038            .get(type_name)
1039            .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
1040        let mut ids: Vec<&String> = reg.idx.entity_ids().collect();
1041        ids.sort();
1042        Ok(ids
1043            .into_iter()
1044            .filter_map(|id| reg.latest(id))
1045            .filter(|rec| include_deleted || !rec.deleted)
1046            .map(snapshot_of)
1047            .collect())
1048    }
1049
1050    /// Full version history of one entity, oldest first (a delete shows up as
1051    /// a final version with `deleted: true`).
1052    pub fn entity_history(&self, type_name: &str, entity_id: &str) -> Result<Vec<TargetSnapshot>> {
1053        let reg = self
1054            .registries
1055            .get(type_name)
1056            .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
1057        let uids = reg.all_version_uids(entity_id);
1058        if uids.is_empty() {
1059            return Err(Error::NotFound(format!(
1060                "entity '{entity_id}' of type '{type_name}'"
1061            )));
1062        }
1063        Ok(uids
1064            .iter()
1065            .map(|uid| match reg.version(uid) {
1066                Some(rec) => snapshot_of(rec),
1067                None => missing_snapshot(type_name, uid),
1068            })
1069            .collect())
1070    }
1071
1072    // ---- queries ---------------------------------------------------------------
1073
1074    /// A point-in-time, read-only view of the store. Building it is cheap
1075    /// (registry indexes are cloned, log/relation tables contribute only
1076    /// path+length bounds); the actual scan then runs on the snapshot without
1077    /// touching the store, so a slow full-scan query never blocks appends.
1078    pub fn snapshot(&mut self) -> Result<ReadSnapshot> {
1079        Ok(ReadSnapshot {
1080            keys: self.cfg.keys.clone(),
1081            registries: self
1082                .registries
1083                .iter()
1084                .map(|(k, v)| (k.clone(), v.idx.clone()))
1085                .collect(),
1086            logs: self.logs.slices()?,
1087            relations: self.relations.slices()?,
1088        })
1089    }
1090
1091    /// Run a query. Targets and actor are resolved to the registry versions
1092    /// referenced at write time, so renamed entities show their historical
1093    /// values. Convenience for [`snapshot`](Self::snapshot)`()?.query(q)`.
1094    ///
1095    /// With [`StoreConfig::access_log`] enabled, the query is meta-audited
1096    /// under actor `"local"` (this direct embedded API has no caller
1097    /// identity; use [`query_as`](Self::query_as) to attribute it). The
1098    /// recording is fail-closed: if the access record cannot be persisted,
1099    /// the query errors — the synchronous embedded API favors the regulatory
1100    /// guarantee, while the async pipeline above favors availability.
1101    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
1102        self.query_as("local", q)
1103    }
1104
1105    /// [`query`](Self::query) attributed to a named actor in the access log.
1106    pub fn query_as(&mut self, actor: &str, q: &LogQuery) -> Result<Vec<LogView>> {
1107        let hits = self.snapshot()?.query(q)?;
1108        if self.access.is_some() {
1109            self.record_access(AccessRecord::new(
1110                actor,
1111                "query_logs",
1112                crate::access::summarize_log_query(q),
1113                Some(hits.len() as u64),
1114            ))?;
1115        }
1116        Ok(hits)
1117    }
1118
1119    // ---- meta-audit (access log) -------------------------------------------
1120
1121    /// Whether the meta-audit access log is enabled.
1122    pub fn access_enabled(&self) -> bool {
1123        self.access.is_some()
1124    }
1125
1126    /// Append one meta-audit record. A no-op when [`StoreConfig::access_log`]
1127    /// is off, so callers can record unconditionally. This is a plain append —
1128    /// it never reads, so it can never produce another access record
1129    /// (self-reference loops are structurally impossible).
1130    pub fn record_access(&mut self, rec: AccessRecord) -> Result<()> {
1131        let Some(access) = self.access.as_mut() else {
1132            return Ok(());
1133        };
1134        let ts = rec.timestamp;
1135        access.append(&rec, ts)?;
1136        access.flush()?;
1137        Ok(())
1138    }
1139
1140    /// Read meta-audit records (oldest first) matching the filter. Errors
1141    /// when the access log is not enabled. Note: this is a *read* of the
1142    /// access log — callers exposing it (e.g. the server) record it as an
1143    /// access in turn; the recording itself is an append and does not recurse.
1144    pub fn access_records(&mut self, q: &AccessQuery) -> Result<Vec<AccessRecord>> {
1145        let Some(access) = self.access.as_mut() else {
1146            return Err(Error::Schema(
1147                "the access log is not enabled — set StoreConfig::access_log".into(),
1148            ));
1149        };
1150        let mut out = Vec::new();
1151        for rec in access.scan()? {
1152            let rec = rec?;
1153            if !q.matches(&rec) {
1154                continue;
1155            }
1156            out.push(rec);
1157            if q.limit.is_some_and(|n| out.len() >= n) {
1158                break;
1159            }
1160        }
1161        Ok(out)
1162    }
1163
1164    /// Run a query and get one page plus a continuation cursor. Convenience
1165    /// for [`snapshot`](Self::snapshot)`()?.query_page(q)`.
1166    pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
1167        self.snapshot()?.query_page(q)
1168    }
1169
1170    /// Count a query's matches without rendering them. Convenience for
1171    /// [`snapshot`](Self::snapshot)`()?.count(q)`.
1172    pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
1173        self.snapshot()?.count(q)
1174    }
1175
1176    /// Decrypt an RSA-protected stored value (requires the private key).
1177    pub fn decrypt(&self, v: &crate::model::StoredValue) -> Result<Vec<u8>> {
1178        self.cfg.keys.decrypt(v)
1179    }
1180}
1181
1182/// Re-key one registry record: re-wrap RSA values to the active RSA version
1183/// and re-digest the index tokens of RSA fields under the active HMAC key
1184/// (their plaintext is recoverable, unlike one-way hashed fields' — those
1185/// keep their recorded version and stay matchable via multi-version probes).
1186fn rekey_record(
1187    mut rec: RegistryRecord,
1188    schema: &TypeSchema,
1189    keys: &KeyRing,
1190) -> Result<RegistryRecord> {
1191    let names: Vec<String> = rec
1192        .fields
1193        .iter()
1194        .filter(|(_, v)| matches!(v, StoredValue::Rsa { .. }))
1195        .map(|(k, _)| k.clone())
1196        .collect();
1197    for name in names {
1198        let rewrapped = keys.rewrap(&rec.fields[&name])?;
1199        if let Some(def) = schema.field(&name) {
1200            if def.search != FieldIndex::None {
1201                let plaintext = keys.decrypt(&rewrapped)?;
1202                let text = String::from_utf8_lossy(&plaintext).into_owned();
1203                let mut key_version = KEYLESS;
1204                let mut digests = Vec::new();
1205                for t in crate::tokens::value_tokens(&text, def.search) {
1206                    let (v, d) = keys.index_token_digest(&def.name, def.protection, &t)?;
1207                    key_version = v;
1208                    digests.push(d);
1209                }
1210                rec.tokens.insert(
1211                    name.clone(),
1212                    FieldTokens {
1213                        key_version,
1214                        digests,
1215                    },
1216                );
1217            }
1218        }
1219        rec.fields.insert(name, rewrapped);
1220    }
1221    Ok(rec)
1222}
1223
1224fn snapshot_of(rec: &crate::registry::RegistryRecord) -> TargetSnapshot {
1225    TargetSnapshot {
1226        entity_registry_uid: rec.uid,
1227        entity_type: rec.entity_type.clone(),
1228        entity_id: rec.entity_id.clone(),
1229        version: rec.version,
1230        fields: rec.fields.clone(),
1231        deleted: rec.deleted,
1232        missing: false,
1233    }
1234}
1235
1236fn missing_snapshot(entity_type: &str, uid: &Uid) -> TargetSnapshot {
1237    TargetSnapshot {
1238        entity_registry_uid: *uid,
1239        entity_type: entity_type.to_string(),
1240        entity_id: String::new(),
1241        version: 0,
1242        fields: BTreeMap::new(),
1243        deleted: false,
1244        missing: true,
1245    }
1246}
1247
1248/// A point-in-time, read-only view of an [`AuditStore`] (see
1249/// [`AuditStore::snapshot`]). `Send`, so it can be handed to another thread:
1250/// the async pipeline runs query scans on the *caller's* thread against a
1251/// snapshot, which keeps the writer thread free to persist events — a slow
1252/// full-scan query can no longer push `emit` into `QueueFull` territory.
1253/// Rows appended after the snapshot was taken are not visible.
1254pub struct ReadSnapshot {
1255    keys: KeyRing,
1256    registries: HashMap<String, RegistryIndex>,
1257    logs: Vec<SegmentSlice>,
1258    relations: Vec<SegmentSlice>,
1259}
1260
1261/// The per-query filter state shared by [`ReadSnapshot::query_page`] and
1262/// [`ReadSnapshot::count`]: registry filters resolved to uid sets once, so
1263/// the log scan itself is pure set/field checks per row.
1264struct ResolvedFilters {
1265    allowed_by_target: Option<HashSet<Uid>>,
1266    allowed_actor_uids: Option<HashSet<Uid>>,
1267}
1268
1269impl ResolvedFilters {
1270    fn matches(&self, q: &LogQuery, log: &AuditLog) -> bool {
1271        // from/to are not re-checked here: the positioned scan already
1272        // filtered on the frame-header timestamp before deserializing
1273        if let Some(m) = &q.method {
1274            if !log.method.eq_ignore_ascii_case(m) {
1275                return false;
1276            }
1277        }
1278        if let Some(p) = &q.url_prefix {
1279            if !log.url.starts_with(p.as_str()) {
1280                return false;
1281            }
1282        }
1283        if let Some(allowed) = &self.allowed_by_target {
1284            if !allowed.contains(&log.log_id) {
1285                return false;
1286            }
1287        }
1288        if let Some(uids) = &self.allowed_actor_uids {
1289            if !uids.contains(&log.actor) {
1290                return false;
1291            }
1292        }
1293        q.custom.iter().all(|(k, v)| log.custom.get(k) == Some(v))
1294    }
1295}
1296
1297impl ReadSnapshot {
1298    /// Run a query against this snapshot and return the matches of the first
1299    /// page (see [`query_page`](Self::query_page) for the cursor). `&mut
1300    /// self` because Contains searches lazily decrypt-and-cache RSA values.
1301    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
1302        Ok(self.query_page(q)?.logs)
1303    }
1304
1305    /// Run a query and return one page plus a continuation cursor.
1306    ///
1307    /// Scan cost is bounded three ways:
1308    /// - log segments entirely outside `from_micros..=to_micros` are never
1309    ///   opened (per-segment time bounds live in the snapshot),
1310    /// - the scan walks in `q.order` and stops at `limit` matches, so a
1311    ///   newest-first page over a long history reads only the newest data,
1312    /// - relations are resolved only for the page's hits (never a whole-table
1313    ///   relation map in memory).
1314    pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
1315        let filters = self.resolve_filters(q)?;
1316        let mut scan = self.log_scan(q)?;
1317        let mut hits: Vec<(Position, AuditLog)> = Vec::new();
1318        let mut more = false;
1319        while let Some((pos, log)) = scan.next_row()? {
1320            if !filters.matches(q, &log) {
1321                continue;
1322            }
1323            if q.limit.is_some_and(|limit| hits.len() >= limit) {
1324                // a (limit+1)-th match exists, so the cursor is worth issuing
1325                more = true;
1326                break;
1327            }
1328            hits.push((pos, log));
1329        }
1330        let next_cursor = match (more, hits.last()) {
1331            (true, Some((pos, _))) => Some(crate::query::encode_cursor(q.order, *pos)),
1332            _ => None,
1333        };
1334
1335        // resolve relations for the page's hits only; relation rows carry
1336        // their log's timestamp, so the same time window prunes here too
1337        let wanted: HashSet<Uid> = hits.iter().map(|(_, log)| log.log_id).collect();
1338        let mut rels_by_log: HashMap<Uid, Vec<TargetRelation>> = HashMap::new();
1339        if !wanted.is_empty() {
1340            let mut rels: PositionedScan<TargetRelation> = PositionedScan::new(
1341                self.relations.clone(),
1342                false,
1343                q.from_micros,
1344                q.to_micros,
1345                None,
1346            );
1347            while let Some((_, rel)) = rels.next_row()? {
1348                if wanted.contains(&rel.log_id) {
1349                    rels_by_log.entry(rel.log_id).or_default().push(rel);
1350                }
1351            }
1352        }
1353        Ok(QueryPage {
1354            logs: hits
1355                .into_iter()
1356                .map(|(_, log)| self.render(log, &rels_by_log))
1357                .collect(),
1358            next_cursor,
1359            segments_scanned: scan.segments_opened(),
1360        })
1361    }
1362
1363    /// Count the query's matches without rendering them: no registry
1364    /// resolution, no relation lookup per hit, no decryption — just the
1365    /// pruned log scan and per-row filter checks. `limit` and `cursor` are
1366    /// ignored: the count is the total for the filters.
1367    pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
1368        let filters = self.resolve_filters(q)?;
1369        let mut q = q.clone();
1370        q.cursor = None;
1371        q.order = Order::Asc; // cheapest direction; order is irrelevant to a count
1372        let mut scan = self.log_scan(&q)?;
1373        let mut n = 0u64;
1374        while let Some((_, log)) = scan.next_row()? {
1375            if filters.matches(&q, &log) {
1376                n += 1;
1377            }
1378        }
1379        Ok(n)
1380    }
1381
1382    /// Resolve registry-side filters (targets, actor) to uid sets. Multiple
1383    /// target filters intersect (AND).
1384    fn resolve_filters(&mut self, q: &LogQuery) -> Result<ResolvedFilters> {
1385        let mut allowed_by_target: Option<HashSet<Uid>> = None;
1386        for f in &q.targets {
1387            let ids = self.log_ids_for_filter(f, q.from_micros, q.to_micros)?;
1388            allowed_by_target = Some(match allowed_by_target {
1389                Some(prev) => prev.intersection(&ids).copied().collect(),
1390                None => ids,
1391            });
1392        }
1393        let allowed_actor_uids: Option<HashSet<Uid>> = match &q.actor {
1394            Some(f) => Some(self.version_uids_for_filter(f)?),
1395            None => None,
1396        };
1397        Ok(ResolvedFilters {
1398            allowed_by_target,
1399            allowed_actor_uids,
1400        })
1401    }
1402
1403    /// The time/cursor-bounded log scan for a query.
1404    fn log_scan(&self, q: &LogQuery) -> Result<PositionedScan<AuditLog>> {
1405        let after = match &q.cursor {
1406            Some(c) => Some(crate::query::decode_cursor(c, q.order)?),
1407            None => None,
1408        };
1409        Ok(PositionedScan::new(
1410            self.logs.clone(),
1411            q.order == Order::Desc,
1412            q.from_micros,
1413            q.to_micros,
1414            after,
1415        ))
1416    }
1417
1418    /// All log_ids whose relations point at an entity matching the filter.
1419    /// Matching is by *entity*, not version: searching the current name also
1420    /// finds logs recorded under an older name, and vice versa. The relation
1421    /// scan is time-pruned: a relation row carries its log's timestamp, so
1422    /// logs outside the window cannot enter the set anyway.
1423    fn log_ids_for_filter(
1424        &mut self,
1425        f: &TargetFilter,
1426        from: Option<u64>,
1427        to: Option<u64>,
1428    ) -> Result<HashSet<Uid>> {
1429        let version_uids = self.version_uids_for_filter(f)?;
1430        let mut out = HashSet::new();
1431        let mut rels: PositionedScan<TargetRelation> =
1432            PositionedScan::new(self.relations.clone(), false, from, to, None);
1433        while let Some((_, rel)) = rels.next_row()? {
1434            if version_uids.contains(&rel.entity_registry_uid) {
1435                out.insert(rel.log_id);
1436            }
1437        }
1438        Ok(out)
1439    }
1440
1441    /// Every version uid of every entity matching the filter.
1442    fn version_uids_for_filter(&mut self, f: &TargetFilter) -> Result<HashSet<Uid>> {
1443        let reg = self.registries.get_mut(&f.entity_type).ok_or_else(|| {
1444            Error::Schema(format!("type '{}' has no registry table", f.entity_type))
1445        })?;
1446        let entity_ids = reg.search(&f.field, &f.value, f.include_past, f.mode, &self.keys)?;
1447        let mut uids = HashSet::new();
1448        for id in entity_ids {
1449            uids.extend(reg.all_version_uids(&id).iter().copied());
1450        }
1451        Ok(uids)
1452    }
1453
1454    fn render(&self, log: AuditLog, rels_by_log: &HashMap<Uid, Vec<TargetRelation>>) -> LogView {
1455        let actor = self.snapshot(&log.actor_type, &log.actor);
1456        let mut targets = Vec::new();
1457        if let Some(rels) = rels_by_log.get(&log.log_id) {
1458            for rel in rels {
1459                targets.push(self.snapshot(&rel.entity_type, &rel.entity_registry_uid));
1460            }
1461        }
1462        LogView {
1463            log_id: log.log_id,
1464            timestamp_micros: log.timestamp,
1465            timestamp: crate::time::format_rfc3339(log.timestamp),
1466            actor,
1467            method: log.method,
1468            url: log.url,
1469            content: log.content,
1470            targets,
1471            custom: log.custom,
1472        }
1473    }
1474
1475    /// Resolve one registry version. An unresolvable reference renders as a
1476    /// `missing` placeholder instead of an error: one broken/lost registry
1477    /// record must not make every query touching that log fail.
1478    fn snapshot(&self, entity_type: &str, uid: &Uid) -> TargetSnapshot {
1479        match self
1480            .registries
1481            .get(entity_type)
1482            .and_then(|r| r.version(uid))
1483        {
1484            Some(rec) => snapshot_of(rec),
1485            None => missing_snapshot(entity_type, uid),
1486        }
1487    }
1488}