quipu-core 0.2.0

Embedded, OS-independent audit log storage engine: typed entity registries, field encryption, retention, and time-travel queries.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
use crate::access::{AccessQuery, AccessRecord, RESERVED_TYPE_PREFIX};
use crate::checkpoint::{Checkpoint, CheckpointLog};
use crate::crypto::{self, KeyRing, KeyVersion, KEYLESS};
use crate::error::{Error, Result};
use crate::id::Uid;
use crate::merkle::Hash;
use crate::merkle_log::{ConsistencyProof, InclusionProof};
use crate::model::{AuditLog, Content, StoredValue, TargetRelation, Value};
use crate::query::{LogQuery, LogView, Order, QueryPage, TargetFilter, TargetSnapshot};
use crate::registry::{EntityInput, FieldTokens, RegistryIndex, RegistryRecord, TypeRegistry};
use crate::retention::RetentionPolicy;
use crate::schema::{CustomColumnDef, FieldIndex, TypeSchema};
use crate::storage::{rewrite_table, Position, PositionedScan, SegmentSlice, Table};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;

/// External-anchor callback — receives every checkpoint right after it is
/// persisted. `Arc` (not `Box`) because [`StoreConfig`] is `Clone`.
pub type AnchorHook = Arc<dyn Fn(&Checkpoint) + Send + Sync>;

/// Durability/throughput trade-off for log appends.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncPolicy {
    /// fsync after every append. Safest, slowest.
    Always,
    /// fsync after every N appends; otherwise only flush to the OS cache.
    EveryN(u32),
    /// Never fsync explicitly; rely on the OS to write back. Fastest.
    OsManaged,
}

#[derive(Clone)]
pub struct StoreConfig {
    pub root: PathBuf,
    pub max_segment_bytes: u64,
    pub sync_policy: SyncPolicy,
    pub retention: RetentionPolicy,
    pub keys: KeyRing,
    /// Opt-in for LIKE ([`crate::MatchMode::Contains`]) search on *protected*
    /// fields: plaintexts of protected values written by this process are
    /// held in memory (never persisted), and RSA values are decrypted and
    /// cached per immutable version on first Contains search. Off by
    /// default — keeping plaintexts of protected fields in memory is a
    /// deliberate trade-off the operator must choose; when off, Contains on
    /// protected fields is rejected (plain fields are always LIKE-searchable).
    pub plaintext_cache: bool,
    /// Called with each integrity checkpoint after it is persisted, so its
    /// chain head can be exported to an external trust domain (another host,
    /// a ticket, a transparency log) that a disk-level insider cannot rewrite.
    /// Errors and panics inside the hook are swallowed: availability of the
    /// write path outranks anchoring — export delivery is the hook's job.
    pub anchor: Option<AnchorHook>,
    /// Opt-in meta-audit: record reads and admin actions against the audit
    /// store itself in a dedicated access-log table (`root/access/`) — see
    /// [`crate::access`]. Off by default.
    pub access_log: bool,
    /// Retention for the access-log table, independent of the main `retention`
    /// window. Access records are often kept *shorter* than the audit data
    /// they describe; the split is possible because the access log lives in
    /// its own table (retention drops whole segments per table).
    pub access_retention: RetentionPolicy,
}

impl StoreConfig {
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self {
            root: root.into(),
            max_segment_bytes: 64 * 1024 * 1024,
            sync_policy: SyncPolicy::EveryN(64),
            retention: RetentionPolicy::keep_forever(),
            keys: KeyRing::new(),
            plaintext_cache: false,
            anchor: None,
            access_log: false,
            access_retention: RetentionPolicy::keep_forever(),
        }
    }

    /// Enable the meta-audit access log — see
    /// [`access_log`](Self::access_log).
    pub fn access_log(mut self, enabled: bool) -> Self {
        self.access_log = enabled;
        self
    }

    /// Retention window for the access-log table — see
    /// [`access_retention`](Self::access_retention).
    pub fn access_retention(mut self, r: RetentionPolicy) -> Self {
        self.access_retention = r;
        self
    }

    /// Register the external-anchor hook — see [`anchor`](Self::anchor).
    pub fn anchor(mut self, hook: impl Fn(&Checkpoint) + Send + Sync + 'static) -> Self {
        self.anchor = Some(Arc::new(hook));
        self
    }

    /// Enable the in-memory plaintext cache — see
    /// [`plaintext_cache`](Self::plaintext_cache).
    pub fn plaintext_cache(mut self, enabled: bool) -> Self {
        self.plaintext_cache = enabled;
        self
    }

    pub fn retention(mut self, r: RetentionPolicy) -> Self {
        self.retention = r;
        self
    }

    pub fn sync_policy(mut self, p: SyncPolicy) -> Self {
        self.sync_policy = p;
        self
    }

    pub fn keys(mut self, k: KeyRing) -> Self {
        self.keys = k;
        self
    }

    pub fn max_segment_bytes(mut self, n: u64) -> Self {
        self.max_segment_bytes = n;
        self
    }
}

/// Persisted meta events: schema and custom-column definitions are themselves
/// stored in an append-only table and replayed on open.
///
/// bincode encodes the variant *index*, so new variants may only be appended
/// at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum MetaEvent {
    TypeDefined(TypeSchema),
    CustomColumnDefined(CustomColumnDef),
    Rekeyed(RekeyEvent),
}

/// Domain separation for re-key event signatures — never confusable with a
/// checkpoint signature made by the same key.
const REKEY_SIGNING_DOMAIN: &[u8] = b"quipu-rekey-v2\0";

/// The signed, persisted record of one [`AuditStore::rekey`] pass.
///
/// Re-keying rewrites registry tables, which necessarily produces a fresh
/// Merkle spine — exactly what tampering looks like. This event is what makes
/// the rewrite *audited* instead: it is appended to the meta table and signs the
/// old-root → new-root transition of every rewritten registry with the active
/// RSA key. [`AuditStore::verify_integrity`] then checks that each registry's
/// current tree is consistent with the root the latest re-key event signed —
/// a registry rewritten outside this path contradicts the signature.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RekeyEvent {
    /// UTC micros at re-key time.
    pub occurred_at: u64,
    /// RSA key version values were re-wrapped to (the active one).
    pub rsa_version: KeyVersion,
    /// HMAC key version recomputed index tokens were digested with
    /// ([`KEYLESS`] when the ring holds no HMAC key).
    pub hmac_version: KeyVersion,
    pub tables: Vec<RekeyedTable>,
    /// Version of the RSA key that signed this event.
    pub signing_key_version: KeyVersion,
    /// RSA PKCS#1 v1.5 / SHA-256 signature over the fields above.
    pub signature: Vec<u8>,
}

