batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
use super::*;
use crate::store::cold_start::rebuild::OpenIndexReport;
use std::collections::BTreeMap;

struct OpenComponents {
    runtime: Arc<config::ValidatedStoreConfig>,
    config: Arc<StoreConfig>,
    index: Arc<StoreIndex>,
    reader: Arc<Reader>,
    open_report: cold_start::rebuild::OpenIndexReport,
    cumulative_reserved_kind_fallbacks: segment::sidx::ReservedKindFallbackStats,
    store_lock: dir_lock::StoreDirLock,
}

/// Open-time health checks over the link-time `EventPayload` registries, both
/// gated by the single [`EventPayloadValidation`] policy (default `FailFast`):
///   1. kind collisions — two payload types claiming the same wire identity;
///   2. incomplete upcast chains — a `version > 1` kind whose registered
///      `Upcast` steps don't cover every `1 -> ... -> N` hop, which would let
///      its older events silently become undecodable at read time.
fn validate_payload_registry_for_open(config: &StoreConfig) -> Result<(), StoreError> {
    validate_payload_collisions_for_open(config)?;
    validate_upcast_chains_for_open(config)?;
    Ok(())
}

fn validate_payload_collisions_for_open(config: &StoreConfig) -> Result<(), StoreError> {
    let Err(error) = event::payload::cached_event_payload_registry_validation() else {
        return Ok(());
    };
    match config.event_payload_validation {
        EventPayloadValidation::Warn => {
            if event::payload::mark_event_payload_registry_warning_emitted() {
                tracing::warn!(
                    target: "batpak::event_registry",
                    collisions = ?error.collisions(),
                    "duplicate EventPayload kind registrations detected; EventPayloadValidation::Warn is set, so the store opened despite ambiguous wire identity. The default EventPayloadValidation::FailFast refuses to open on collision"
                );
            }
            Ok(())
        }
        EventPayloadValidation::FailFast => Err(StoreError::EventPayloadRegistry(error)),
        EventPayloadValidation::Silent => Ok(()),
    }
}

fn validate_upcast_chains_for_open(config: &StoreConfig) -> Result<(), StoreError> {
    let Err(error) = event::upcast::cached_upcast_chain_registry_validation() else {
        return Ok(());
    };
    match config.event_payload_validation {
        EventPayloadValidation::Warn => {
            if event::upcast::mark_upcast_chain_registry_warning_emitted() {
                tracing::warn!(
                    target: "batpak::event_registry",
                    incomplete_chains = ?error.incomplete_chains(),
                    "EventPayload kind(s) declare version > 1 without a complete upcast chain; EventPayloadValidation::Warn is set, so the store opened despite events at older versions being undecodable at read time. The default EventPayloadValidation::FailFast refuses to open on an incomplete chain"
                );
            }
            Ok(())
        }
        EventPayloadValidation::FailFast => Err(StoreError::UpcastChainIncomplete(error)),
        EventPayloadValidation::Silent => Ok(()),
    }
}

