1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
9
10use std::{
11 collections::VecDeque,
12 fmt,
13 path::PathBuf,
14 sync::{
15 atomic::{AtomicBool, AtomicUsize},
16 Arc, Mutex as StdMutex,
17 },
18};
19
20use dashmap::DashMap;
21use tokio::sync::{broadcast, watch, Semaphore};
22
23use crate::{
24 audit,
25 auth::{self, AuthGate, NonEmptyBytes},
26 defaults::{
27 DEFAULT_LISTEN_REPLAY_MAX, DEFAULT_MAX_LISTEN_CONNECTIONS, DEFAULT_MAX_MEMORY_BYTES,
28 DEFAULT_MAX_WORLD_BYTES, DEFAULT_READ_CACHE_MAX_ENTRIES,
29 },
30 engine_types::{AccessTier, SecretBytes},
31 read_cache::ReadCache,
32 state::{new_event_counter, Core},
33 storage_class, store, world,
34};
35
36#[derive(Clone)]
41pub struct Engine {
42 inner: Arc<EngineInner>,
43}
44
45struct EngineInner {
46 core: Arc<Core>,
47 shutdown_tx: watch::Sender<bool>,
48 _data_lock: StdMutex<rusqlite::Connection>,
49}
50
/// Receiver-side view of the engine's shutdown flag.
///
/// Obtained from `Engine::shutdown_receiver`; only compiled with the `coap`
/// feature and hidden from the rendered documentation.
#[cfg(feature = "coap")]
#[doc(hidden)]
pub struct ShutdownToken {
    /// Watch receiver subscribed to the engine's shutdown flag.
    rx: watch::Receiver<bool>,
}
58
59pub struct EngineBuilder {
64 data_root: PathBuf,
65 key: Option<SecretBytes>,
66 tokens: auth::Tokens,
67 max_world_bytes: usize,
68 max_memory_bytes: usize,
69 max_storage_bytes: Option<usize>,
70 max_listen_connections: usize,
71 listen_replay_max: usize,
72 read_cache_max_entries: usize,
73}
74
/// Reasons `EngineBuilder::build` can fail.
///
/// Marked `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum EngineBuildError {
    /// Creating the data root directory failed.
    DataRootIo(std::io::Error),
    /// A transient storage error was hit while acquiring the writer lock,
    /// verifying audit chains, or reading world sizes.
    DataRootLockHeld {
        /// The data root that could not be locked.
        path: PathBuf,
    },
    /// No key was supplied to the builder.
    HmacKeyMissing,
    /// Audit verification reported a broken chain for a world.
    AuditChainCorrupted {
        /// Name of the world whose chain failed verification.
        world: String,
        /// Rendered text of the underlying error.
        detail: String,
    },
    /// Any other storage failure encountered during build.
    Storage {
        /// Code produced by the crate's `sqlite_code` helper, when the
        /// underlying error carries one.
        sqlite_code: Option<i32>,
        /// Human-readable description of what failed.
        detail: String,
    },
}
107
108#[derive(Debug)]
116#[non_exhaustive]
117pub enum EngineError {
118 Auth(AuthGate),
120 InvalidWorldName,
122 NotFound,
124 AppendOnly,
127 PayloadTooLarge {
129 max: usize,
131 },
132 PreconditionFailed {
134 message: &'static str,
136 },
137 QuotaExceeded {
139 used: usize,
141 quota: usize,
143 projected: usize,
145 },
146 TransientStorage {
149 sqlite_code: Option<i32>,
151 },
152 InsufficientStorage {
155 sqlite_code: Option<i32>,
157 },
158 Storage {
160 sqlite_code: Option<i32>,
162 },
163 SubscriptionLimit,
165 ShuttingDown,
167 InternalInvariant(&'static str),
169}
170
171impl Default for EngineBuilder {
172 fn default() -> Self {
173 Self {
174 data_root: PathBuf::from("./data"),
175 key: None,
176 tokens: auth::Tokens {
177 read: None,
178 write: None,
179 approve: None,
180 },
181 max_world_bytes: DEFAULT_MAX_WORLD_BYTES,
182 max_memory_bytes: DEFAULT_MAX_MEMORY_BYTES,
183 max_storage_bytes: None,
184 max_listen_connections: DEFAULT_MAX_LISTEN_CONNECTIONS,
185 listen_replay_max: DEFAULT_LISTEN_REPLAY_MAX,
186 read_cache_max_entries: DEFAULT_READ_CACHE_MAX_ENTRIES,
187 }
188 }
189}
190
191impl EngineBuilder {
192 pub fn data_root(mut self, path: impl Into<PathBuf>) -> Self {
196 self.data_root = path.into();
197 self
198 }
199
200 pub fn key(mut self, key: SecretBytes) -> Self {
202 self.key = Some(key);
203 self
204 }
205
206 pub fn read_token(mut self, token: impl Into<Vec<u8>>) -> Self {
211 self.tokens.read = NonEmptyBytes::new(token);
212 self
213 }
214
215 pub fn write_token(mut self, token: impl Into<Vec<u8>>) -> Self {
220 self.tokens.write = NonEmptyBytes::new(token);
221 self
222 }
223
224 pub fn approve_token(mut self, token: impl Into<Vec<u8>>) -> Self {
230 self.tokens.approve = NonEmptyBytes::new(token);
231 self
232 }
233
234 pub fn max_world_bytes(mut self, value: usize) -> Self {
236 self.max_world_bytes = value;
237 self
238 }
239
240 pub fn max_memory_bytes(mut self, value: usize) -> Self {
242 self.max_memory_bytes = value;
243 self
244 }
245
246 pub fn max_storage_bytes(mut self, value: Option<usize>) -> Self {
251 self.max_storage_bytes = value.filter(|value| *value > 0);
252 self
253 }
254
255 pub fn max_listen_connections(mut self, value: usize) -> Self {
258 self.max_listen_connections = nonzero_or_default(value, DEFAULT_MAX_LISTEN_CONNECTIONS);
259 self
260 }
261
262 pub fn listen_replay_max(mut self, value: usize) -> Self {
265 self.listen_replay_max = nonzero_or_default(value, DEFAULT_LISTEN_REPLAY_MAX);
266 self
267 }
268
269 pub fn read_cache_max_entries(mut self, value: usize) -> Self {
272 self.read_cache_max_entries = nonzero_or_default(value, DEFAULT_READ_CACHE_MAX_ENTRIES);
273 self
274 }
275
276 pub fn build(self) -> Result<Engine, EngineBuildError> {
290 std::fs::create_dir_all(&self.data_root).map_err(EngineBuildError::DataRootIo)?;
291 let data_lock = crate::acquire_data_root_writer_lock(&self.data_root)
292 .map_err(|err| map_writer_lock_error(&self.data_root, err))?;
293 let hmac_key = self.key.ok_or(EngineBuildError::HmacKeyMissing)?.into_vec();
294
295 verify_all_worlds_with_names(&self.data_root, &hmac_key)?;
296 let durable_sizes = world::sizes(&self.data_root).map_err(|err| {
297 if storage_class::is_transient_storage_error(&err) {
298 EngineBuildError::DataRootLockHeld {
299 path: self.data_root.clone(),
300 }
301 } else {
302 EngineBuildError::Storage {
303 sqlite_code: sqlite_code(&err),
304 detail: err.to_string(),
305 }
306 }
307 })?;
308 let storage_body_bytes = durable_sizes.iter().map(|(_, size)| *size).sum();
309 let durable_world_count = durable_sizes.len();
310 let delete_ledger_created = durable_sizes
311 .iter()
312 .any(|(world_name, _)| world_name == "var/log/deletes");
313
314 let (events, _) = broadcast::channel(self.listen_replay_max);
315 let (shutdown_tx, shutdown_rx) = watch::channel(false);
316 let core = Arc::new(Core {
317 data: self.data_root,
318 tokens: self.tokens,
319 hmac_key,
320 mem: Arc::new(store::MemoryStore::new()),
321 max_world_bytes: self.max_world_bytes,
322 max_memory_bytes: self.max_memory_bytes,
323 max_storage_bytes: self.max_storage_bytes,
324 storage_body_bytes: Arc::new(AtomicUsize::new(storage_body_bytes)),
325 durable_world_count: Arc::new(AtomicUsize::new(durable_world_count)),
326 delete_ledger_created: Arc::new(AtomicBool::new(delete_ledger_created)),
327 events,
328 listen_slots: Arc::new(Semaphore::new(self.max_listen_connections)),
329 listen_replay_max: self.listen_replay_max,
330 event_log: Arc::new(StdMutex::new(VecDeque::with_capacity(
331 self.listen_replay_max,
332 ))),
333 shutdown: shutdown_rx,
334 next_event: new_event_counter(),
335 world_locks: Arc::new(DashMap::new()),
336 ledger: Arc::new(crate::ledger::LedgerWriter::new()),
337 read_cache: Arc::new(ReadCache::new(self.read_cache_max_entries)),
338 });
339
340 Ok(Engine {
341 inner: Arc::new(EngineInner {
342 core,
343 shutdown_tx,
344 _data_lock: StdMutex::new(data_lock),
345 }),
346 })
347 }
348}
349
350impl Engine {
351 pub fn builder() -> EngineBuilder {
353 EngineBuilder::default()
354 }
355
356 #[cfg(test)]
357 pub(crate) fn from_core_for_tests(core: Arc<Core>) -> Self {
361 let (shutdown_tx, _) = watch::channel(false);
362 let data_lock = rusqlite::Connection::open_in_memory()
363 .expect("test Engine data lock connection should open");
364 Self {
365 inner: Arc::new(EngineInner {
366 core,
367 shutdown_tx,
368 _data_lock: StdMutex::new(data_lock),
369 }),
370 }
371 }
372
373 pub(crate) fn core(&self) -> &Core {
374 self.inner.core.as_ref()
375 }
376
377 #[cfg(feature = "coap")]
383 #[doc(hidden)]
384 pub fn shutdown_receiver(&self) -> ShutdownToken {
385 ShutdownToken {
386 rx: self.inner.shutdown_tx.subscribe(),
387 }
388 }
389
390 pub fn verify_token(&self, token: &[u8]) -> AccessTier {
396 self.inner.core.tokens.check_token_bytes(token).into()
397 }
398
399 pub fn shutdown(&self) {
406 self.inner.shutdown_tx.send_if_modified(|shutdown| {
407 if *shutdown {
408 false
409 } else {
410 *shutdown = true;
411 true
412 }
413 });
414 }
415}
416
#[cfg(feature = "coap")]
impl ShutdownToken {
    /// Returns the current value of the shutdown flag.
    pub fn is_shutdown(&self) -> bool {
        let flag = self.rx.borrow();
        *flag
    }

    /// Returns immediately if the flag is already set; otherwise waits for the
    /// next change notification on the channel.
    ///
    /// The result of the wait is ignored, so this also returns if the wait
    /// reports an error.
    pub async fn wait(&mut self) {
        if !self.is_shutdown() {
            let _ = self.rx.changed().await;
        }
    }
}
432
433impl EngineError {
434 pub fn sqlite_code(&self) -> Option<i32> {
442 match self {
443 Self::TransientStorage { sqlite_code }
444 | Self::InsufficientStorage { sqlite_code }
445 | Self::Storage { sqlite_code } => *sqlite_code,
446 Self::Auth(_)
447 | Self::InvalidWorldName
448 | Self::NotFound
449 | Self::AppendOnly
450 | Self::PayloadTooLarge { .. }
451 | Self::PreconditionFailed { .. }
452 | Self::QuotaExceeded { .. }
453 | Self::SubscriptionLimit
454 | Self::ShuttingDown
455 | Self::InternalInvariant(_) => None,
456 }
457 }
458}
459
460impl fmt::Debug for Engine {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 f.debug_struct("Engine").finish_non_exhaustive()
463 }
464}
465
/// Returns `value`, or `default` when `value` is zero.
fn nonzero_or_default(value: usize, default: usize) -> usize {
    if value == 0 {
        default
    } else {
        value
    }
}
472
473pub(crate) fn sqlite_code(err: &rusqlite::Error) -> Option<i32> {
474 err.sqlite_error_code().map(|code| code as i32)
475}
476
477fn map_writer_lock_error(data_root: &std::path::Path, err: rusqlite::Error) -> EngineBuildError {
478 if storage_class::is_transient_storage_error(&err) {
479 return EngineBuildError::DataRootLockHeld {
480 path: data_root.to_path_buf(),
481 };
482 }
483 EngineBuildError::Storage {
484 sqlite_code: sqlite_code(&err),
485 detail: format!("writer lock open failed: {err}"),
486 }
487}
488
489fn verify_all_worlds_with_names(
490 data_root: &std::path::Path,
491 key: &[u8],
492) -> Result<(), EngineBuildError> {
493 let worlds = world::list(data_root).map_err(|err| EngineBuildError::Storage {
494 sqlite_code: sqlite_code(&err),
495 detail: format!("list worlds for audit verification failed: {err}"),
496 })?;
497 for world_name in worlds {
498 audit::verify_world(data_root, &world_name, key).map_err(|err| {
499 if storage_class::is_transient_storage_error(&err) {
500 EngineBuildError::DataRootLockHeld {
501 path: data_root.to_path_buf(),
502 }
503 } else if storage_class::is_insufficient_storage_error(&err) {
504 EngineBuildError::Storage {
505 sqlite_code: sqlite_code(&err),
506 detail: format!(
507 "audit verification for {world_name} failed with sqlite code {:?}: {err}",
508 sqlite_code(&err)
509 ),
510 }
511 } else if audit::is_audit_chain_broken_error(&err) {
512 EngineBuildError::AuditChainCorrupted {
513 world: world_name.clone(),
514 detail: err.to_string(),
515 }
516 } else {
517 EngineBuildError::Storage {
518 sqlite_code: sqlite_code(&err),
519 detail: format!("audit verification for {world_name} failed: {err}"),
520 }
521 }
522 })?;
523 }
524 Ok(())
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns a fresh per-test directory path under the system temp dir.
    ///
    /// The name combines the test label, the process id and a nanosecond
    /// timestamp; any leftover directory at that path is removed first.
    fn temp_root(name: &str) -> PathBuf {
        let pid = std::process::id();
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let root = std::env::temp_dir().join(format!("elastik-engine-{name}-{pid}-{nonce}"));
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// The key shared by most tests.
    fn test_key() -> SecretBytes {
        SecretBytes::try_from_slice(b"key").unwrap()
    }

    /// Builder pointed at `root` with the shared test key already set.
    fn keyed_builder(root: &std::path::Path) -> EngineBuilder {
        Engine::builder().data_root(root).key(test_key())
    }

    /// Drops the engine first, then removes the data root.
    fn discard(engine: Engine, root: PathBuf) {
        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn secret_bytes_rejects_empty_keys() {
        // Empty and whitespace-only inputs are refused.
        assert!(SecretBytes::new(Vec::new()).is_err());
        assert!(SecretBytes::try_from_slice(b"").is_err());
        assert!(SecretBytes::try_from_slice(b" \t\r\n").is_err());
        assert!(SecretBytes::try_from_slice("\u{2003}\n".as_bytes()).is_err());
        // Ordinary and non-UTF-8 keys are accepted.
        assert!(SecretBytes::try_from_slice(b"key").is_ok());
        assert!(SecretBytes::try_from_slice(&[0xff, b'k', b'e', b'y']).is_ok());
    }

    #[test]
    fn verify_token_maps_raw_bytes_to_access_tier() {
        let root = temp_root("verify-token");
        let engine = keyed_builder(&root)
            .read_token(b"reader".to_vec())
            .write_token(b"writer".to_vec())
            .approve_token(b"approve".to_vec())
            .build()
            .unwrap();

        let expectations = [
            (&b""[..], AccessTier::Anon),
            (&b"missing"[..], AccessTier::Anon),
            (&b"reader"[..], AccessTier::Read),
            (&b"writer"[..], AccessTier::Write),
            (&b"approve"[..], AccessTier::Approve),
        ];
        for (token, tier) in expectations {
            assert_eq!(engine.verify_token(token), tier);
        }

        discard(engine, root);
    }

    #[test]
    fn token_setters_treat_empty_and_whitespace_as_unset() {
        let root = temp_root("token-whitespace");
        let unicode_blank = "\u{2003}\n";
        let engine = keyed_builder(&root)
            .read_token(b" \t\n".to_vec())
            .write_token(Vec::new())
            .approve_token(unicode_blank.as_bytes().to_vec())
            .build()
            .unwrap();

        assert_eq!(engine.verify_token(b" \t\n"), AccessTier::Anon);
        assert_eq!(
            engine.verify_token(unicode_blank.as_bytes()),
            AccessTier::Anon
        );

        discard(engine, root);
    }

    #[test]
    fn zero_numeric_limits_match_env_default_semantics() {
        let root = temp_root("zero-limits");
        let engine = keyed_builder(&root)
            .max_storage_bytes(Some(0))
            .max_listen_connections(0)
            .listen_replay_max(0)
            .read_cache_max_entries(0)
            .build()
            .unwrap();

        let core = &engine.inner.core;
        assert_eq!(core.max_storage_bytes, None);
        assert_eq!(
            core.listen_slots.available_permits(),
            DEFAULT_MAX_LISTEN_CONNECTIONS
        );
        assert_eq!(core.listen_replay_max, DEFAULT_LISTEN_REPLAY_MAX);
        assert_eq!(core.read_cache.max_entries, DEFAULT_READ_CACHE_MAX_ENTRIES);

        discard(engine, root);
    }

    #[test]
    fn shutdown_is_idempotent_without_extra_notifications() {
        let root = temp_root("shutdown-idempotent");
        let engine = keyed_builder(&root).build().unwrap();
        let mut observer = engine.inner.core.shutdown.clone();

        assert!(!*observer.borrow());
        engine.shutdown();
        assert!(observer.has_changed().unwrap());
        assert!(*observer.borrow_and_update());

        // A second shutdown must not produce another change notification.
        engine.shutdown();
        assert!(!observer.has_changed().unwrap());

        discard(engine, root);
    }

    #[test]
    fn build_holds_the_data_root_writer_lock() {
        let root = temp_root("writer-lock");
        let first = keyed_builder(&root).build().unwrap();

        // While `first` is alive a second build on the same root must fail.
        let second = keyed_builder(&root).build();

        assert!(matches!(
            second,
            Err(EngineBuildError::DataRootLockHeld { .. })
        ));

        discard(first, root);
    }

    #[test]
    fn build_verifies_audit_chains_before_returning_engine() {
        let root = temp_root("audit-verify");
        let key = b"key".to_vec();
        let seeded = world::write_with_audit_checked(
            &root,
            "home/audit",
            b"body",
            "text/plain",
            &[],
            &key,
            None,
        );
        assert!(seeded.is_ok(), "fixture write should succeed");

        // Corrupt the newest event so the chain no longer verifies.
        let db_path = world::world_db(&root, "home/audit");
        let conn = rusqlite::Connection::open(db_path).unwrap();
        conn.execute(
            "UPDATE events SET body_sha256='tampered' WHERE id=(SELECT max(id) FROM events)",
            [],
        )
        .unwrap();
        drop(conn);

        let outcome = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::new(key).unwrap())
            .build();

        assert!(matches!(
            outcome,
            Err(EngineBuildError::AuditChainCorrupted { .. })
        ));

        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn build_classifies_non_chain_verify_failures_as_storage_errors() {
        let root = temp_root("audit-storage-error");
        let key = b"key".to_vec();
        let seeded = world::write_with_audit_checked(
            &root,
            "home/schema",
            b"body",
            "text/plain",
            &[],
            &key,
            None,
        );
        assert!(seeded.is_ok(), "fixture write should succeed");

        // Remove the events table so verification fails for a schema reason.
        let db_path = world::world_db(&root, "home/schema");
        let conn = rusqlite::Connection::open(db_path).unwrap();
        conn.execute("DROP TABLE events", []).unwrap();
        drop(conn);

        let outcome = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::new(key).unwrap())
            .build();

        match outcome {
            Err(EngineBuildError::Storage {
                sqlite_code: Some(_),
                detail,
            }) => assert!(detail.contains("audit verification for home/schema failed")),
            other => panic!("expected storage error with sqlite code, got {other:?}"),
        }

        let _ = std::fs::remove_dir_all(root);
    }
}