/// One registry table rewritten by a re-key pass: the signed Merkle-root
/// transition.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RekeyedTable {
    pub type_name: String,
    /// Tree size of the rewritten registry (records carried over). The new root
    /// is taken over exactly this many leaves, so verification proves the
    /// post-rewrite root is a prefix of the (possibly extended) current tree.
    pub records: u64,
    /// Hex Merkle root before the rewrite (the tree this event retires).
    pub old_root: String,
    /// Hex Merkle root after the rewrite (the tree this event vouches for).
    pub new_root: String,
}

fn rekey_signing_bytes(
    occurred_at: u64,
    rsa_version: KeyVersion,
    hmac_version: KeyVersion,
    tables: &[RekeyedTable],
) -> Vec<u8> {
    let mut out = Vec::with_capacity(REKEY_SIGNING_DOMAIN.len() + 16 + tables.len() * 160);
    out.extend_from_slice(REKEY_SIGNING_DOMAIN);
    out.extend_from_slice(&occurred_at.to_le_bytes());
    out.extend_from_slice(&rsa_version.to_le_bytes());
    out.extend_from_slice(&hmac_version.to_le_bytes());
    for t in tables {
        out.extend_from_slice(t.type_name.as_bytes());
        out.push(0);
        out.extend_from_slice(&t.records.to_le_bytes());
        out.extend_from_slice(t.old_root.as_bytes());
        out.push(0);
        out.extend_from_slice(t.new_root.as_bytes());
        out.push(0);
    }
    out
}

/// The embedded audit store. Owns every table under one root directory:
///
/// ```text
/// root/
///   meta/         type schemas + custom column registry (replayed on open)
///   logs/         AuditLog rows
///   relations/    log -> target-entity-version relations
///   registry/<t>/ one versioned registry table per entity/actor type
/// ```
pub struct AuditStore {
    cfg: StoreConfig,
    meta: Table<MetaEvent>,
    logs: Table<AuditLog>,
    relations: Table<TargetRelation>,
    /// Meta-audit table (`Some` iff [`StoreConfig::access_log`]) — see
    /// [`crate::access`]. Kept apart from `logs` on purpose: independent
    /// retention, and an access append can never recurse into a query.
    access: Option<Table<AccessRecord>>,
    registries: HashMap<String, TypeRegistry>,
    custom_columns: HashMap<String, CustomColumnDef>,
    checkpoints: CheckpointLog,
    appends_since_sync: u32,
    /// Advisory OS lock on `root/LOCK`, held for the store's lifetime. The
    /// store is single-process by design; without this, a second process
    /// opening the same root would silently corrupt in-memory indexes and
    /// interleave segment writes. Released automatically on drop/crash.
    _lock: std::fs::File,
}

impl AuditStore {
    pub fn open(cfg: StoreConfig) -> Result<Self> {
        std::fs::create_dir_all(&cfg.root)?;
        let lock = std::fs::OpenOptions::new()
            .create(true)
            .truncate(false)
            .write(true)
            .open(cfg.root.join("LOCK"))?;
        lock.try_lock().map_err(|e| match e {
            std::fs::TryLockError::WouldBlock => Error::Locked(cfg.root.display().to_string()),
            std::fs::TryLockError::Error(e) => Error::Io(e),
        })?;
        let mut meta: Table<MetaEvent> =
            Table::open(&cfg.root.join("meta"), cfg.max_segment_bytes)?;
        let logs = Table::open(&cfg.root.join("logs"), cfg.max_segment_bytes)?;
        let relations = Table::open(&cfg.root.join("relations"), cfg.max_segment_bytes)?;
        let access = if cfg.access_log {
            Some(Table::open(
                &cfg.root.join("access"),
                cfg.max_segment_bytes,
            )?)
        } else {
            None
        };

        let mut registries = HashMap::new();
        let mut custom_columns = HashMap::new();
        let events: Vec<MetaEvent> = meta.scan()?.collect::<Result<Vec<_>>>()?;
        for ev in events {
            match ev {
                MetaEvent::TypeDefined(schema) => {
                    // last definition wins (allows additive schema evolution)
                    let dir = cfg.root.join("registry").join(&schema.type_name);
                    let reg = TypeRegistry::open(
                        &dir,
                        schema.clone(),
                        cfg.max_segment_bytes,
                        cfg.plaintext_cache,
                    )?;
                    registries.insert(schema.type_name.clone(), reg);
                }
                MetaEvent::CustomColumnDefined(def) => {
                    custom_columns.insert(def.name.clone(), def);
                }
                // re-key events are audit records, not state to replay; they
                // are read back during verify_integrity()
                MetaEvent::Rekeyed(_) => {}
            }
        }
        let checkpoints = CheckpointLog::new(&cfg.root);
        Ok(Self {
            cfg,
            meta,
            logs,
            relations,
            access,
            registries,
            custom_columns,
            checkpoints,
            appends_since_sync: 0,
            _lock: lock,
        })
    }

    // ---- schema management -------------------------------------------------

    /// Create (or redefine) the registry table for an entity/actor type.
    /// Must be called before entities of that type are registered — this is
    /// the "create the registry table first" step of the write protocol.
    ///
    /// Redefinition is additive only: new fields may be added, but existing
    /// fields cannot be removed or change kind/protection. A protection
    /// change would split the search index keys (old values silently stop
    /// matching probes), breaking the "past values stay searchable" promise.
    pub fn define_type(&mut self, schema: TypeSchema) -> Result<()> {
        if schema.type_name.starts_with(RESERVED_TYPE_PREFIX) {
            return Err(Error::Schema(format!(
                "type name '{}' uses the reserved prefix '{RESERVED_TYPE_PREFIX}' — that \
                 namespace belongs to quipu's internal record kinds (e.g. the \
                 '{}' meta-audit type)",
                schema.type_name,
                crate::access::ACCESS_TYPE
            )));
        }
        if let Some(existing) = self.registries.get(&schema.type_name) {
            for old in &existing.schema().fields {
                match schema.field(&old.name) {
                    None => {
                        return Err(Error::Schema(format!(
                            "type '{}': field '{}' cannot be removed — recorded data still \
                             references it",
                            schema.type_name, old.name
                        )));
                    }
                    Some(new) if new.kind != old.kind || new.protection != old.protection => {
                        return Err(Error::Schema(format!(
                            "type '{}': field '{}' cannot change kind/protection — existing \
                             values would become unsearchable or unreadable",
                            schema.type_name, old.name
                        )));
                    }
                    Some(new) if new.search != old.search => {
                        return Err(Error::Schema(format!(
                            "type '{}': field '{}' cannot change its FieldIndex — records \
                             written under the old index carry no tokens for the new one \
                             and would silently stop matching",
                            schema.type_name, old.name
                        )));
                    }
                    _ => {}
                }
            }
        }
        let dir = self.cfg.root.join("registry").join(&schema.type_name);
        let reg = TypeRegistry::open(
            &dir,
            schema.clone(),
            self.cfg.max_segment_bytes,
            self.cfg.plaintext_cache,
        )?;
        self.meta.append(
            &MetaEvent::TypeDefined(schema.clone()),
            crate::time::now_micros(),
        )?;
        self.meta.sync()?;
        self.registries.insert(schema.type_name.clone(), reg);
        Ok(())
    }

