1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
9
10use std::{
11 collections::VecDeque,
12 fmt,
13 path::PathBuf,
14 sync::{
15 atomic::{AtomicBool, AtomicUsize},
16 Arc, Mutex as StdMutex,
17 },
18};
19
20use dashmap::DashMap;
21use tokio::sync::{broadcast, watch, Semaphore};
22
23use crate::{
24 audit,
25 auth::{self, AuthGate, NonEmptyBytes},
26 defaults::{
27 DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
28 DEFAULT_MAX_WORLD_BYTES, DEFAULT_READ_CACHE_MAX_ENTRIES,
29 },
30 engine_types::{AccessTier, SecretBytes},
31 read_cache::ReadCache,
32 state::{new_event_counter, Core},
33 storage_class, store, world,
34};
35
36#[derive(Clone)]
41pub struct Engine {
42 inner: Arc<EngineInner>,
43}
44
45struct EngineInner {
46 core: Arc<Core>,
47 shutdown_tx: watch::Sender<bool>,
48 _data_lock: StdMutex<rusqlite::Connection>,
49}
50
/// Receiver-side view of an engine's shutdown flag.
///
/// Created by `Engine::shutdown_receiver`; only compiled when the
/// `unstable-engine` feature is enabled.
#[cfg(feature = "unstable-engine")]
#[doc(hidden)]
pub struct ShutdownToken {
    /// Watch receiver whose value is `true` once shutdown has been requested.
    rx: watch::Receiver<bool>,
}
58
59pub struct EngineBuilder {
64 data_root: PathBuf,
65 key: Option<SecretBytes>,
66 tokens: auth::Tokens,
67 max_world_bytes: usize,
68 max_memory_bytes: usize,
69 max_storage_bytes: Option<usize>,
70 max_listen_connections: usize,
71 listen_replay_max: usize,
72 read_cache_max_entries: usize,
73}
74
/// Reasons `EngineBuilder::build` can fail.
///
/// Marked `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum EngineBuildError {
    /// Creating the data root directory failed.
    DataRootIo(std::io::Error),
    /// A transient storage error was hit while taking the writer lock, verifying
    /// audit chains, or reading world sizes.
    DataRootLockHeld {
        /// Data root the build was attempted on.
        path: PathBuf,
    },
    /// No key was configured on the builder.
    HmacKeyMissing,
    /// Audit verification reported a broken chain for a world.
    AuditChainCorrupted {
        /// Name of the world whose chain failed verification.
        world: String,
        /// Text of the underlying verification error.
        detail: String,
    },
    /// Any other storage failure encountered while building.
    Storage {
        /// SQLite code extracted from the underlying error, if it carried one.
        sqlite_code: Option<i32>,
        /// Human-readable description of what failed.
        detail: String,
    },
}
107
108#[derive(Debug)]
116#[non_exhaustive]
117pub enum EngineError {
118 Auth(AuthGate),
120 InvalidWorldName,
122 NotFound,
124 AppendOnly,
127 PayloadTooLarge {
129 max: usize,
131 },
132 PreconditionFailed {
134 message: &'static str,
136 },
137 QuotaExceeded {
139 used: usize,
141 quota: usize,
143 projected: usize,
145 },
146 TransientStorage {
149 sqlite_code: Option<i32>,
151 },
152 InsufficientStorage {
155 sqlite_code: Option<i32>,
157 },
158 Storage {
160 sqlite_code: Option<i32>,
162 },
163 SubscriptionLimit,
165 ShuttingDown,
167 InternalInvariant(&'static str),
169}
170
171impl Default for EngineBuilder {
172 fn default() -> Self {
173 Self {
174 data_root: PathBuf::from("./data"),
175 key: None,
176 tokens: auth::Tokens {
177 read: None,
178 write: None,
179 approve: None,
180 },
181 max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
182 max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
183 max_storage_bytes: None,
184 max_listen_connections: DEFAULT_MAX_LISTEN_CONNECTIONS,
185 listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
186 read_cache_max_entries: DEFAULT_READ_CACHE_MAX_ENTRIES,
187 }
188 }
189}
190
191impl EngineBuilder {
192 pub fn data_root(mut self, path: impl Into<PathBuf>) -> Self {
196 self.data_root = path.into();
197 self
198 }
199
200 pub fn key(mut self, key: SecretBytes) -> Self {
202 self.key = Some(key);
203 self
204 }
205
206 pub fn read_token(mut self, token: impl Into<Vec<u8>>) -> Self {
211 self.tokens.read = NonEmptyBytes::new(token);
212 self
213 }
214
215 pub fn write_token(mut self, token: impl Into<Vec<u8>>) -> Self {
220 self.tokens.write = NonEmptyBytes::new(token);
221 self
222 }
223
224 pub fn approve_token(mut self, token: impl Into<Vec<u8>>) -> Self {
230 self.tokens.approve = NonEmptyBytes::new(token);
231 self
232 }
233
234 pub fn max_world_bytes(mut self, value: usize) -> Self {
236 self.max_world_bytes = value;
237 self
238 }
239
240 pub fn max_memory_bytes(mut self, value: usize) -> Self {
242 self.max_memory_bytes = value;
243 self
244 }
245
246 pub fn max_storage_bytes(mut self, value: Option<usize>) -> Self {
251 self.max_storage_bytes = value.filter(|value| *value > 0);
252 self
253 }
254
255 pub fn max_listen_connections(mut self, value: usize) -> Self {
258 self.max_listen_connections = nonzero_or_default(value, DEFAULT_MAX_LISTEN_CONNECTIONS);
259 self
260 }
261
262 pub fn listen_replay_max(mut self, value: usize) -> Self {
265 self.listen_replay_max = nonzero_or_default(value, DEFAULT_LISTEN_REPLAY_MAX);
266 self
267 }
268
269 pub fn read_cache_max_entries(mut self, value: usize) -> Self {
272 self.read_cache_max_entries = nonzero_or_default(value, DEFAULT_READ_CACHE_MAX_ENTRIES);
273 self
274 }
275
276 pub fn build(self) -> Result<Engine, EngineBuildError> {
290 std::fs::create_dir_all(&self.data_root).map_err(EngineBuildError::DataRootIo)?;
291 let data_lock = crate::acquire_data_root_writer_lock(&self.data_root)
292 .map_err(|err| map_writer_lock_error(&self.data_root, err))?;
293 let hmac_key = self.key.ok_or(EngineBuildError::HmacKeyMissing)?.into_vec();
294
295 verify_all_worlds_with_names(&self.data_root, &hmac_key)?;
296 let durable_sizes = world::sizes(&self.data_root).map_err(|err| {
297 if storage_class::is_transient_storage_error(&err) {
298 EngineBuildError::DataRootLockHeld {
299 path: self.data_root.clone(),
300 }
301 } else {
302 EngineBuildError::Storage {
303 sqlite_code: sqlite_code(&err),
304 detail: err.to_string(),
305 }
306 }
307 })?;
308 let storage_body_bytes = durable_sizes.iter().map(|(_, size)| *size).sum();
309 let durable_world_count = durable_sizes.len();
310 let delete_ledger_created = durable_sizes
311 .iter()
312 .any(|(world_name, _)| world_name == "var/log/deletes");
313
314 let (events, _) = broadcast::channel(self.listen_replay_max);
315 let (shutdown_tx, shutdown_rx) = watch::channel(false);
316 let core = Arc::new(Core {
317 data: self.data_root,
318 tokens: self.tokens,
319 hmac_key,
320 mem: Arc::new(store::MemoryStore::new()),
321 max_world_bytes: self.max_world_bytes,
322 max_memory_bytes: self.max_memory_bytes,
323 max_storage_bytes: self.max_storage_bytes,
324 storage_body_bytes: Arc::new(AtomicUsize::new(storage_body_bytes)),
325 durable_world_count: Arc::new(AtomicUsize::new(durable_world_count)),
326 delete_ledger_created: Arc::new(AtomicBool::new(delete_ledger_created)),
327 events,
328 listen_slots: Arc::new(Semaphore::new(self.max_listen_connections)),
329 listen_replay_max: self.listen_replay_max,
330 event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
331 self.listen_replay_max,
332 ))),
333 shutdown: shutdown_rx,
334 next_event: new_event_counter(),
335 world_locks: Arc::new(DashMap::new()),
336 ledger: Arc::new(crate::ledger::LedgerWriter::new()),
337 read_cache: Arc::new(ReadCache::new(self.read_cache_max_entries)),
338 });
339
340 Ok(Engine {
341 inner: Arc::new(EngineInner {
342 core,
343 shutdown_tx,
344 _data_lock: StdMutex::new(data_lock),
345 }),
346 })
347 }
348}
349
350impl Engine {
351 pub fn builder() -> EngineBuilder {
353 EngineBuilder::default()
354 }
355
356 pub(crate) fn core(&self) -> &Core {
357 self.inner.core.as_ref()
358 }
359
360 #[cfg(feature = "unstable-engine")]
366 #[doc(hidden)]
367 pub fn shutdown_receiver(&self) -> ShutdownToken {
368 ShutdownToken {
369 rx: self.inner.shutdown_tx.subscribe(),
370 }
371 }
372
373 pub fn verify_token(&self, token: &[u8]) -> AccessTier {
379 self.inner.core.tokens.check_token_bytes(token).into()
380 }
381
382 pub fn allows_read(&self, tier: AccessTier) -> bool {
387 crate::can_read(self.core(), tier.into())
388 }
389
390 pub fn shutdown(&self) {
397 self.inner.shutdown_tx.send_if_modified(|shutdown| {
398 if *shutdown {
399 false
400 } else {
401 *shutdown = true;
402 true
403 }
404 });
405 }
406}
407
#[cfg(feature = "unstable-engine")]
impl ShutdownToken {
    /// Returns the current value of the shutdown flag.
    pub fn is_shutdown(&self) -> bool {
        let flag = self.rx.borrow();
        *flag
    }