fn open_components(
    mut config: StoreConfig,
    lock_mode: StoreLockMode,
) -> Result<OpenComponents, StoreError> {
    validate_payload_registry_for_open(&config)?;
    config.fs().create_dir_all(&config.data_dir)?;
    config.data_dir = platform::fs::canonicalize(&config.data_dir).map_err(StoreError::Io)?;
    let configured_signing_keys = config.signing_keys.len();
    tracing::debug!(
        configured_signing_keys,
        "opening store with configured signing registry"
    );
    let store_lock = dir_lock::StoreDirLock::acquire(&config.data_dir, lock_mode)?;
    // Rehydrate the durable crypto-shred keyset UNDER the store lock (fail closed
    // on corrupt) and share it through the runtime, so the writer's mint/seal/
    // flush path and the store's decrypt-on-read path operate on ONE live keyset.
    // Both the read-write and read-only open paths flow through here, so neither
    // can drift into loading the keyset differently.
    let runtime = {
        #[cfg(feature = "payload-encryption")]
        {
            let mut validated = config.validated()?;
            validated.key_store = load_key_store(&config)?;
            Arc::new(validated)
        }
        #[cfg(not(feature = "payload-encryption"))]
        {
            Arc::new(config.validated()?)
        }
    };
    if let Some(profile_path) = config.platform_profile_path.as_ref() {
        let _verified_platform_evidence =
            platform::profile::PlatformProfile::verify_current_store_path(
                profile_path,
                &config.data_dir,
                runtime.clock(),
            )?;
    }
    let config = Arc::new(config);
    let index = Arc::new(StoreIndex::with_config(&config.index));
    let reader = Arc::new(Reader::new(
        config.data_dir.clone(),
        config.fd_budget,
        &runtime.clock_arc(),
        Arc::clone(config.fs()),
    ));

    // Cold start: checkpoint/mmap fast paths or full segment scan.
    // Segment files are named so lexicographic order matches replay order.
    // The fault injector only exists when `dangerous-test-hooks` is enabled;
    // otherwise the cold-start path takes an inert `&()` (see FaultInjectorRef).
    #[cfg(feature = "dangerous-test-hooks")]
    let cold_start_fault_injector = &config.fault_injector;
    #[cfg(not(feature = "dangerous-test-hooks"))]
    let cold_start_fault_injector = &();
    let open_outcome = cold_start::rebuild::open_index(
        &index,
        &reader,
        &config.data_dir,
        runtime.cold_start,
        runtime.clock(),
        cold_start_fault_injector,
    )?;

    // Tell the reader which segment is active (for mmap dispatch).
    // The writer's initial segment ID is the highest existing + 1.
    let active_seg_id = next_active_segment_id(&config.data_dir)?;
    reader.set_active_segment(active_seg_id);

    Ok(OpenComponents {
        runtime,
        config,
        index,
        reader,
        open_report: open_outcome.report,
        cumulative_reserved_kind_fallbacks: open_outcome.cumulative_reserved_kind_fallbacks,
        store_lock,
    })
}

fn next_active_segment_id(data_dir: &std::path::Path) -> Result<u64, StoreError> {
    Ok(write::writer::find_latest_segment_id(data_dir)?.unwrap_or(0) + 1)
}

/// Cold-start hook for the opt-in crypto-shred keyset (Stage B).
///
/// When `payload_encryption` is `Some(granularity)`, rehydrate the durable
/// keyset from the store directory into an in-memory [`keyscope::KeyStore`]; when
/// `None`, do nothing (no encryption configured). A corrupt/unreadable keyset
/// fails the open closed via [`keyscope::KeyStore::load`] — it must NOT degrade
/// to an empty store, which would silently crypto-shred live payloads. Shared by
/// the read-write and read-only open paths so neither can drift into loading the
/// keyset differently. Stage C reads/mints into the held store from the
/// append/read paths; Stage B only loads and holds it.
#[cfg(feature = "payload-encryption")]
fn load_key_store(
    config: &StoreConfig,
) -> Result<Option<Arc<Mutex<keyscope::KeyStore>>>, StoreError> {
    let Some(granularity) = config.payload_encryption else {
        return Ok(None);
    };
    let key_store =
        keyscope::KeyStore::load_with_fs(&config.data_dir, config.fs().as_ref(), granularity)?;
    Ok(Some(Arc::new(Mutex::new(key_store))))
}

#[cfg(feature = "payload-encryption")]
#[cfg_attr(
    all(docsrs, not(batpak_stable_docs)),
    doc(cfg(feature = "payload-encryption"))
)]
impl<State: StoreState> Store<State> {
    /// Number of crypto-shred payload keys the store loaded from disk at open,
    /// or `None` when `payload_encryption` is not configured.
    ///
    /// Observability only — never exposes key material. This is the public window
    /// onto the Stage B cold-start rehydration: after opening an encrypted store
    /// whose keyset was flushed, this reports the recovered key count. Stage C
    /// mints and destroys through the same held store as it wires the payload
    /// paths, so this count tracks the live keyset thereafter.
    #[must_use]
    pub fn payload_key_count(&self) -> Option<usize> {
        self.key_store
            .as_ref()
            .map(|key_store| key_store.lock().key_count())
    }
}