    pub fn has_type(&self, type_name: &str) -> bool {
        self.registries.contains_key(type_name)
    }

    /// Register an extra audit-log column (text / number / json). For
    /// `required` columns, the requirement only applies to events that occur
    /// from now on — see [`CustomColumnDef::required_since`].
    pub fn define_custom_column(&mut self, mut def: CustomColumnDef) -> Result<()> {
        if def.required && def.required_since.is_none() {
            def.required_since = Some(crate::time::now_micros());
        }
        self.meta.append(
            &MetaEvent::CustomColumnDefined(def.clone()),
            crate::time::now_micros(),
        )?;
        self.meta.sync()?;
        self.custom_columns.insert(def.name.clone(), def);
        Ok(())
    }

    pub fn custom_columns(&self) -> impl Iterator<Item = &CustomColumnDef> {
        self.custom_columns.values()
    }

    // ---- registry operations -----------------------------------------------

    fn registry(&mut self, type_name: &str) -> Result<&mut TypeRegistry> {
        self.registries.get_mut(type_name).ok_or_else(|| {
            Error::Schema(format!(
                "type '{type_name}' has no registry table — call define_type() first"
            ))
        })
    }

    /// Register/update an entity and get the uid of its current version.
    pub fn register_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
        let keys = self.cfg.keys.clone();
        self.registry(type_name)?.upsert(input, &keys)
    }

    /// Update is the same operation as register: changed fields produce a new
    /// version, old versions stay queryable (old-name search keeps working).
    pub fn update_entity(&mut self, type_name: &str, input: &EntityInput) -> Result<Uid> {
        self.register_entity(type_name, input)
    }

    pub fn delete_entity(&mut self, type_name: &str, entity_id: &str) -> Result<Uid> {
        self.registry(type_name)?.delete(entity_id)
    }

    pub fn entity_latest(&self, type_name: &str, entity_id: &str) -> Option<&RegistryRecord> {
        self.registries.get(type_name)?.latest(entity_id)
    }

    // ---- writing logs --------------------------------------------------------

    /// Append one audit log following the full write protocol:
    /// 1. the actor is upserted into its type registry,
    /// 2. every target is upserted into its type registry,
    /// 3. the log row is appended with the actor's version uid,
    /// 4. one relation row per target binds the log to the exact entity versions.
    ///
    /// Registry tables for every referenced type must already exist.
    #[allow(clippy::too_many_arguments)]
    pub fn append(
        &mut self,
        actor_type: &str,
        actor: &EntityInput,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, EntityInput)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.append_at(
            crate::time::now_micros(),
            actor_type,
            actor,
            method,
            url,
            content,
            targets,
            custom,
        )
    }

    /// [`append`](Self::append) with an explicit event time — for events that
    /// occurred earlier than they are persisted (async pipelines, DLQ
    /// redrive). Validation of required custom columns uses this time, so a
    /// column made required *after* the event happened does not reject it.
    #[allow(clippy::too_many_arguments)]
    pub fn append_at(
        &mut self,
        occurred_at: u64,
        actor_type: &str,
        actor: &EntityInput,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, EntityInput)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.validate_custom(&custom, occurred_at)?;
        let actor_uid = self.register_entity(actor_type, actor)?;
        let mut target_refs = Vec::with_capacity(targets.len());
        for (t_type, t_input) in targets {
            let uid = self.register_entity(t_type, t_input)?;
            target_refs.push((t_type.clone(), uid));
        }
        self.append_resolved_at(
            occurred_at,
            actor_type,
            actor_uid,
            method,
            url,
            content,
            &target_refs,
            custom,
        )
    }

    /// Lower-level append for callers that already hold registry version uids.
    #[allow(clippy::too_many_arguments)]
    pub fn append_resolved(
        &mut self,
        actor_type: &str,
        actor_uid: Uid,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, Uid)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.append_resolved_at(
            crate::time::now_micros(),
            actor_type,
            actor_uid,
            method,
            url,
            content,
            targets,
            custom,
        )
    }

    /// See [`append_at`](Self::append_at).
    #[allow(clippy::too_many_arguments)]
    pub fn append_resolved_at(
        &mut self,
        occurred_at: u64,
        actor_type: &str,
        actor_uid: Uid,
        method: &str,
        url: &str,
        content: Content,
        targets: &[(String, Uid)],
        custom: BTreeMap<String, Value>,
    ) -> Result<Uid> {
        self.validate_custom(&custom, occurred_at)?;
        let log = AuditLog {
            log_id: Uid::generate(),
            timestamp: occurred_at,
            actor: actor_uid,
            actor_type: actor_type.to_string(),
            method: method.to_string(),
            url: url.to_string(),
            content,
            custom,
        };
        let seq_before = self.logs.active_seq();
        self.logs.append(&log, log.timestamp)?;
        let sealed_a_segment = self.logs.active_seq() != seq_before;
        for (entity_type, uid) in targets {
            let rel = TargetRelation {
                log_id: log.log_id,
                entity_registry_uid: *uid,
                entity_type: entity_type.clone(),
            };
            self.relations.append(&rel, log.timestamp)?;
        }
        self.apply_sync_policy()?;
        if sealed_a_segment {
            // checkpoint on segment seal, not on flush/sync: a seal is when a
            // chain prefix becomes immutable, and its frequency is bounded by
            // segment size instead of putting an RSA signing operation on the
            // every-N-appends sync path
            self.write_checkpoint()?;
        }
        Ok(log.log_id)
    }

    fn validate_custom(&self, custom: &BTreeMap<String, Value>, occurred_at: u64) -> Result<()> {
        for (name, value) in custom {
            let def = self.custom_columns.get(name).ok_or_else(|| {
                Error::Schema(format!(
                    "custom column '{name}' is not registered — call define_custom_column()"
                ))
            })?;
            if value.kind() != def.kind {
                return Err(Error::Schema(format!(
                    "custom column '{name}' expects {:?}, got {:?}",
                    def.kind,
                    value.kind()
                )));
            }
        }
        for def in self.custom_columns.values() {
            let in_force = def.required_since.is_none_or(|since| occurred_at >= since);
            if def.required && in_force && !custom.contains_key(&def.name) {
                return Err(Error::Schema(format!(
                    "missing required custom column '{}'",
                    def.name
                )));
            }
        }
        Ok(())
    }

    fn apply_sync_policy(&mut self) -> Result<()> {
        match self.cfg.sync_policy {
            SyncPolicy::Always => self.sync_all()?,
            SyncPolicy::EveryN(n) => {
                self.appends_since_sync += 1;
                if self.appends_since_sync >= n {
                    self.sync_all()?;
                } else {
                    self.logs.flush()?;
                    self.relations.flush()?;
                }
            }
            SyncPolicy::OsManaged => {
                self.logs.flush()?;
                self.relations.flush()?;
            }
        }
        Ok(())
    }

    /// fsync in dependency order: a log row must never be durable while the
    /// registry version it references is not — a crash would otherwise leave
    /// logs pointing at registry records that evaporated.
    fn sync_all(&mut self) -> Result<()> {
        for reg in self.registries.values_mut() {
            reg.sync()?;
        }
        self.logs.sync()?;
        self.relations.sync()?;
        self.appends_since_sync = 0;
        Ok(())
    }

    /// Force everything to durable storage.
    pub fn sync(&mut self) -> Result<()> {
        self.sync_all()?;
        self.meta.sync()?;
        if let Some(access) = self.access.as_mut() {
            access.sync()?;
        }
        Ok(())
    }

    /// Verify the tamper-evidence hash chains of every table (logs, relations,
    /// meta, all registries). Returns the first chain break found — a record
    /// that was modified in place, or a segment that was removed/replaced.
    ///
    /// If signed checkpoints exist (see [`checkpoint`](Self::checkpoint)),
    /// they are verified too: every checkpoint signature must check out
    /// against the public key, and the latest checkpoint's chain head must
    /// still exist in the log chain. That extends detection to attacks the
    /// chain alone cannot see — deleting everything and rewriting a
    /// self-consistent chain from scratch, or truncating the newest records.
    pub fn verify_integrity(&mut self) -> Result<()> {
        self.logs.verify()?;
        self.relations.verify()?;
        self.meta.verify()?;
        if let Some(access) = self.access.as_mut() {
            access.verify()?;
        }
        for reg in self.registries.values_mut() {
            reg.verify()?;
        }
        self.verify_checkpoints()?;
        self.verify_rekey_events()
    }

    // ---- key rotation / re-key ----------------------------------------------

    /// Re-wrap every RSA-protected registry value under the *active* RSA key
    /// and re-digest the blind-index tokens of RSA fields under the *active*
    /// HMAC key — the offline tail of a key rotation, so retired private keys
    /// can actually be destroyed.
    ///
    /// Requirements: the ring must hold the private key of every RSA version
    /// still referenced by stored values (to unwrap the old data keys) *and*
    /// the active version's private key (this pass is signed). The store
    /// must be otherwise idle — run it from the maintenance CLI, not a
    /// serving process.
    ///
    /// What it cannot do: HMAC digests are one-way and the plaintext is not
    /// on disk, so HMAC-protected fields keep their recorded key version —
    /// retain old HMAC keys (read-side) for search, and treat a leaked HMAC
    /// key as having exposed the digests written under it.
    ///
    /// Each registry table is rewritten into a fresh hash chain; the
    /// old-head → new-head transition of every table is recorded and signed
    /// in a [`RekeyEvent`] on the meta table, which
    /// [`verify_integrity`](Self::verify_integrity) checks from then on —
    /// that is what keeps an audited re-key distinguishable from tampering.
    pub fn rekey(&mut self) -> Result<RekeyEvent> {
        let keys = self.cfg.keys.clone();
        let rsa_version = keys.active_rsa_version().ok_or_else(|| {
            Error::Crypto("re-key: no RSA key in the ring — nothing to re-wrap to".into())
        })?;
        if !keys.can_sign() {
            return Err(Error::Crypto(
                "re-key requires the active RSA private key (the re-key event is signed)".into(),
            ));
        }
        let hmac_version = keys.active_hmac_version().unwrap_or(KEYLESS);

        let mut names: Vec<String> = self.registries.keys().cloned().collect();
        names.sort();
        let mut tables = Vec::with_capacity(names.len());
        for name in names {
            // drop the open registry (its table holds open file handles)
            // before rewriting its directory, then reopen on the new chain
            let reg = self.registries.remove(&name).expect("name was just listed");
            let schema = reg.schema().clone();
            drop(reg);
            let dir = self.cfg.root.join("registry").join(&name);
            let stats = rewrite_table::<RegistryRecord>(&dir, self.cfg.max_segment_bytes, |rec| {
                rekey_record(rec, &schema, &keys)
            })?;
            let reopened = TypeRegistry::open(
                &dir,
                schema,
                self.cfg.max_segment_bytes,
                self.cfg.plaintext_cache,
            )?;
            self.registries.insert(name.clone(), reopened);
            tables.push(RekeyedTable {
                type_name: name,
                records: stats.records,
                old_root: crypto::hex(&stats.old_root),
                new_root: crypto::hex(&stats.new_root),
            });
        }

        let occurred_at = crate::time::now_micros();
        let (signing_key_version, signature) = keys.sign(&rekey_signing_bytes(
            occurred_at,
            rsa_version,
            hmac_version,
            &tables,
        ))?;
        let event = RekeyEvent {
            occurred_at,
            rsa_version,
            hmac_version,
            tables,
            signing_key_version,
            signature,
        };
        self.meta
            .append(&MetaEvent::Rekeyed(event.clone()), occurred_at)?;
        self.meta.sync()?;
        Ok(event)
    }

    /// Every re-key event recorded so far, oldest first.
    pub fn rekey_events(&mut self) -> Result<Vec<RekeyEvent>> {
        let mut out = Vec::new();
        for ev in self.meta.scan()? {
            if let MetaEvent::Rekeyed(r) = ev? {
                out.push(r);
            }
        }
        Ok(out)
    }

    /// Verify every recorded re-key event's signature, and that each registry's
    /// current tree is *consistent with* the root the latest re-key event signed
    /// for it — i.e. that signed root (over `records` leaves) is a genuine prefix
    /// of the registry's present tree (later upserts only append; a rewrite
    /// outside an audited re-key would break the consistency proof). Older
    /// events' roots were legitimately retired by the next re-key.
    fn verify_rekey_events(&mut self) -> Result<()> {
        let events = self.rekey_events()?;
        // type -> (signed tree size, signed root) from the latest event for it
        let mut latest: HashMap<String, (u64, Hash)> = HashMap::new();
        for ev in &events {
            self.cfg
                .keys
                .verify_signature(
                    ev.signing_key_version,
                    &rekey_signing_bytes(
                        ev.occurred_at,
                        ev.rsa_version,
                        ev.hmac_version,
                        &ev.tables,
                    ),
                    &ev.signature,
                )
                .map_err(|e| Error::Crypto(format!("re-key event signature invalid: {e}")))?;
            for t in &ev.tables {
                let root: Hash = crypto::hex_decode(&t.new_root)
                    .and_then(|v| v.try_into().ok())
                    .ok_or_else(|| Error::Corrupt {
                        segment: format!("meta (re-key event for '{}')", t.type_name),
                        offset: 0,
                        reason: "malformed merkle root in re-key event".into(),
                    })?;
                latest.insert(t.type_name.clone(), (t.records, root));
            }
        }
        for (name, (signed_size, signed_root)) in latest {
            let Some(reg) = self.registries.get_mut(&name) else {
                continue; // type defined again later under a fresh table
            };
            let current_size = reg.spine_size();
            let current_root = reg.root();
            let consistent = if signed_size == current_size {
                signed_root == current_root
            } else if signed_size < current_size {
                let proof = reg.prove_consistency(signed_size)?;
                crate::merkle::verify_consistency(
                    signed_size as usize,
                    current_size as usize,
                    &signed_root,
                    &current_root,
                    &proof.path,
                )
            } else {
                false // current tree smaller than the signed one — records vanished
            };
            if !consistent {
                return Err(Error::Corrupt {
                    segment: format!("registry/{name}"),
                    offset: 0,
                    reason: "registry tree is not consistent with the root signed by the latest \
                             re-key event — the registry was rewritten outside an audited re-key"
                        .into(),
                });
            }
        }
        Ok(())
    }

    // ---- integrity checkpoints --------------------------------------------

    /// Sign and persist an integrity checkpoint of the log chain now, on top
    /// of the automatic ones (segment seals and retention runs). Returns
    /// `Ok(None)` when the [`KeyRing`] holds no RSA private key: write-only
    /// deployments cannot sign, so checkpointing is silently disabled rather
    /// than an error — appends must keep working there.
    pub fn checkpoint(&mut self) -> Result<Option<Checkpoint>> {
        self.write_checkpoint()
    }

    /// Every checkpoint recorded so far, oldest first.
    pub fn checkpoints(&self) -> Result<Vec<Checkpoint>> {
        self.checkpoints.read_all()
    }

    // ---- merkle proofs ----------------------------------------------------

    /// Current Merkle root over the whole log history (every record ever
    /// appended, retained or purged). This is the value a checkpoint signs and
    /// an anchor exports — a third party verifies proofs against it.
    pub fn merkle_root(&self) -> Hash {
        self.logs.root()
    }

    /// Total number of log records ever appended — the current Merkle tree size.
    /// Inclusion/consistency proofs are issued against this size.
    pub fn tree_size(&self) -> u64 {
        self.logs.spine_size()
    }

    /// Prove that the log with `log_id` is committed to the current Merkle root:
    /// an O(log n) audit path a third party verifies with
    /// [`crate::merkle::verify_inclusion`] against [`merkle_root`](Self::merkle_root),
    /// without the rest of the log and without trusting the operator. Fails if
    /// the record has been purged by retention (its payload is gone) or is not
    /// found.
    pub fn prove_inclusion(&mut self, log_id: Uid) -> Result<InclusionProof> {
        let base = self.logs.purged_count();
        let slices = self.logs.slices()?;
        let mut offset = 0u64;
        for slice in slices {
            let mut reader = crate::storage::SegmentReader::open_bounded(&slice.path, slice.bound)?;
            while let Some((_, payload)) = reader.next_record()? {
                let log: AuditLog = bincode::deserialize(&payload)?;
                if log.log_id == log_id {
                    return self.logs.prove_inclusion(base + offset);
                }
                offset += 1;
            }
        }
        Err(Error::NotFound(format!(
            "log {log_id} not found among retained records — cannot prove inclusion"
        )))
    }

    /// Prove that the log tree of size `first_size` is a prefix of the current
    /// tree — i.e. the history between the two sizes is append-only, nothing was
    /// edited or removed. Verified with [`crate::merkle::verify_consistency`].
    /// `first_size` is typically a [`Checkpoint::tree_size`] an auditor holds.
    pub fn prove_consistency(&mut self, first_size: u64) -> Result<ConsistencyProof> {
        self.logs.prove_consistency(first_size)
    }

    fn write_checkpoint(&mut self) -> Result<Option<Checkpoint>> {
        if !self.cfg.keys.can_sign() {
            return Ok(None);
        }
        // a checkpoint must never claim a head that is less durable than the
        // checkpoint file itself — fsync the logs before signing (cheap: this
        // runs per segment seal / retention run, not per append)
        self.logs.sync()?;
        let cp = Checkpoint::sign(
            &self.cfg.keys,
            crate::time::now_micros(),
            self.logs.active_seq(),
            self.logs.record_count(),
            self.logs.spine_size(),
            self.logs.root(),
        )?;
        self.checkpoints.append(&cp)?;
        if let Some(hook) = &self.cfg.anchor {
            // a broken anchor must not take down the write path
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| hook(&cp)));
        }
        Ok(Some(cp))
    }

    fn verify_checkpoints(&mut self) -> Result<()> {
        let cps = self.checkpoints.read_all()?;
        let Some(latest) = cps.last() else {
            return Ok(()); // never checkpointed (e.g. write-only deployment)
        };
        for cp in &cps {
            cp.verify(&self.cfg.keys)?;
        }
        // The latest checkpoint's root must be a genuine prefix of the current
        // Merkle tree: same size ⇒ identical root; smaller size ⇒ a consistency
        // proof must verify. The spine is never purged, so the current tree is
        // always an extension of any honest past checkpoint — a rewrite or a
        // truncation of the newest records breaks this. Older checkpoints are
        // implied by the latest (their roots are prefixes of it).
        let current_size = self.logs.spine_size();
        let current_root = self.logs.root();
        let consistent = if latest.tree_size == current_size {
            latest.merkle_root == current_root
        } else if latest.tree_size < current_size {
            let proof = self.logs.prove_consistency(latest.tree_size)?;
            crate::merkle::verify_consistency(
                latest.tree_size as usize,
                current_size as usize,
                &latest.merkle_root,
                &current_root,
                &proof.path,
            )
        } else {
            false // current tree smaller than the checkpoint — records vanished
        };
        if !consistent {
            return Err(Error::Corrupt {
                segment: self.checkpoints.path().display().to_string(),
                offset: 0,
                reason: "latest checkpoint's merkle root is not consistent with the current \
                         tree — the log was rewritten or truncated after the checkpoint was signed"
                    .into(),
            });
        }
        Ok(())
    }

    // ---- retention -----------------------------------------------------------

    /// Enforce the configured retention limits now (age window, byte budget,
    /// or both — they combine as OR; see [`RetentionPolicy`]). Returns
    /// segments dropped.
    pub fn apply_retention(&mut self) -> Result<usize> {
        let now = crate::time::now_micros();
        let mut dropped_main = 0;
        if let Some(cutoff) = self.cfg.retention.cutoff_micros(now) {
            dropped_main += self.logs.purge_older_than(cutoff)?;
            dropped_main += self.relations.purge_older_than(cutoff)?;
        }
        if let Some(budget) = self.cfg.retention.max_bytes {
            dropped_main += self.purge_to_byte_budget(budget)?;
        }
        if dropped_main > 0 {
            // re-anchor after the unlink: the previous latest checkpoint may
            // point into a purged segment, and verification must not depend
            // on records that retention legitimately removed
            self.write_checkpoint()?;
        }
        // the access log has its own, independent window (often shorter than
        // the main one). It is not covered by checkpoints, so no re-anchor.
        let mut dropped = dropped_main;
        if let Some(access) = self.access.as_mut() {
            if let Some(cutoff) = self.cfg.access_retention.cutoff_micros(now) {
                dropped += access.purge_older_than(cutoff)?;
            }
        }
        Ok(dropped)
    }

    /// While logs + relations exceed `budget` bytes, drop the globally oldest
    /// sealed segment (by its max record timestamp, across both tables) —
    /// the same "whole old segments first" shape as the age window, so the
    /// hash chain and checkpoint verification stay intact. Stops when only
    /// active segments remain: the budget is a target, not a hard ceiling.
    fn purge_to_byte_budget(&mut self, budget: u64) -> Result<usize> {
        let mut total = self.logs.total_bytes()? + self.relations.total_bytes()?;
        let mut dropped = 0usize;
        while total > budget {
            let from_logs = match (
                self.logs.oldest_sealed_max_ts(),
                self.relations.oldest_sealed_max_ts(),
            ) {
                (Some(l), Some(r)) => l <= r,
                (Some(_), None) => true,
                (None, Some(_)) => false,
                (None, None) => break, // only active segments left
            };
            let freed = if from_logs {
                self.logs.purge_oldest_sealed()?
            } else {
                self.relations.purge_oldest_sealed()?
            };
            match freed {
                Some(bytes) => {
                    total = total.saturating_sub(bytes);
                    dropped += 1;
                }
                None => break,
            }
        }
        Ok(dropped)
    }

    /// Bytes currently on disk in the retention-governed tables
    /// (logs + relations) — the number [`RetentionPolicy::max_bytes`] is
    /// compared against. Registries and meta are not included.
    pub fn retained_bytes(&mut self) -> Result<u64> {
        Ok(self.logs.total_bytes()? + self.relations.total_bytes()?)
    }

    // ---- registry browsing -----------------------------------------------------

    /// Schemas of every defined entity/actor type, sorted by type name.
    pub fn type_schemas(&self) -> Vec<TypeSchema> {
        let mut out: Vec<TypeSchema> = self
            .registries
            .values()
            .map(|r| r.schema().clone())
            .collect();
        out.sort_by(|a, b| a.type_name.cmp(&b.type_name));
        out
    }

    /// Latest version of every entity of a type, sorted by entity_id.
    pub fn list_entities(
        &self,
        type_name: &str,
        include_deleted: bool,
    ) -> Result<Vec<TargetSnapshot>> {
        let reg = self
            .registries
            .get(type_name)
            .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
        let mut ids: Vec<&String> = reg.idx.entity_ids().collect();
        ids.sort();
        Ok(ids
            .into_iter()
            .filter_map(|id| reg.latest(id))
            .filter(|rec| include_deleted || !rec.deleted)
            .map(snapshot_of)
            .collect())
    }

    /// Full version history of one entity, oldest first (a delete shows up as
    /// a final version with `deleted: true`).
    pub fn entity_history(&self, type_name: &str, entity_id: &str) -> Result<Vec<TargetSnapshot>> {
        let reg = self
            .registries
            .get(type_name)
            .ok_or_else(|| Error::Schema(format!("type '{type_name}' has no registry table")))?;
        let uids = reg.all_version_uids(entity_id);
        if uids.is_empty() {
            return Err(Error::NotFound(format!(
                "entity '{entity_id}' of type '{type_name}'"
            )));
        }
        Ok(uids
            .iter()
            .map(|uid| match reg.version(uid) {
                Some(rec) => snapshot_of(rec),
                None => missing_snapshot(type_name, uid),
            })
            .collect())
    }

    // ---- queries ---------------------------------------------------------------

    /// A point-in-time, read-only view of the store. Building it is cheap
    /// (registry indexes are cloned, log/relation tables contribute only
    /// path+length bounds); the actual scan then runs on the snapshot without
    /// touching the store, so a slow full-scan query never blocks appends.
    pub fn snapshot(&mut self) -> Result<ReadSnapshot> {
        Ok(ReadSnapshot {
            keys: self.cfg.keys.clone(),
            registries: self
                .registries
                .iter()
                .map(|(k, v)| (k.clone(), v.idx.clone()))
                .collect(),
            logs: self.logs.slices()?,
            relations: self.relations.slices()?,
        })
    }

    /// Run a query. Targets and actor are resolved to the registry versions
    /// referenced at write time, so renamed entities show their historical
    /// values. Convenience for [`snapshot`](Self::snapshot)`()?.query(q)`.
    ///
    /// With [`StoreConfig::access_log`] enabled, the query is meta-audited
    /// under actor `"local"` (this direct embedded API has no caller
    /// identity; use [`query_as`](Self::query_as) to attribute it). The
    /// recording is fail-closed: if the access record cannot be persisted,
    /// the query errors — the synchronous embedded API favors the regulatory
    /// guarantee, while the async pipeline above favors availability.
    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
        self.query_as("local", q)
    }

    /// [`query`](Self::query) attributed to a named actor in the access log.
    pub fn query_as(&mut self, actor: &str, q: &LogQuery) -> Result<Vec<LogView>> {
        let hits = self.snapshot()?.query(q)?;
        if self.access.is_some() {
            self.record_access(AccessRecord::new(
                actor,
                "query_logs",
                crate::access::summarize_log_query(q),
                Some(hits.len() as u64),
            ))?;
        }
        Ok(hits)
    }

    // ---- meta-audit (access log) -------------------------------------------

    /// Whether the meta-audit access log is enabled.
    pub fn access_enabled(&self) -> bool {
        self.access.is_some()
    }

    /// Append one meta-audit record. A no-op when [`StoreConfig::access_log`]
    /// is off, so callers can record unconditionally. This is a plain append —
    /// it never reads, so it can never produce another access record
    /// (self-reference loops are structurally impossible).
    pub fn record_access(&mut self, rec: AccessRecord) -> Result<()> {
        let Some(access) = self.access.as_mut() else {
            return Ok(());
        };
        let ts = rec.timestamp;
        access.append(&rec, ts)?;
        access.flush()?;
        Ok(())
    }

    /// Read meta-audit records (oldest first) matching the filter. Errors
    /// when the access log is not enabled. Note: this is a *read* of the
    /// access log — callers exposing it (e.g. the server) record it as an
    /// access in turn; the recording itself is an append and does not recurse.
    pub fn access_records(&mut self, q: &AccessQuery) -> Result<Vec<AccessRecord>> {
        let Some(access) = self.access.as_mut() else {
            return Err(Error::Schema(
                "the access log is not enabled — set StoreConfig::access_log".into(),
            ));
        };
        let mut out = Vec::new();
        for rec in access.scan()? {
            let rec = rec?;
            if !q.matches(&rec) {
                continue;
            }
            out.push(rec);
            if q.limit.is_some_and(|n| out.len() >= n) {
                break;
            }
        }
        Ok(out)
    }

    /// Run a query and get one page plus a continuation cursor. Convenience
    /// for [`snapshot`](Self::snapshot)`()?.query_page(q)`.
    pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
        self.snapshot()?.query_page(q)
    }

    /// Count a query's matches without rendering them. Convenience for
    /// [`snapshot`](Self::snapshot)`()?.count(q)`.
    pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
        self.snapshot()?.count(q)
    }

    /// Decrypt an RSA-protected stored value (requires the private key).
    pub fn decrypt(&self, v: &crate::model::StoredValue) -> Result<Vec<u8>> {
        self.cfg.keys.decrypt(v)
    }
}