    /// Returns immediately if shutdown was already requested; otherwise waits
    /// for the next change of the flag.
    ///
    /// The result of waiting is ignored, so this also returns when the watch
    /// channel reports an error instead of a change.
    pub async fn wait(&mut self) {
        if !self.is_shutdown() {
            let _ = self.rx.changed().await;
        }
    }
}
423
424impl EngineError {
425 pub fn sqlite_code(&self) -> Option<i32> {
433 match self {
434 Self::TransientStorage { sqlite_code }
435 | Self::InsufficientStorage { sqlite_code }
436 | Self::Storage { sqlite_code } => *sqlite_code,
437 Self::Auth(_)
438 | Self::InvalidWorldName
439 | Self::NotFound
440 | Self::AppendOnly
441 | Self::PayloadTooLarge { .. }
442 | Self::PreconditionFailed { .. }
443 | Self::QuotaExceeded { .. }
444 | Self::SubscriptionLimit
445 | Self::ShuttingDown
446 | Self::InternalInvariant(_) => None,
447 }
448 }
449}
450
451impl fmt::Debug for Engine {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 f.debug_struct("Engine").finish_non_exhaustive()
454 }
455}
456
/// Returns `value` unless it is zero, in which case `default` is returned.
fn nonzero_or_default(value: usize, default: usize) -> usize {
    if value == 0 {
        default
    } else {
        value
    }
}
463
464pub(crate) fn sqlite_code(err: &rusqlite::Error) -> Option<i32> {
465 err.sqlite_error_code().map(|code| code as i32)
466}
467
468fn map_writer_lock_error(data_root: &std::path::Path, err: rusqlite::Error) -> EngineBuildError {
469 if storage_class::is_transient_storage_error(&err) {
470 return EngineBuildError::DataRootLockHeld {
471 path: data_root.to_path_buf(),
472 };
473 }
474 EngineBuildError::Storage {
475 sqlite_code: sqlite_code(&err),
476 detail: format!("writer lock open failed: {err}"),
477 }
478}
479
480fn verify_all_worlds_with_names(
481 data_root: &std::path::Path,
482 key: &[u8],
483) -> Result<(), EngineBuildError> {
484 let worlds = world::list(data_root).map_err(|err| EngineBuildError::Storage {
485 sqlite_code: sqlite_code(&err),
486 detail: format!("list worlds for audit verification failed: {err}"),
487 })?;
488 for world_name in worlds {
489 audit::verify_world(data_root, &world_name, key).map_err(|err| {
490 if storage_class::is_transient_storage_error(&err) {
491 EngineBuildError::DataRootLockHeld {
492 path: data_root.to_path_buf(),
493 }
494 } else if storage_class::is_insufficient_storage_error(&err) {
495 EngineBuildError::Storage {
496 sqlite_code: sqlite_code(&err),
497 detail: format!(
498 "audit verification for {world_name} failed with sqlite code {:?}: {err}",
499 sqlite_code(&err)
500 ),
501 }
502 } else if audit::is_audit_chain_broken_error(&err) {
503 EngineBuildError::AuditChainCorrupted {
504 world: world_name.clone(),
505 detail: err.to_string(),
506 }
507 } else {
508 EngineBuildError::Storage {
509 sqlite_code: sqlite_code(&err),
510 detail: format!("audit verification for {world_name} failed: {err}"),
511 }
512 }
513 })?;
514 }
515 Ok(())
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns a per-test directory path under the system temp dir.
    ///
    /// The name combines the test label, the process id and a nanosecond
    /// timestamp. Anything already at that path is removed; the directory itself
    /// is not created here.
    fn temp_root(name: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let nonce = since_epoch.as_nanos();
        let dir_name = format!(
            "elastik-engine-{name}-{}-{nonce}",
            std::process::id()
        );
        let root = std::env::temp_dir().join(dir_name);
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Builder pointed at `root` with the fixed test key `b"key"` already set.
    fn keyed_builder(root: &std::path::Path) -> EngineBuilder {
        Engine::builder()
            .data_root(root.to_path_buf())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
    }

    /// Best-effort removal of a test directory.
    fn remove_root(root: PathBuf) {
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn secret_bytes_rejects_empty_keys() {
        // Empty and whitespace-only inputs are rejected.
        assert!(SecretBytes::new(Vec::new()).is_err());
        let rejected: [&[u8]; 3] = [b"", b" \t\r\n", "\u{2003}\n".as_bytes()];
        for bytes in rejected {
            assert!(SecretBytes::try_from_slice(bytes).is_err());
        }

        // Non-blank inputs are accepted, including non-UTF-8 bytes.
        let accepted: [&[u8]; 2] = [b"key", &[0xff, b'k', b'e', b'y']];
        for bytes in accepted {
            assert!(SecretBytes::try_from_slice(bytes).is_ok());
        }
    }

    #[test]
    fn verify_token_maps_raw_bytes_to_access_tier() {
        let root = temp_root("verify-token");
        let engine = keyed_builder(&root)
            .read_token(b"reader".to_vec())
            .write_token(b"writer".to_vec())
            .approve_token(b"approve".to_vec())
            .build()
            .unwrap();

        let expectations: [(&[u8], AccessTier); 5] = [
            (b"", AccessTier::Anon),
            (b"missing", AccessTier::Anon),
            (b"reader", AccessTier::Read),
            (b"writer", AccessTier::Write),
            (b"approve", AccessTier::Approve),
        ];
        for (token, tier) in expectations {
            assert_eq!(engine.verify_token(token), tier);
        }

        drop(engine);
        remove_root(root);
    }

    #[test]
    fn token_setters_treat_empty_and_whitespace_as_unset() {
        let root = temp_root("token-whitespace");
        let unicode_blank = "\u{2003}\n".as_bytes();
        let engine = keyed_builder(&root)
            .read_token(b" \t\n".to_vec())
            .write_token(Vec::new())
            .approve_token(unicode_blank.to_vec())
            .build()
            .unwrap();

        assert_eq!(engine.verify_token(b" \t\n"), AccessTier::Anon);
        assert_eq!(engine.verify_token(unicode_blank), AccessTier::Anon);

        drop(engine);
        remove_root(root);
    }

    #[test]
    fn zero_numeric_limits_match_env_default_semantics() {
        let root = temp_root("zero-limits");
        let engine = keyed_builder(&root)
            .max_storage_bytes(Some(0))
            .max_listen_connections(0)
            .listen_replay_max(0)
            .read_cache_max_entries(0)
            .build()
            .unwrap();

        let core = engine.core();
        assert_eq!(core.max_storage_bytes, None);
        assert_eq!(
            core.listen_slots.available_permits(),
            DEFAULT_MAX_LISTEN_CONNECTIONS
        );
        assert_eq!(core.listen_replay_max, DEFAULT_LISTEN_REPLAY_MAX);
        assert_eq!(
            core.read_cache.max_entries,
            DEFAULT_READ_CACHE_MAX_ENTRIES
        );

        drop(engine);
        remove_root(root);
    }

    #[test]
    fn shutdown_is_idempotent_without_extra_notifications() {
        let root = temp_root("shutdown-idempotent");
        let engine = keyed_builder(&root).build().unwrap();
        let mut shutdown = engine.core().shutdown.clone();

        // The first call flips the flag and notifies receivers.
        assert!(!*shutdown.borrow());
        engine.shutdown();
        assert!(shutdown.has_changed().unwrap());
        assert!(*shutdown.borrow_and_update());

        // The second call must not produce another notification.
        engine.shutdown();
        assert!(!shutdown.has_changed().unwrap());

        drop(engine);
        remove_root(root);
    }

    #[test]
    fn build_holds_the_data_root_writer_lock() {
        let root = temp_root("writer-lock");
        let engine = keyed_builder(&root).build().unwrap();

        // A second engine on the same root must be refused while the first lives.
        let second = keyed_builder(&root).build();
        assert!(matches!(
            second,
            Err(EngineBuildError::DataRootLockHeld { .. })
        ));

        drop(engine);
        remove_root(root);
    }

    #[test]
    fn build_verifies_audit_chains_before_returning_engine() {
        let root = temp_root("audit-verify");
        let key = b"key".to_vec();
        let write_result = world::write_with_audit_checked(
            &root,
            "home/audit",
            b"body",
            "text/plain",
            &[],
            &key,
            None,
        );
        assert!(write_result.is_ok(), "fixture write should succeed");

        // Tamper with the newest event so chain verification has to fail.
        {
            let db = world::world_db(&root, "home/audit");
            let conn = rusqlite::Connection::open(db).unwrap();
            conn.execute(
                "UPDATE events SET body_sha256='tampered' WHERE id=(SELECT max(id) FROM events)",
                [],
            )
            .unwrap();
        }

        let result = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::new(key).unwrap())
            .build();
        assert!(matches!(
            result,
            Err(EngineBuildError::AuditChainCorrupted { .. })
        ));

        remove_root(root);
    }

    #[test]
    fn build_classifies_non_chain_verify_failures_as_storage_errors() {
        let root = temp_root("audit-storage-error");
        let key = b"key".to_vec();
        let write_result = world::write_with_audit_checked(
            &root,
            "home/schema",
            b"body",
            "text/plain",
            &[],
            &key,
            None,
        );
        assert!(write_result.is_ok(), "fixture write should succeed");

        // Removing the table makes verification fail for a non-chain reason.
        {
            let db = world::world_db(&root, "home/schema");
            let conn = rusqlite::Connection::open(db).unwrap();
            conn.execute("DROP TABLE events", []).unwrap();
        }

        let result = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::new(key).unwrap())
            .build();
        match result {
            Err(EngineBuildError::Storage {
                sqlite_code: Some(_),
                detail,
            }) => assert!(detail.contains("audit verification for home/schema failed")),
            other => panic!("expected storage error with sqlite code, got {other:?}"),
        }

        remove_root(root);
    }
}