fn emit_open_report_observability(config: &StoreConfig, report: &OpenIndexReport) {
    tracing::info!(
        target: "batpak::open",
        path = ?report.path,
        restored_entries = report.restored_entries,
        tail_entries = report.tail_entries,
        elapsed_us = report.elapsed_us,
        phase_plan_build_us = report.phase_plan_build_us,
        phase_interner_us = report.phase_interner_us,
        phase_restore_index_us = report.phase_restore_index_us,
        phase_hidden_ranges_us = report.phase_hidden_ranges_us,
        unknown_reserved_system_kind_fallbacks = report.unknown_reserved_system_kind_fallbacks,
        unknown_reserved_effect_kind_fallbacks = report.unknown_reserved_effect_kind_fallbacks,
        cumulative_unknown_reserved_system_kind_fallbacks = report
            .cumulative_unknown_reserved_system_kind_fallbacks,
        cumulative_unknown_reserved_effect_kind_fallbacks = report
            .cumulative_unknown_reserved_effect_kind_fallbacks,
        unknown_reserved_system_kind_histogram = ?report.unknown_reserved_system_kind_histogram,
        unknown_reserved_effect_kind_histogram = ?report.unknown_reserved_effect_kind_histogram,
        cumulative_unknown_reserved_system_kind_histogram =
            ?report.cumulative_unknown_reserved_system_kind_histogram,
        cumulative_unknown_reserved_effect_kind_histogram =
            ?report.cumulative_unknown_reserved_effect_kind_histogram,
        "store open completed"
    );

    let Some(observer) = config.open_report_observer.as_ref() else {
        return;
    };
    let observer = Arc::clone(observer);
    if catch_unwind(AssertUnwindSafe(|| observer(report))).is_err() {
        tracing::warn!(
            target: "batpak::open",
            "open report observer panicked; continuing with successful open"
        );
    }
}

/// Run the opt-in at-open hash-chain recompute.
///
/// Default [`ChainVerification::Crc`] pays nothing — the per-frame CRC already
/// guarded every frame at read time, so this skips the `O(events)` rehash.
/// Under [`ChainVerification::Recompute`] the store recomputes blake3 over every
/// committed event at open and FAILS CLOSED with
/// [`StoreError::ChainVerificationFailed`] on any content-hash mismatch or
/// dangling chain link — the regulated tamper-evidence-at-open posture. Shared
/// by both the read-write and read-only open paths so neither drifts.
fn run_open_chain_verification<State: StoreState>(store: &Store<State>) -> Result<(), StoreError> {
    if store.config.chain_verification() != ChainVerification::Recompute {
        return Ok(());
    }
    if let Some(error) = chain_verification_failure(&store.verify_chain()?) {
        return Err(error);
    }
    Ok(())
}

/// Map an at-open recompute report to its fail-closed error, or `None` when the
/// store verifies intact. Pure so the Recompute-vs-intact decision is unit
/// testable without forging on-disk tampering.
fn chain_verification_failure(report: &ChainVerificationReport) -> Option<StoreError> {
    if report.is_intact() {
        return None;
    }
    Some(StoreError::ChainVerificationFailed {
        content_hash_mismatches: report.content_hash_mismatches.len(),
        dangling_links: report.dangling_links.len(),
    })
}

fn highest_index_hlc(index: &StoreIndex) -> HlcPoint {
    index
        .all_entries()
        .into_iter()
        .map(|entry| HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence: entry.global_sequence,
        })
        .reduce(HlcPoint::max_by_sequence)
        .unwrap_or(HlcPoint::ORIGIN)
}

fn highest_visible_index_hlc(index: &StoreIndex) -> HlcPoint {
    index
        .visible_entries()
        .into_iter()
        .map(|entry| HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence: entry.global_sequence,
        })
        .reduce(HlcPoint::max_by_sequence)
        .unwrap_or(HlcPoint::ORIGIN)
}

fn highest_index_hlc_by_lane(
    entries: impl IntoIterator<Item = crate::store::index::IndexEntry>,
) -> BTreeMap<u32, HlcPoint> {
    let mut lanes = BTreeMap::new();
    for entry in entries {
        let point = HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence: entry.global_sequence,
        };
        lanes
            .entry(entry.dag_lane)
            .and_modify(|current: &mut HlcPoint| *current = (*current).max_by_sequence(point))
            .or_insert(point);
    }
    lanes
}

fn highest_durable_index_hlc_by_lane(index: &StoreIndex) -> BTreeMap<u32, HlcPoint> {
    highest_index_hlc_by_lane(index.all_entries())
}