/// Re-key one registry record: re-wrap RSA values to the active RSA version
/// and re-digest the index tokens of RSA fields under the active HMAC key
/// (their plaintext is recoverable, unlike one-way hashed fields' — those
/// keep their recorded version and stay matchable via multi-version probes).
fn rekey_record(
    mut rec: RegistryRecord,
    schema: &TypeSchema,
    keys: &KeyRing,
) -> Result<RegistryRecord> {
    let names: Vec<String> = rec
        .fields
        .iter()
        .filter(|(_, v)| matches!(v, StoredValue::Rsa { .. }))
        .map(|(k, _)| k.clone())
        .collect();
    for name in names {
        let rewrapped = keys.rewrap(&rec.fields[&name])?;
        if let Some(def) = schema.field(&name) {
            if def.search != FieldIndex::None {
                let plaintext = keys.decrypt(&rewrapped)?;
                let text = String::from_utf8_lossy(&plaintext).into_owned();
                let mut key_version = KEYLESS;
                let mut digests = Vec::new();
                for t in crate::tokens::value_tokens(&text, def.search) {
                    let (v, d) = keys.index_token_digest(&def.name, def.protection, &t)?;
                    key_version = v;
                    digests.push(d);
                }
                rec.tokens.insert(
                    name.clone(),
                    FieldTokens {
                        key_version,
                        digests,
                    },
                );
            }
        }
        rec.fields.insert(name, rewrapped);
    }
    Ok(rec)
}

