Skip to main content

elastik_core/
engine.rs

1//! Unstable protocol-neutral Engine facade.
2//!
3//! This module is always compiled so internal adapters can migrate to it
4//! without tying default builds to a public API feature. External crates only
5//! see these types through crate-root re-exports when `unstable-engine` is
6//! enabled.
7
8#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
9
10use std::{
11    collections::VecDeque,
12    fmt,
13    path::PathBuf,
14    sync::{
15        atomic::{AtomicBool, AtomicUsize},
16        Arc, Mutex as StdMutex,
17    },
18};
19
20use dashmap::DashMap;
21use tokio::sync::{broadcast, watch, Semaphore};
22
23use crate::{
24    audit,
25    auth::{self, AuthGate, NonEmptyBytes},
26    defaults::{
27        DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
28        DEFAULT_MAX_WORLD_BYTES, DEFAULT_READ_CACHE_MAX_ENTRIES,
29    },
30    engine_types::{AccessTier, SecretBytes},
31    read_cache::ReadCache,
32    state::{new_event_counter, Core},
33    storage_class, store, world,
34};
35
36/// Public handle for the protocol-neutral Elastik engine.
37///
38/// `Engine` is cloneable and owns the startup writer lock for the data root.
39/// Dropping the last clone releases the lock.
40#[derive(Clone)]
41pub struct Engine {
42    inner: Arc<EngineInner>,
43}
44
45struct EngineInner {
46    core: Arc<Core>,
47    shutdown_tx: watch::Sender<bool>,
48    _data_lock: StdMutex<rusqlite::Connection>,
49}
50
51/// Adapter-side handle for waiting on Engine shutdown without exposing Tokio's
52/// watch channel type in the public facade.
53#[cfg(feature = "unstable-engine")]
54#[doc(hidden)]
55pub struct ShutdownToken {
56    rx: watch::Receiver<bool>,
57}
58
59/// Builder for an `Engine`.
60///
61/// The builder is the only public construction path. New fields can gain
62/// defaults without breaking callers.
63pub struct EngineBuilder {
64    data_root: PathBuf,
65    key: Option<SecretBytes>,
66    tokens: auth::Tokens,
67    max_world_bytes: usize,
68    max_memory_bytes: usize,
69    max_storage_bytes: Option<usize>,
70    max_listen_connections: usize,
71    listen_replay_max: usize,
72    read_cache_max_entries: usize,
73}
74
75/// Errors that can occur while constructing an [`Engine`].
76///
77/// Build failures are setup-time failures: they happen once at
78/// [`EngineBuilder::build`] and never as part of per-operation runtime errors,
79/// which are reported as [`EngineError`].
80#[derive(Debug)]
81#[non_exhaustive]
82pub enum EngineBuildError {
83    /// Creating the `data_root` directory or its writer-lock file failed.
84    DataRootIo(std::io::Error),
85    /// Another process holds the writer lock on `data_root`.
86    DataRootLockHeld {
87        /// The data root that failed to acquire the writer lock.
88        path: PathBuf,
89    },
90    /// [`EngineBuilder::key`] was never called.
91    HmacKeyMissing,
92    /// Startup audit verification found a tampered HMAC chain.
93    AuditChainCorrupted {
94        /// Canonical name of the world whose chain failed verification.
95        world: String,
96        /// Human-readable failure detail (do not parse).
97        detail: String,
98    },
99    /// Storage layer failure during startup (schema, IO, or quota).
100    Storage {
101        /// Underlying SQLite extended result code, when available.
102        sqlite_code: Option<i32>,
103        /// Human-readable failure detail (do not parse).
104        detail: String,
105    },
106}
107
108/// Runtime operation errors reported by the Engine facade.
109///
110/// Distinct from [`EngineBuildError`]: these are per-operation failures
111/// returned by [`Engine::read`], [`Engine::replace`], [`Engine::append`],
112/// [`Engine::delete`], [`Engine::subscribe`], and the introspection methods.
113/// The enum is `#[non_exhaustive]`; match on the variants you care about and
114/// route the rest through a default arm.
115#[derive(Debug)]
116#[non_exhaustive]
117pub enum EngineError {
118    /// Caller's tier is too low for the requested gate.
119    Auth(AuthGate),
120    /// The supplied world name failed canonical-path validation.
121    InvalidWorldName,
122    /// The world does not exist.
123    NotFound,
124    /// The target world is append-only and refuses delete/overwrite (for
125    /// example `var/log/deletes`).
126    AppendOnly,
127    /// Request body exceeded the per-world byte limit.
128    PayloadTooLarge {
129        /// Configured maximum body length, in bytes.
130        max: usize,
131    },
132    /// An `If-Match` / `If-None-Match` precondition rejected the write.
133    PreconditionFailed {
134        /// Static reason string (`stale`, `exists`, ...).
135        message: &'static str,
136    },
137    /// Write would exceed the configured durable storage quota.
138    QuotaExceeded {
139        /// Bytes already in use.
140        used: usize,
141        /// Configured quota ceiling.
142        quota: usize,
143        /// Projected total if the write were applied.
144        projected: usize,
145    },
146    /// Storage is temporarily unavailable (e.g. SQLite `BUSY`/`LOCKED`).
147    /// Callers should retry with backoff.
148    TransientStorage {
149        /// Underlying SQLite extended result code, when available.
150        sqlite_code: Option<i32>,
151    },
152    /// Storage backing exhausted (full disk, IO failure that maps to
153    /// "no space left"). Callers must surface this as 5xx-class to operators.
154    InsufficientStorage {
155        /// Underlying SQLite extended result code, when available.
156        sqlite_code: Option<i32>,
157    },
158    /// Generic storage failure that is neither transient nor insufficient.
159    Storage {
160        /// Underlying SQLite extended result code, when available.
161        sqlite_code: Option<i32>,
162    },
163    /// Subscription slot semaphore is exhausted.
164    SubscriptionLimit,
165    /// [`Engine::shutdown`] has been called; do not start new operations.
166    ShuttingDown,
167    /// Internal invariant violation. Indicates a bug in the engine.
168    InternalInvariant(&'static str),
169}
170
171impl Default for EngineBuilder {
172    fn default() -> Self {
173        Self {
174            data_root: PathBuf::from("./data"),
175            key: None,
176            tokens: auth::Tokens {
177                read: None,
178                write: None,
179                approve: None,
180            },
181            max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
182            max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
183            max_storage_bytes: None,
184            max_listen_connections: DEFAULT_MAX_LISTEN_CONNECTIONS,
185            listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
186            read_cache_max_entries: DEFAULT_READ_CACHE_MAX_ENTRIES,
187        }
188    }
189}
190
191impl EngineBuilder {
192    /// Sets the directory where durable worlds and the audit log live.
193    ///
194    /// Default: `./data`. The directory is created if missing.
195    pub fn data_root(mut self, path: impl Into<PathBuf>) -> Self {
196        self.data_root = path.into();
197        self
198    }
199
200    /// Sets the HMAC key that protects the audit chain. Required.
201    pub fn key(mut self, key: SecretBytes) -> Self {
202        self.key = Some(key);
203        self
204    }
205
206    /// Sets the optional read-tier token.
207    ///
208    /// Empty or all-whitespace bytes are treated as "unset" — they never
209    /// silently grant access.
210    pub fn read_token(mut self, token: impl Into<Vec<u8>>) -> Self {
211        self.tokens.read = NonEmptyBytes::new(token);
212        self
213    }
214
215    /// Sets the optional write-tier token (covers read + write).
216    ///
217    /// Empty or all-whitespace bytes are treated as "unset" — they never
218    /// silently grant access.
219    pub fn write_token(mut self, token: impl Into<Vec<u8>>) -> Self {
220        self.tokens.write = NonEmptyBytes::new(token);
221        self
222    }
223
224    /// Sets the optional approve-tier token (covers read + write + system
225    /// writes + delete).
226    ///
227    /// Empty or all-whitespace bytes are treated as "unset" — they never
228    /// silently grant access.
229    pub fn approve_token(mut self, token: impl Into<Vec<u8>>) -> Self {
230        self.tokens.approve = NonEmptyBytes::new(token);
231        self
232    }
233
234    /// Caps the per-world payload size in bytes. Defaults to 64 MiB.
235    pub fn max_world_bytes(mut self, value: usize) -> Self {
236        self.max_world_bytes = value;
237        self
238    }
239
240    /// Caps the in-memory backend's total bytes. Defaults to 256 MiB.
241    pub fn max_memory_bytes(mut self, value: usize) -> Self {
242        self.max_memory_bytes = value;
243        self
244    }
245
246    /// Sets an optional durable storage quota in bytes.
247    ///
248    /// `Some(0)` is treated as "no quota". Writes that would push the total
249    /// past the quota fail with [`EngineError::QuotaExceeded`].
250    pub fn max_storage_bytes(mut self, value: Option<usize>) -> Self {
251        self.max_storage_bytes = value.filter(|value| *value > 0);
252        self
253    }
254
255    /// Sets the maximum simultaneous subscription slots. `0` reverts to the
256    /// default (1024).
257    pub fn max_listen_connections(mut self, value: usize) -> Self {
258        self.max_listen_connections = nonzero_or_default(value, DEFAULT_MAX_LISTEN_CONNECTIONS);
259        self
260    }
261
262    /// Sets the per-subscription replay ring depth. `0` reverts to the
263    /// default (1024).
264    pub fn listen_replay_max(mut self, value: usize) -> Self {
265        self.listen_replay_max = nonzero_or_default(value, DEFAULT_LISTEN_REPLAY_MAX);
266        self
267    }
268
269    /// Sets the read cache's maximum tracked entries. `0` reverts to the
270    /// default (5000).
271    pub fn read_cache_max_entries(mut self, value: usize) -> Self {
272        self.read_cache_max_entries = nonzero_or_default(value, DEFAULT_READ_CACHE_MAX_ENTRIES);
273        self
274    }
275
276    /// Acquires the data-root writer lock, verifies every audit chain, and
277    /// returns a ready-to-serve [`Engine`].
278    ///
279    /// # Errors
280    /// - [`EngineBuildError::HmacKeyMissing`] if [`EngineBuilder::key`] was
281    ///   never called.
282    /// - [`EngineBuildError::DataRootIo`] for filesystem errors creating
283    ///   `data_root`.
284    /// - [`EngineBuildError::DataRootLockHeld`] if another process holds the
285    ///   writer lock.
286    /// - [`EngineBuildError::AuditChainCorrupted`] if any existing world's
287    ///   HMAC chain fails verification.
288    /// - [`EngineBuildError::Storage`] for other storage-layer failures.
289    pub fn build(self) -> Result<Engine, EngineBuildError> {
290        std::fs::create_dir_all(&self.data_root).map_err(EngineBuildError::DataRootIo)?;
291        let data_lock = crate::acquire_data_root_writer_lock(&self.data_root)
292            .map_err(|err| map_writer_lock_error(&self.data_root, err))?;
293        let hmac_key = self.key.ok_or(EngineBuildError::HmacKeyMissing)?.into_vec();
294
295        verify_all_worlds_with_names(&self.data_root, &hmac_key)?;
296        let durable_sizes = world::sizes(&self.data_root).map_err(|err| {
297            if storage_class::is_transient_storage_error(&err) {
298                EngineBuildError::DataRootLockHeld {
299                    path: self.data_root.clone(),
300                }
301            } else {
302                EngineBuildError::Storage {
303                    sqlite_code: sqlite_code(&err),
304                    detail: err.to_string(),
305                }
306            }
307        })?;
308        let storage_body_bytes = durable_sizes.iter().map(|(_, size)| *size).sum();
309        let durable_world_count = durable_sizes.len();
310        let delete_ledger_created = durable_sizes
311            .iter()
312            .any(|(world_name, _)| world_name == "var/log/deletes");
313
314        let (events, _) = broadcast::channel(self.listen_replay_max);
315        let (shutdown_tx, shutdown_rx) = watch::channel(false);
316        let core = Arc::new(Core {
317            data: self.data_root,
318            tokens: self.tokens,
319            hmac_key,
320            mem: Arc::new(store::MemoryStore::new()),
321            max_world_bytes: self.max_world_bytes,
322            max_memory_bytes: self.max_memory_bytes,
323            max_storage_bytes: self.max_storage_bytes,
324            storage_body_bytes: Arc::new(AtomicUsize::new(storage_body_bytes)),
325            durable_world_count: Arc::new(AtomicUsize::new(durable_world_count)),
326            delete_ledger_created: Arc::new(AtomicBool::new(delete_ledger_created)),
327            events,
328            listen_slots: Arc::new(Semaphore::new(self.max_listen_connections)),
329            listen_replay_max: self.listen_replay_max,
330            event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
331                self.listen_replay_max,
332            ))),
333            shutdown: shutdown_rx,
334            next_event: new_event_counter(),
335            world_locks: Arc::new(DashMap::new()),
336            ledger: Arc::new(crate::ledger::LedgerWriter::new()),
337            read_cache: Arc::new(ReadCache::new(self.read_cache_max_entries)),
338        });
339
340        Ok(Engine {
341            inner: Arc::new(EngineInner {
342                core,
343                shutdown_tx,
344                _data_lock: StdMutex::new(data_lock),
345            }),
346        })
347    }
348}
349
350impl Engine {
351    /// Returns a fresh [`EngineBuilder`] populated with crate defaults.
352    pub fn builder() -> EngineBuilder {
353        EngineBuilder::default()
354    }
355
356    pub(crate) fn core(&self) -> &Core {
357        self.inner.core.as_ref()
358    }
359
360    /// Subscribes to the engine shutdown signal.
361    ///
362    /// Returned receiver yields `true` exactly once when [`Engine::shutdown`]
363    /// is called. Intended for adapter graceful-shutdown loops; not part of
364    /// the documented stable surface.
365    #[cfg(feature = "unstable-engine")]
366    #[doc(hidden)]
367    pub fn shutdown_receiver(&self) -> ShutdownToken {
368        ShutdownToken {
369            rx: self.inner.shutdown_tx.subscribe(),
370        }
371    }
372
373    /// Maps raw token bytes to an [`AccessTier`].
374    ///
375    /// Constant-time comparison against configured tokens. Returns
376    /// [`AccessTier::Anon`] for empty, unrecognized, or invalid token bytes;
377    /// returns the highest matching tier otherwise.
378    pub fn verify_token(&self, token: &[u8]) -> AccessTier {
379        self.inner.core.tokens.check_token_bytes(token).into()
380    }
381
382    /// Returns whether `tier` satisfies the engine's configured read gate.
383    ///
384    /// Adapters use this for non-world read-only surfaces that still need to
385    /// mirror `/proc/*` read policy, such as protocol-local metrics.
386    pub fn allows_read(&self, tier: AccessTier) -> bool {
387        crate::can_read(self.core(), tier.into())
388    }
389
390    /// Starts orderly shutdown.
391    ///
392    /// Sets the engine-owned shutdown signal so subscribers
393    /// ([`crate::EngineSubscription`] recv loops, adapter graceful-shutdown
394    /// futures) can drain in-flight work. Repeated calls are no-ops; only
395    /// the first call flips the signal.
396    pub fn shutdown(&self) {
397        self.inner.shutdown_tx.send_if_modified(|shutdown| {
398            if *shutdown {
399                false
400            } else {
401                *shutdown = true;
402                true
403            }
404        });
405    }
406}
407
408#[cfg(feature = "unstable-engine")]
409impl ShutdownToken {
410    /// Returns whether shutdown has already been requested.
411    pub fn is_shutdown(&self) -> bool {
412        *self.rx.borrow()
413    }
414
415    /// Waits until shutdown is requested or the Engine owner is dropped.
416    pub async fn wait(&mut self) {
417        if self.is_shutdown() {
418            return;
419        }
420        let _ = self.rx.changed().await;
421    }
422}
423
424impl EngineError {
425    /// Returns the underlying SQLite extended result code, when this error
426    /// originated in the storage backend.
427    ///
428    /// Non-storage variants (auth, not-found, append-only, precondition,
429    /// quota, subscription-limit, shutting-down, internal-invariant) always
430    /// return `None`. Adapters can use this for protocol-specific code
431    /// mapping without inspecting the variant.
432    pub fn sqlite_code(&self) -> Option<i32> {
433        match self {
434            Self::TransientStorage { sqlite_code }
435            | Self::InsufficientStorage { sqlite_code }
436            | Self::Storage { sqlite_code } => *sqlite_code,
437            Self::Auth(_)
438            | Self::InvalidWorldName
439            | Self::NotFound
440            | Self::AppendOnly
441            | Self::PayloadTooLarge { .. }
442            | Self::PreconditionFailed { .. }
443            | Self::QuotaExceeded { .. }
444            | Self::SubscriptionLimit
445            | Self::ShuttingDown
446            | Self::InternalInvariant(_) => None,
447        }
448    }
449}
450
451impl fmt::Debug for Engine {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        f.debug_struct("Engine").finish_non_exhaustive()
454    }
455}
456
457fn nonzero_or_default(value: usize, default: usize) -> usize {
458    match value {
459        0 => default,
460        value => value,
461    }
462}
463
464pub(crate) fn sqlite_code(err: &rusqlite::Error) -> Option<i32> {
465    err.sqlite_error_code().map(|code| code as i32)
466}
467
468fn map_writer_lock_error(data_root: &std::path::Path, err: rusqlite::Error) -> EngineBuildError {
469    if storage_class::is_transient_storage_error(&err) {
470        return EngineBuildError::DataRootLockHeld {
471            path: data_root.to_path_buf(),
472        };
473    }
474    EngineBuildError::Storage {
475        sqlite_code: sqlite_code(&err),
476        detail: format!("writer lock open failed: {err}"),
477    }
478}
479
480fn verify_all_worlds_with_names(
481    data_root: &std::path::Path,
482    key: &[u8],
483) -> Result<(), EngineBuildError> {
484    let worlds = world::list(data_root).map_err(|err| EngineBuildError::Storage {
485        sqlite_code: sqlite_code(&err),
486        detail: format!("list worlds for audit verification failed: {err}"),
487    })?;
488    for world_name in worlds {
489        audit::verify_world(data_root, &world_name, key).map_err(|err| {
490            if storage_class::is_transient_storage_error(&err) {
491                EngineBuildError::DataRootLockHeld {
492                    path: data_root.to_path_buf(),
493                }
494            } else if storage_class::is_insufficient_storage_error(&err) {
495                EngineBuildError::Storage {
496                    sqlite_code: sqlite_code(&err),
497                    detail: format!(
498                        "audit verification for {world_name} failed with sqlite code {:?}: {err}",
499                        sqlite_code(&err)
500                    ),
501                }
502            } else if audit::is_audit_chain_broken_error(&err) {
503                EngineBuildError::AuditChainCorrupted {
504                    world: world_name.clone(),
505                    detail: err.to_string(),
506                }
507            } else {
508                EngineBuildError::Storage {
509                    sqlite_code: sqlite_code(&err),
510                    detail: format!("audit verification for {world_name} failed: {err}"),
511                }
512            }
513        })?;
514    }
515    Ok(())
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use std::time::{SystemTime, UNIX_EPOCH};
522
523    fn temp_root(name: &str) -> PathBuf {
524        let nonce = SystemTime::now()
525            .duration_since(UNIX_EPOCH)
526            .unwrap()
527            .as_nanos();
528        let root = std::env::temp_dir().join(format!(
529            "elastik-engine-{name}-{}-{nonce}",
530            std::process::id()
531        ));
532        let _ = std::fs::remove_dir_all(&root);
533        root
534    }
535
536    #[test]
537    fn secret_bytes_rejects_empty_keys() {
538        assert!(SecretBytes::new(Vec::new()).is_err());
539        assert!(SecretBytes::try_from_slice(b"").is_err());
540        assert!(SecretBytes::try_from_slice(b" \t\r\n").is_err());
541        assert!(SecretBytes::try_from_slice("\u{2003}\n".as_bytes()).is_err());
542        assert!(SecretBytes::try_from_slice(b"key").is_ok());
543        assert!(SecretBytes::try_from_slice(&[0xff, b'k', b'e', b'y']).is_ok());
544    }
545
546    #[test]
547    fn verify_token_maps_raw_bytes_to_access_tier() {
548        let root = temp_root("verify-token");
549        let engine = Engine::builder()
550            .data_root(root.clone())
551            .key(SecretBytes::try_from_slice(b"key").unwrap())
552            .read_token(b"reader".to_vec())
553            .write_token(b"writer".to_vec())
554            .approve_token(b"approve".to_vec())
555            .build()
556            .unwrap();
557
558        assert_eq!(engine.verify_token(b""), AccessTier::Anon);
559        assert_eq!(engine.verify_token(b"missing"), AccessTier::Anon);
560        assert_eq!(engine.verify_token(b"reader"), AccessTier::Read);
561        assert_eq!(engine.verify_token(b"writer"), AccessTier::Write);
562        assert_eq!(engine.verify_token(b"approve"), AccessTier::Approve);
563
564        drop(engine);
565        let _ = std::fs::remove_dir_all(root);
566    }
567
568    #[test]
569    fn token_setters_treat_empty_and_whitespace_as_unset() {
570        let root = temp_root("token-whitespace");
571        let engine = Engine::builder()
572            .data_root(root.clone())
573            .key(SecretBytes::try_from_slice(b"key").unwrap())
574            .read_token(b" \t\n".to_vec())
575            .write_token(Vec::new())
576            .approve_token("\u{2003}\n".as_bytes().to_vec())
577            .build()
578            .unwrap();
579
580        assert_eq!(engine.verify_token(b" \t\n"), AccessTier::Anon);
581        assert_eq!(
582            engine.verify_token("\u{2003}\n".as_bytes()),
583            AccessTier::Anon
584        );
585
586        drop(engine);
587        let _ = std::fs::remove_dir_all(root);
588    }
589
590    #[test]
591    fn zero_numeric_limits_match_env_default_semantics() {
592        let root = temp_root("zero-limits");
593        let engine = Engine::builder()
594            .data_root(root.clone())
595            .key(SecretBytes::try_from_slice(b"key").unwrap())
596            .max_storage_bytes(Some(0))
597            .max_listen_connections(0)
598            .listen_replay_max(0)
599            .read_cache_max_entries(0)
600            .build()
601            .unwrap();
602
603        assert_eq!(engine.inner.core.max_storage_bytes, None);
604        assert_eq!(
605            engine.inner.core.listen_slots.available_permits(),
606            DEFAULT_MAX_LISTEN_CONNECTIONS
607        );
608        assert_eq!(
609            engine.inner.core.listen_replay_max,
610            DEFAULT_LISTEN_REPLAY_MAX
611        );
612        assert_eq!(
613            engine.inner.core.read_cache.max_entries,
614            DEFAULT_READ_CACHE_MAX_ENTRIES
615        );
616
617        drop(engine);
618        let _ = std::fs::remove_dir_all(root);
619    }
620
621    #[test]
622    fn shutdown_is_idempotent_without_extra_notifications() {
623        let root = temp_root("shutdown-idempotent");
624        let engine = Engine::builder()
625            .data_root(root.clone())
626            .key(SecretBytes::try_from_slice(b"key").unwrap())
627            .build()
628            .unwrap();
629        let mut shutdown = engine.inner.core.shutdown.clone();
630
631        assert!(!*shutdown.borrow());
632        engine.shutdown();
633        assert!(shutdown.has_changed().unwrap());
634        assert!(*shutdown.borrow_and_update());
635
636        engine.shutdown();
637        assert!(!shutdown.has_changed().unwrap());
638
639        drop(engine);
640        let _ = std::fs::remove_dir_all(root);
641    }
642
643    #[test]
644    fn build_holds_the_data_root_writer_lock() {
645        let root = temp_root("writer-lock");
646        let engine = Engine::builder()
647            .data_root(root.clone())
648            .key(SecretBytes::try_from_slice(b"key").unwrap())
649            .build()
650            .unwrap();
651
652        let second = Engine::builder()
653            .data_root(root.clone())
654            .key(SecretBytes::try_from_slice(b"key").unwrap())
655            .build();
656
657        assert!(matches!(
658            second,
659            Err(EngineBuildError::DataRootLockHeld { .. })
660        ));
661
662        drop(engine);
663        let _ = std::fs::remove_dir_all(root);
664    }
665
666    #[test]
667    fn build_verifies_audit_chains_before_returning_engine() {
668        let root = temp_root("audit-verify");
669        let key = b"key".to_vec();
670        let write_result = world::write_with_audit_checked(
671            &root,
672            "home/audit",
673            b"body",
674            "text/plain",
675            &[],
676            &key,
677            None,
678        );
679        assert!(write_result.is_ok(), "fixture write should succeed");
680        let db = world::world_db(&root, "home/audit");
681        let conn = rusqlite::Connection::open(db).unwrap();
682        conn.execute(
683            "UPDATE events SET body_sha256='tampered' WHERE id=(SELECT max(id) FROM events)",
684            [],
685        )
686        .unwrap();
687        drop(conn);
688
689        let result = Engine::builder()
690            .data_root(root.clone())
691            .key(SecretBytes::new(key).unwrap())
692            .build();
693
694        assert!(matches!(
695            result,
696            Err(EngineBuildError::AuditChainCorrupted { .. })
697        ));
698
699        let _ = std::fs::remove_dir_all(root);
700    }
701
702    #[test]
703    fn build_classifies_non_chain_verify_failures_as_storage_errors() {
704        let root = temp_root("audit-storage-error");
705        let key = b"key".to_vec();
706        let write_result = world::write_with_audit_checked(
707            &root,
708            "home/schema",
709            b"body",
710            "text/plain",
711            &[],
712            &key,
713            None,
714        );
715        assert!(write_result.is_ok(), "fixture write should succeed");
716        let db = world::world_db(&root, "home/schema");
717        let conn = rusqlite::Connection::open(db).unwrap();
718        conn.execute("DROP TABLE events", []).unwrap();
719        drop(conn);
720
721        let result = Engine::builder()
722            .data_root(root.clone())
723            .key(SecretBytes::new(key).unwrap())
724            .build();
725
726        match result {
727            Err(EngineBuildError::Storage {
728                sqlite_code: Some(_),
729                detail,
730            }) => assert!(detail.contains("audit verification for home/schema failed")),
731            other => panic!("expected storage error with sqlite code, got {other:?}"),
732        }
733
734        let _ = std::fs::remove_dir_all(root);
735    }
736}