fn highest_visible_index_hlc_by_lane(index: &StoreIndex) -> BTreeMap<u32, HlcPoint> {
    highest_index_hlc_by_lane(index.visible_entries())
}

fn last_close_hlc(index: &StoreIndex) -> Result<HlcPoint, StoreError> {
    let mut close_points: Vec<_> = index
        .all_entries()
        .into_iter()
        .filter(|entry| entry.kind == EventKind::SYSTEM_CLOSE_COMPLETED)
        .map(|entry| HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence: entry.global_sequence,
        })
        .collect();
    close_points.sort_by_key(|point| point.global_sequence);

    let mut latest = HlcPoint::ORIGIN;
    for close_hlc in close_points {
        if close_hlc < latest {
            return Err(StoreError::InvariantViolation {
                kind: StoreInvariant::CloseHlcRegression {
                    previous: latest,
                    later: close_hlc,
                },
            });
        }
        latest = close_hlc;
    }

    Ok(latest)
}

fn lifecycle_open_candidate(
    runtime: &config::ValidatedStoreConfig,
    max_recovered_hlc: HlcPoint,
    last_close_hlc: HlcPoint,
) -> Result<HlcPoint, StoreError> {
    let now_ms = match config::wall_ms_from_timestamp_us(runtime.now_us()) {
        Ok(now_ms) => now_ms,
        Err(StoreError::InvalidClock { .. }) => 0,
        Err(error) => return Err(error),
    };
    Ok(max_recovered_hlc.max(last_close_hlc).max(HlcPoint {
        wall_ms: now_ms,
        global_sequence: max_recovered_hlc.global_sequence,
    }))
}

fn validate_bootstrap_hlc(
    open_hlc: HlcPoint,
    max_recovered_hlc: HlcPoint,
    last_close_hlc: HlcPoint,
) -> Result<(), StoreError> {
    if open_hlc < max_recovered_hlc || open_hlc < last_close_hlc {
        return Err(StoreError::InvariantViolation {
            kind: StoreInvariant::BootstrapHlcOutOfOrder {
                open_hlc,
                max_recovered_hlc,
                last_close_hlc,
            },
        });
    }
    Ok(())
}

fn bootstrap_open_hlc(
    runtime: &config::ValidatedStoreConfig,
    index: &StoreIndex,
) -> Result<HlcPoint, StoreError> {
    let max_recovered_hlc = highest_index_hlc(index);
    let last_close_hlc = last_close_hlc(index)?;
    let open_hlc = lifecycle_open_candidate(runtime, max_recovered_hlc, last_close_hlc)?;
    validate_bootstrap_hlc(open_hlc, max_recovered_hlc, last_close_hlc)?;
    Ok(open_hlc)
}

fn bootstrap_read_only_hlc(index: &StoreIndex) -> Result<HlcPoint, StoreError> {
    let max_recovered_hlc = highest_index_hlc(index);
    let _last_close_hlc = last_close_hlc(index)?;
    Ok(max_recovered_hlc)
}

pub(super) fn timestamp_us_for_hlc(point: HlcPoint) -> Result<i64, StoreError> {
    let timestamp_us =
        point
            .wall_ms
            .checked_mul(1000)
            .ok_or_else(|| StoreError::InvariantViolation {
                kind: StoreInvariant::OpenHlcWallMsOverflow {
                    wall_ms: point.wall_ms,
                },
            })?;
    i64::try_from(timestamp_us).map_err(|_| StoreError::InvariantViolation {
        kind: StoreInvariant::OpenHlcTimestampOutOfRange {
            wall_ms: point.wall_ms,
        },
    })
}