fn snapshot_of(rec: &crate::registry::RegistryRecord) -> TargetSnapshot {
    TargetSnapshot {
        entity_registry_uid: rec.uid,
        entity_type: rec.entity_type.clone(),
        entity_id: rec.entity_id.clone(),
        version: rec.version,
        fields: rec.fields.clone(),
        deleted: rec.deleted,
        missing: false,
    }
}

fn missing_snapshot(entity_type: &str, uid: &Uid) -> TargetSnapshot {
    TargetSnapshot {
        entity_registry_uid: *uid,
        entity_type: entity_type.to_string(),
        entity_id: String::new(),
        version: 0,
        fields: BTreeMap::new(),
        deleted: false,
        missing: true,
    }
}

/// A point-in-time, read-only view of an [`AuditStore`] (see
/// [`AuditStore::snapshot`]). `Send`, so it can be handed to another thread:
/// the async pipeline runs query scans on the *caller's* thread against a
/// snapshot, which keeps the writer thread free to persist events — a slow
/// full-scan query can no longer push `emit` into `QueueFull` territory.
/// Rows appended after the snapshot was taken are not visible.
pub struct ReadSnapshot {
    keys: KeyRing,
    registries: HashMap<String, RegistryIndex>,
    logs: Vec<SegmentSlice>,
    relations: Vec<SegmentSlice>,
}

/// The per-query filter state shared by [`ReadSnapshot::query_page`] and
/// [`ReadSnapshot::count`]: registry filters resolved to uid sets once, so
/// the log scan itself is pure set/field checks per row.
struct ResolvedFilters {
    allowed_by_target: Option<HashSet<Uid>>,
    allowed_actor_uids: Option<HashSet<Uid>>,
}

impl ResolvedFilters {
    fn matches(&self, q: &LogQuery, log: &AuditLog) -> bool {
        // from/to are not re-checked here: the positioned scan already
        // filtered on the frame-header timestamp before deserializing
        if let Some(m) = &q.method {
            if !log.method.eq_ignore_ascii_case(m) {
                return false;
            }
        }
        if let Some(p) = &q.url_prefix {
            if !log.url.starts_with(p.as_str()) {
                return false;
            }
        }
        if let Some(allowed) = &self.allowed_by_target {
            if !allowed.contains(&log.log_id) {
                return false;
            }
        }
        if let Some(uids) = &self.allowed_actor_uids {
            if !uids.contains(&log.actor) {
                return false;
            }
        }
        q.custom.iter().all(|(k, v)| log.custom.get(k) == Some(v))
    }
}

impl ReadSnapshot {
    /// Run a query against this snapshot and return the matches of the first
    /// page (see [`query_page`](Self::query_page) for the cursor). `&mut
    /// self` because Contains searches lazily decrypt-and-cache RSA values.
    pub fn query(&mut self, q: &LogQuery) -> Result<Vec<LogView>> {
        Ok(self.query_page(q)?.logs)
    }