fn append_open_completed_event(
    store: &Store<Open>,
    report: &OpenIndexReport,
    open_candidate: HlcPoint,
) -> Result<HlcPoint, StoreError> {
    let coord = Coordinate::new("batpak:store", "batpak:lifecycle")?;
    let submission = AppendSubmission::with_options(
        AppendOptions::default().with_idempotency(crate::id::IdempotencyKey::from(
            crate::id::generate_v7_id_with_clock(store.runtime.clock()),
        )),
        store.runtime.clock(),
    );
    submission.validate_route(store)?;
    submission.validate_idempotency(store)?;
    let event = submission.build_event(
        report,
        EventKind::SYSTEM_OPEN_COMPLETED,
        timestamp_us_for_hlc(open_candidate)?,
    )?;

    let (tx, rx) = flume::bounded(1);
    let command = submission.into_command(coord, EventKind::SYSTEM_OPEN_COMPLETED, event, tx);
    store
        .writer_handle()?
        .tx
        .send(command)
        .map_err(|_| StoreError::WriterCrashed)?;
    // Cooperative mode: drive the queued command inline before awaiting its
    // reply (no-op under the threaded path, so production is unaffected).
    store.writer_handle()?.pump();
    let receipt = recv_writer_reply(&rx)?;
    let receipt_event_id_raw = {
        use crate::id::EntityIdType;
        receipt.event_id.as_u128()
    };
    let open_hlc = store
        .index
        .get_by_id(receipt_event_id_raw)
        .map(|entry| HlcPoint {
            wall_ms: entry.wall_ms,
            global_sequence: entry.global_sequence,
        })
        .ok_or_else(|| StoreError::InvariantViolation {
            kind: StoreInvariant::OpenReceiptNotIndexed {
                event_id: receipt_event_id_raw,
            },
        })?;
    validate_bootstrap_hlc(open_hlc, open_candidate, last_close_hlc(&store.index)?)?;
    Ok(open_hlc)
}

impl Store<Open> {
    /// Open a store at the given config's data directory. Creates the directory if absent.
    /// Uses `NoCache` for projection (no external cache backend).
    ///
    /// # Errors
    /// Returns [`StoreError::StoreLocked`] if another live store handle already
    /// owns the directory lock.
    /// Returns `StoreError::Io` if the data directory cannot be created or segments cannot be read.
    pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NoCache))
    }

    /// Test-only: open a store in cooperative (single-threaded) writer mode.
    ///
    /// There is NO writer thread — the writer pipeline runs inline on the
    /// calling thread, driven by pumping the command queue at every reply-await
    /// funnel. Exposed under `dangerous-test-hooks` because `WriterMode` and
    /// its builder are crate-internal; this is not a public API surface.
    ///
    /// # Errors
    /// Same as [`Store::open`].
    #[cfg(feature = "dangerous-test-hooks")]
    #[cfg_attr(
        all(docsrs, not(batpak_stable_docs)),
        doc(cfg(feature = "dangerous-test-hooks"))
    )]
    pub fn open_cooperative(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_with_cache(
            config.with_writer_mode(crate::store::config::WriterMode::Cooperative),
            Box::new(NoCache),
        )
    }

    /// Open a store with the built-in file-backed projection cache.
    ///
    /// # Errors
    /// Returns [`StoreError::CacheFailed`] if the cache directory cannot be created,
    /// or any error from [`Store::open_with_cache`].
    pub fn open_with_native_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NativeCache::open(cache_path)?))
    }

    /// Open a store with a custom projection cache backend.
    /// Use [`NativeCache`] for file-backed cache-accelerated `project()` calls.
    ///
    /// # Errors
    /// Returns [`StoreError::StoreLocked`] if another live store handle already
    /// owns the directory lock.
    /// Returns `StoreError::Io` if the data directory cannot be created or segments cannot be read.
    pub fn open_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        let OpenComponents {
            runtime,
            config,
            index,
            reader,
            open_report,
            cumulative_reserved_kind_fallbacks,
            store_lock,
        } = open_components(config, StoreLockMode::Mutable)?;

        let open_candidate = bootstrap_open_hlc(&runtime, &index)?;
        let subscribers = Arc::new(SubscriberList::new());
        let reactor_subscribers = Arc::new(ReactorSubscriberList::new());
        let writer = match config.writer_mode() {
            crate::store::config::WriterMode::Threaded => WriterHandle::spawn(
                &config,
                &runtime,
                &index,
                &subscribers,
                &reactor_subscribers,
                &reader,
            )?,
            #[cfg(feature = "dangerous-test-hooks")]
            crate::store::config::WriterMode::Cooperative => WriterHandle::cooperative(
                &config,
                &runtime,
                &index,
                &subscribers,
                &reactor_subscribers,
                &reader,
            )?,
        };
        let watermark_handle = writer.watermark_handle();
        let projection_registry = ProjectionRegistry::new(watermark_handle.clone());

        // Share the ONE keyset the runtime rehydrated at open (see
        // `open_components`) with the store's read path — the writer already
        // holds the identical `Arc` through the runtime.
        #[cfg(feature = "payload-encryption")]
        let key_store = runtime.key_store.clone();

        let store = Self {
            index,
            reader,
            cache,
            watermark_handle,
            projection_registry,
            lifecycle_gate: Mutex::new(()),
            config,
            runtime,
            should_shutdown_on_drop: true,
            open_report: Some(open_report.clone()),
            cumulative_reserved_kind_fallbacks,
            #[cfg(feature = "payload-encryption")]
            key_store,
            state: Open(writer),
            _store_lock: store_lock,
        };

        emit_open_report_observability(&store.config, &open_report);
        let open_hlc = append_open_completed_event(&store, &open_report, open_candidate)?;
        lifecycle::sync(&store)?;
        store.watermark_handle.lock().reset_to_bootstrap_lanes(
            open_hlc,
            open_hlc,
            highest_durable_index_hlc_by_lane(&store.index),
            highest_visible_index_hlc_by_lane(&store.index),
        );

        // Opt-in tamper-evidence at open: with the store fully built, recompute
        // the hash chain and fail closed if it is not intact (no-op under Crc).
        run_open_chain_verification(&store)?;
        Ok(store)
    }
}