    /// Run a query and return one page plus a continuation cursor.
    ///
    /// Scan cost is bounded three ways:
    /// - log segments entirely outside `from_micros..=to_micros` are never
    ///   opened (per-segment time bounds live in the snapshot),
    /// - the scan walks in `q.order` and stops at `limit` matches, so a
    ///   newest-first page over a long history reads only the newest data,
    /// - relations are resolved only for the page's hits (never a whole-table
    ///   relation map in memory).
    pub fn query_page(&mut self, q: &LogQuery) -> Result<QueryPage> {
        let filters = self.resolve_filters(q)?;
        let mut scan = self.log_scan(q)?;
        let mut hits: Vec<(Position, AuditLog)> = Vec::new();
        let mut more = false;
        while let Some((pos, log)) = scan.next_row()? {
            if !filters.matches(q, &log) {
                continue;
            }
            if q.limit.is_some_and(|limit| hits.len() >= limit) {
                // a (limit+1)-th match exists, so the cursor is worth issuing
                more = true;
                break;
            }
            hits.push((pos, log));
        }
        let next_cursor = match (more, hits.last()) {
            (true, Some((pos, _))) => Some(crate::query::encode_cursor(q.order, *pos)),
            _ => None,
        };

        // resolve relations for the page's hits only; relation rows carry
        // their log's timestamp, so the same time window prunes here too
        let wanted: HashSet<Uid> = hits.iter().map(|(_, log)| log.log_id).collect();
        let mut rels_by_log: HashMap<Uid, Vec<TargetRelation>> = HashMap::new();
        if !wanted.is_empty() {
            let mut rels: PositionedScan<TargetRelation> = PositionedScan::new(
                self.relations.clone(),
                false,
                q.from_micros,
                q.to_micros,
                None,
            );
            while let Some((_, rel)) = rels.next_row()? {
                if wanted.contains(&rel.log_id) {
                    rels_by_log.entry(rel.log_id).or_default().push(rel);
                }
            }
        }
        Ok(QueryPage {
            logs: hits
                .into_iter()
                .map(|(_, log)| self.render(log, &rels_by_log))
                .collect(),
            next_cursor,
            segments_scanned: scan.segments_opened(),
        })
    }

    /// Count the query's matches without rendering them: no registry
    /// resolution, no relation lookup per hit, no decryption — just the
    /// pruned log scan and per-row filter checks. `limit` and `cursor` are
    /// ignored: the count is the total for the filters.
    pub fn count(&mut self, q: &LogQuery) -> Result<u64> {
        let filters = self.resolve_filters(q)?;
        let mut q = q.clone();
        q.cursor = None;
        q.order = Order::Asc; // cheapest direction; order is irrelevant to a count
        let mut scan = self.log_scan(&q)?;
        let mut n = 0u64;
        while let Some((_, log)) = scan.next_row()? {
            if filters.matches(&q, &log) {
                n += 1;
            }
        }
        Ok(n)
    }

    /// Resolve registry-side filters (targets, actor) to uid sets. Multiple
    /// target filters intersect (AND).
    fn resolve_filters(&mut self, q: &LogQuery) -> Result<ResolvedFilters> {
        let mut allowed_by_target: Option<HashSet<Uid>> = None;
        for f in &q.targets {
            let ids = self.log_ids_for_filter(f, q.from_micros, q.to_micros)?;
            allowed_by_target = Some(match allowed_by_target {
                Some(prev) => prev.intersection(&ids).copied().collect(),
                None => ids,
            });
        }
        let allowed_actor_uids: Option<HashSet<Uid>> = match &q.actor {
            Some(f) => Some(self.version_uids_for_filter(f)?),
            None => None,
        };
        Ok(ResolvedFilters {
            allowed_by_target,
            allowed_actor_uids,
        })
    }

    /// The time/cursor-bounded log scan for a query.
    fn log_scan(&self, q: &LogQuery) -> Result<PositionedScan<AuditLog>> {
        let after = match &q.cursor {
            Some(c) => Some(crate::query::decode_cursor(c, q.order)?),
            None => None,
        };
        Ok(PositionedScan::new(
            self.logs.clone(),
            q.order == Order::Desc,
            q.from_micros,
            q.to_micros,
            after,
        ))
    }

    /// All log_ids whose relations point at an entity matching the filter.
    /// Matching is by *entity*, not version: searching the current name also
    /// finds logs recorded under an older name, and vice versa. The relation
    /// scan is time-pruned: a relation row carries its log's timestamp, so
    /// logs outside the window cannot enter the set anyway.
    fn log_ids_for_filter(
        &mut self,
        f: &TargetFilter,
        from: Option<u64>,
        to: Option<u64>,
    ) -> Result<HashSet<Uid>> {
        let version_uids = self.version_uids_for_filter(f)?;
        let mut out = HashSet::new();
        let mut rels: PositionedScan<TargetRelation> =
            PositionedScan::new(self.relations.clone(), false, from, to, None);
        while let Some((_, rel)) = rels.next_row()? {
            if version_uids.contains(&rel.entity_registry_uid) {
                out.insert(rel.log_id);
            }
        }
        Ok(out)
    }

    /// Every version uid of every entity matching the filter.
    fn version_uids_for_filter(&mut self, f: &TargetFilter) -> Result<HashSet<Uid>> {
        let reg = self.registries.get_mut(&f.entity_type).ok_or_else(|| {
            Error::Schema(format!("type '{}' has no registry table", f.entity_type))
        })?;
        let entity_ids = reg.search(&f.field, &f.value, f.include_past, f.mode, &self.keys)?;
        let mut uids = HashSet::new();
        for id in entity_ids {
            uids.extend(reg.all_version_uids(&id).iter().copied());
        }
        Ok(uids)
    }

    fn render(&self, log: AuditLog, rels_by_log: &HashMap<Uid, Vec<TargetRelation>>) -> LogView {
        let actor = self.snapshot(&log.actor_type, &log.actor);
        let mut targets = Vec::new();
        if let Some(rels) = rels_by_log.get(&log.log_id) {
            for rel in rels {
                targets.push(self.snapshot(&rel.entity_type, &rel.entity_registry_uid));
            }
        }
        LogView {
            log_id: log.log_id,
            timestamp_micros: log.timestamp,
            timestamp: crate::time::format_rfc3339(log.timestamp),
            actor,
            method: log.method,
            url: log.url,
            content: log.content,
            targets,
            custom: log.custom,
        }
    }

    /// Resolve one registry version. An unresolvable reference renders as a
    /// `missing` placeholder instead of an error: one broken/lost registry
    /// record must not make every query touching that log fail.
    fn snapshot(&self, entity_type: &str, uid: &Uid) -> TargetSnapshot {
        match self
            .registries
            .get(entity_type)
            .and_then(|r| r.version(uid))
        {
            Some(rec) => snapshot_of(rec),
            None => missing_snapshot(entity_type, uid),
        }
    }
}