impl Store<ReadOnly> {
    /// Open the store without starting a writer thread.
    ///
    /// # Errors
    /// Returns any configuration, directory-creation, or cold-start rebuild
    /// error surfaced while opening the store in read-only mode.
    pub fn open_read_only(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_read_only_with_cache(config, Box::new(NoCache))
    }

    /// Open the store in read-only mode with the built-in projection cache.
    ///
    /// # Errors
    /// Returns [`StoreError::CacheFailed`] if the native cache cannot be
    /// opened, or any error returned by [`Store::open_read_only_with_cache`].
    pub fn open_read_only_with_native_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        Self::open_read_only_with_cache(config, Box::new(NativeCache::open(cache_path)?))
    }

    /// Open the store in read-only mode with a custom projection cache backend.
    ///
    /// # Errors
    /// Returns [`StoreError::StoreLocked`] if another live store handle already
    /// owns the directory lock. Read-only opens are also exclusive under the
    /// current store-ownership contract.
    /// Returns any configuration, directory-creation, or cold-start rebuild
    /// error surfaced while opening the store in read-only mode.
    pub fn open_read_only_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        let OpenComponents {
            runtime,
            config,
            index,
            reader,
            open_report,
            cumulative_reserved_kind_fallbacks,
            store_lock,
        } = open_components(config, StoreLockMode::ReadOnly)?;

        let recovered_hlc = bootstrap_read_only_hlc(&index)?;
        let watermark_handle = WatermarkState::bootstrap_handle(recovered_hlc, runtime.clock_arc());
        watermark_handle.lock().reset_to_bootstrap_lanes(
            recovered_hlc,
            highest_visible_index_hlc(&index),
            highest_durable_index_hlc_by_lane(&index),
            highest_visible_index_hlc_by_lane(&index),
        );
        let projection_registry = ProjectionRegistry::new(watermark_handle.clone());

        // Read-only opens decrypt on the read path too: reuse the ONE keyset the
        // runtime rehydrated at open (see `open_components`).
        #[cfg(feature = "payload-encryption")]
        let key_store = runtime.key_store.clone();

        let store = Self {
            index,
            reader,
            cache,
            watermark_handle,
            projection_registry,
            lifecycle_gate: Mutex::new(()),
            config,
            runtime,
            should_shutdown_on_drop: false,
            open_report: Some(open_report.clone()),
            cumulative_reserved_kind_fallbacks,
            #[cfg(feature = "payload-encryption")]
            key_store,
            state: ReadOnly,
            _store_lock: store_lock,
        };

        emit_open_report_observability(&store.config, &open_report);

        // Opt-in tamper-evidence at open: with the store fully built, recompute
        // the hash chain and fail closed if it is not intact (no-op under Crc).
        run_open_chain_verification(&store)?;
        Ok(store)
    }
}

#[cfg(test)]
mod tests;