1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::collections::VecDeque;
9
10use bytes::Bytes;
11
12use crate::{
13 auth,
14 delete_ops::{self, DeleteRequest, DeleteTraceHooks},
15 engine::{self, Engine, EngineError},
16 engine_types::{
17 AccessTier, ChangeEvent, EngineSubscription, Preconditions, ReadResult, Representation,
18 SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
19 },
20 etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
21};
22
23pub(crate) struct EngineOps<'a> {
24 core: &'a Core,
25}
26
27struct SubscribePermit {
28 pattern: SubscribePattern,
29 slot: tokio::sync::OwnedSemaphorePermit,
30}
31
32impl<'a> EngineOps<'a> {
33 pub(crate) fn new(core: &'a Core) -> Self {
34 Self { core }
35 }
36
37 pub(crate) fn core(&self) -> &'a Core {
38 self.core
39 }
40
41 pub(crate) fn read(
42 &self,
43 world: &ValidatedWorldPath,
44 tier: auth::Tier,
45 ) -> Result<Option<ReadResult>, EngineError> {
46 let permit = world_ops::authorize_read(self.core, world, tier)?;
47 match world_ops::read_world(self.core, &permit)
48 .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
49 {
50 world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult::new(
51 Representation::new(Bytes::from(stage.body), stage.content_type, stage.headers),
52 etag,
53 ))),
54 world_ops::ReadOutcome::Missing => Ok(None),
55 }
56 }
57
58 pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
59 &self,
60 world: &ValidatedWorldPath,
61 representation: Representation,
62 preconditions: Preconditions,
63 tier: auth::Tier,
64 hooks: &H,
65 ) -> Result<WriteResult, EngineError> {
66 let permit = world_ops::authorize_write(world, tier)?;
67 let outcome = world_ops::replace_write(
68 self.core,
69 &permit,
70 world_ops::ReplaceRequest {
71 body: representation.body,
72 content_type: representation.content_type,
73 headers: representation.headers,
74 preconditions: preconditions.into(),
75 },
76 hooks,
77 )
78 .await
79 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
80 Ok(outcome.into())
81 }
82
83 pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
84 &self,
85 world: &ValidatedWorldPath,
86 body: Bytes,
87 preconditions: Preconditions,
88 tier: auth::Tier,
89 hooks: &H,
90 ) -> Result<WriteResult, EngineError> {
91 let permit = world_ops::authorize_write(world, tier)?;
92 let outcome = world_ops::append_write(
93 self.core,
94 &permit,
95 world_ops::AppendRequest {
96 body,
97 preconditions: preconditions.into(),
98 },
99 hooks,
100 )
101 .await
102 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
103 Ok(outcome.into())
104 }
105
106 pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
107 &self,
108 world: &ValidatedWorldPath,
109 req: DeleteRequest,
110 tier: auth::Tier,
111 hooks: &H,
112 ) -> Result<(), delete_ops::DeleteError> {
113 let permit = delete_ops::authorize_delete(world, tier)?;
114 delete_ops::delete(self.core, &permit, req, hooks).await
115 }
116
117 pub(crate) fn subscribe(
118 &self,
119 pattern: &SubscribePattern,
120 tier: auth::Tier,
121 since: Option<u64>,
122 ) -> Result<EngineSubscription, EngineError> {
123 let permit = self.authorize_subscribe(pattern, tier)?;
124 Ok(self.open_subscription(permit, since))
125 }
126
127 fn authorize_subscribe(
128 &self,
129 pattern: &SubscribePattern,
130 tier: auth::Tier,
131 ) -> Result<SubscribePermit, EngineError> {
132 if !crate::can_read(self.core, tier) {
133 return Err(EngineError::Auth(AuthGate::Read));
134 }
135 if *self.core.shutdown.borrow() {
136 return Err(EngineError::ShuttingDown);
137 }
138 let slot = self
139 .core
140 .listen_slots
141 .clone()
142 .try_acquire_owned()
143 .map_err(|_| EngineError::SubscriptionLimit)?;
144 Ok(SubscribePermit {
145 pattern: pattern.clone(),
146 slot,
147 })
148 }
149
150 fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
151 let rx = self.core.events.subscribe();
152 let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
153 let replay_mode = since.is_some();
154 let mut initial = VecDeque::new();
155 if let Some(skipped) = lag {
156 initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
157 }
158 initial.extend(replay.into_iter().map(Ok));
159 EngineSubscription::new(
160 permit.slot,
161 initial,
162 rx,
163 permit.pattern,
164 replay_mode,
165 live_floor,
166 self.core.shutdown.clone(),
167 )
168 }
169}
170
171impl Engine {
172 pub fn read(
185 &self,
186 world: &ValidatedWorldPath,
187 tier: AccessTier,
188 ) -> Result<Option<ReadResult>, EngineError> {
189 EngineOps::new(self.core()).read(world, tier.into())
190 }
191
192 pub async fn replace(
210 &self,
211 world: &ValidatedWorldPath,
212 representation: Representation,
213 preconditions: Preconditions,
214 tier: AccessTier,
215 ) -> Result<WriteResult, EngineError> {
216 EngineOps::new(self.core())
217 .replace(
218 world,
219 representation,
220 preconditions,
221 tier.into(),
222 &NoopWriteTrace,
223 )
224 .await
225 }
226
227 pub async fn append(
235 &self,
236 world: &ValidatedWorldPath,
237 body: Bytes,
238 preconditions: Preconditions,
239 tier: AccessTier,
240 ) -> Result<WriteResult, EngineError> {
241 EngineOps::new(self.core())
242 .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
243 .await
244 }
245
246 pub async fn delete(
263 &self,
264 world: &ValidatedWorldPath,
265 preconditions: Preconditions,
266 tier: AccessTier,
267 ) -> Result<(), EngineError> {
268 EngineOps::new(self.core())
269 .delete(
270 world,
271 DeleteRequest {
272 preconditions,
273 content_type: String::new(),
274 headers: Vec::new(),
275 },
276 tier.into(),
277 &NoopDeleteTrace,
278 )
279 .await
280 .map_err(Into::into)
281 }
282
283 pub fn subscribe(
301 &self,
302 pattern: &SubscribePattern,
303 tier: AccessTier,
304 since: Option<u64>,
305 ) -> Result<EngineSubscription, EngineError> {
306 EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
307 }
308}
309
310struct NoopWriteTrace;
311
312impl world_ops::WriteTraceHooks for NoopWriteTrace {}
313
314struct NoopDeleteTrace;
315
316impl DeleteTraceHooks for NoopDeleteTrace {}
317
318pub(crate) fn replay_after(
319 core: &Core,
320 since: Option<u64>,
321 pattern: &SubscribePattern,
322) -> (Option<u64>, Vec<ChangeEvent>, u64) {
323 let Some(last_id) = since else {
324 return (None, Vec::new(), 0);
325 };
326 let log = core
327 .event_log
328 .lock()
329 .unwrap_or_else(|poison| poison.into_inner());
330 let gap = log.front().and_then(|oldest| {
331 let expected_next = last_id.saturating_add(1);
332 if expected_next < oldest.id {
333 Some(oldest.id - expected_next)
334 } else {
335 None
336 }
337 });
338 let replay: Vec<ChangeEvent> = log
339 .iter()
340 .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
341 .cloned()
342 .map(Into::into)
343 .collect();
344 let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
345 (gap, replay, live_floor)
346}
347
348impl From<Preconditions> for etag::Preconditions {
349 fn from(value: Preconditions) -> Self {
350 etag::Preconditions::new(
351 value.if_match.into_iter().map(Into::into).collect(),
352 value.if_none_match.into_iter().map(Into::into).collect(),
353 )
354 }
355}
356
357impl From<etag::Preconditions> for Preconditions {
358 fn from(value: etag::Preconditions) -> Self {
359 let (if_match, if_none_match) = value.into_parts();
360 Self::new(
361 if_match.into_iter().map(Into::into).collect(),
362 if_none_match.into_iter().map(Into::into).collect(),
363 )
364 }
365}
366
367impl From<world_ops::WriteOutcome> for WriteResult {
368 fn from(value: world_ops::WriteOutcome) -> Self {
369 Self::new(
370 match value.status_kind {
371 world_ops::WriteStatusKind::Created => WriteKind::Created,
372 world_ops::WriteStatusKind::Updated => WriteKind::Updated,
373 },
374 value.etag,
375 )
376 }
377}
378
379impl From<AccessTier> for auth::Tier {
380 fn from(value: AccessTier) -> Self {
381 match value {
382 AccessTier::Anon => Self::Anon,
383 AccessTier::Read => Self::Read,
384 AccessTier::Write => Self::Write,
385 AccessTier::Approve => Self::Approve,
386 }
387 }
388}
389
390impl From<world_ops::ReadError> for EngineError {
391 fn from(value: world_ops::ReadError) -> Self {
392 read_error_to_engine(value, None)
393 }
394}
395
396impl From<world_ops::WriteError> for EngineError {
397 fn from(value: world_ops::WriteError) -> Self {
398 write_error_to_engine(value, None)
399 }
400}
401
402fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
403 match op {
404 world_ops::StorageOp::Read => "read",
405 world_ops::StorageOp::WriteAudit => "write_audit",
406 }
407}
408
409fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
410 match value {
411 world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
412 world_ops::ReadError::TransientStorage { scope, err } => {
413 log_storage_error(scope, &err, "read", world);
414 EngineError::TransientStorage {
415 sqlite_code: engine::sqlite_code(&err),
416 }
417 }
418 world_ops::ReadError::InsufficientStorage { scope, err } => {
419 log_storage_error(scope, &err, "read", world);
420 EngineError::InsufficientStorage {
421 sqlite_code: engine::sqlite_code(&err),
422 }
423 }
424 world_ops::ReadError::StorageRead { scope, err } => {
425 log_storage_error(scope, &err, "read", world);
426 EngineError::Storage {
427 sqlite_code: engine::sqlite_code(&err),
428 }
429 }
430 world_ops::ReadError::PermitWorldMismatch => {
431 EngineError::InternalInvariant("read permit world mismatch")
432 }
433 }
434}
435
436fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
437 match value {
438 world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
439 world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
440 world_ops::WriteError::PreconditionFailed { message } => {
441 EngineError::PreconditionFailed { message }
442 }
443 world_ops::WriteError::NotFound => EngineError::NotFound,
444 world_ops::WriteError::QuotaExceeded {
445 used,
446 quota,
447 projected,
448 } => EngineError::QuotaExceeded {
449 used,
450 quota,
451 projected,
452 },
453 world_ops::WriteError::TransientStorage { scope, err, op } => {
454 log_storage_error(scope, &err, storage_op_label(op), world);
455 EngineError::TransientStorage {
456 sqlite_code: engine::sqlite_code(&err),
457 }
458 }
459 world_ops::WriteError::InsufficientStorage { scope, err, op } => {
460 log_storage_error(scope, &err, storage_op_label(op), world);
461 EngineError::InsufficientStorage {
462 sqlite_code: engine::sqlite_code(&err),
463 }
464 }
465 world_ops::WriteError::StorageRead { scope, err } => {
466 log_storage_error(scope, &err, "read", world);
467 EngineError::Storage {
468 sqlite_code: engine::sqlite_code(&err),
469 }
470 }
471 world_ops::WriteError::StorageWriteAudit { scope, err } => {
472 log_storage_error(scope, &err, "write_audit", world);
473 EngineError::Storage {
474 sqlite_code: engine::sqlite_code(&err),
475 }
476 }
477 world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
478 }
479}
480
481pub(crate) fn log_storage_error(
482 scope: &'static str,
483 err: &rusqlite::Error,
484 operation: &'static str,
485 world: Option<&str>,
486) {
487 #[cfg(feature = "unstable-engine")]
488 tracing::error!(
489 scope,
490 operation,
491 world = world.unwrap_or(""),
492 sqlite_code = ?engine::sqlite_code(err),
493 error = %err,
494 "engine storage error"
495 );
496
497 #[cfg(not(feature = "unstable-engine"))]
498 match world {
499 Some(world) => {
500 eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
501 }
502 None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
503 }
504}
505
506pub(crate) fn log_blocking_storage_error(
507 scope: &'static str,
508 err: &BlockingSqliteError,
509 operation: &'static str,
510 world: Option<&str>,
511) {
512 match err {
513 BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
514 BlockingSqliteError::Worker => {
515 #[cfg(feature = "unstable-engine")]
516 tracing::error!(
517 scope,
518 operation,
519 world = world.unwrap_or(""),
520 "engine storage worker failed"
521 );
522
523 #[cfg(not(feature = "unstable-engine"))]
524 match world {
525 Some(world) => {
526 eprintln!(
527 "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
528 );
529 }
530 None => {
531 eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
532 }
533 }
534 }
535 }
536}
537
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::*;
    use crate::engine_types::{ChangeVerb, SecretBytes};

    /// Returns a per-test directory path under the system temp dir, made unique by the process
    /// id and a nanosecond timestamp. Any leftover directory at that path is removed first.
    fn temp_root(name: &str) -> PathBuf {
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let dir_name = format!(
            "elastik-engine-ops-{name}-{}-{nonce}",
            std::process::id()
        );
        let root = std::env::temp_dir().join(dir_name);
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Builds an engine rooted in a fresh temp directory with a single listen slot.
    fn test_engine(name: &str) -> (Engine, PathBuf) {
        let root = temp_root(name);
        let key = SecretBytes::try_from_slice(b"key").unwrap();
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(key)
            .max_listen_connections(1)
            .build()
            .unwrap();
        (engine, root)
    }

    /// Replaces `world` with a `text/plain` body at the write tier, panicking on failure.
    async fn put_text(engine: &Engine, world: &ValidatedWorldPath, body: &'static [u8]) {
        let representation =
            Representation::new(Bytes::from_static(body), "text/plain", Vec::new());
        engine
            .replace(
                world,
                representation,
                Preconditions::none(),
                AccessTier::Write,
            )
            .await
            .unwrap();
    }

    /// Drops the engine, then removes its data directory on a best-effort basis.
    fn teardown(engine: Engine, root: PathBuf) {
        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }

    #[test]
    fn replay_after_reports_ring_gap_and_replays_available_events() {
        let (engine, root) = test_engine("replay-gap");

        // Seed the log directly with ids 10..=12 so that ids 6..=9 are missing for `since = 5`.
        let mut log = engine.core().event_log.lock().unwrap();
        for id in 10..=12 {
            log.push_back(event::ChangeEvent {
                id,
                verb: ChangeVerb::Replace,
                path: format!("/home/task/{id}"),
                etag: format!("hmac-{id}"),
            });
        }
        drop(log);

        let pattern = SubscribePattern::new("home/task/*");
        let (gap, replay, floor) = replay_after(engine.core(), Some(5), &pattern);

        assert_eq!(gap, Some(4));
        assert_eq!(replay.len(), 3);
        let first = &replay[0];
        assert_eq!(first.id, 10);
        assert_eq!(first.path.as_str(), "home/task/10");
        assert_eq!(floor, 12);

        teardown(engine, root);
    }

    #[test]
    fn replay_after_handles_max_last_event_id_without_overflow() {
        let (engine, root) = test_engine("replay-max-last-id");

        let mut log = engine.core().event_log.lock().unwrap();
        log.push_back(event::ChangeEvent {
            id: u64::MAX,
            verb: ChangeVerb::Replace,
            path: "/home/task/max".to_string(),
            etag: "hmac-max".to_string(),
        });
        drop(log);

        let pattern = SubscribePattern::new("home/task/*");
        let (gap, replay, floor) = replay_after(engine.core(), Some(u64::MAX), &pattern);

        assert_eq!(gap, None);
        assert!(replay.is_empty());
        assert_eq!(floor, u64::MAX);

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_delete_requires_approve_and_removes_world() {
        let (engine, root) = test_engine("delete");
        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
        put_text(&engine, &world, b"alive").await;

        // The write tier is not enough to delete.
        let as_writer = engine
            .delete(&world, Preconditions::none(), AccessTier::Write)
            .await;
        assert!(matches!(
            as_writer,
            Err(EngineError::Auth(AuthGate::Delete))
        ));

        // The approve tier succeeds, after which the world reads as missing.
        engine
            .delete(&world, Preconditions::none(), AccessTier::Approve)
            .await
            .unwrap();
        let after_delete = engine.read(&world, AccessTier::Read).unwrap();
        assert!(after_delete.is_none());

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
        let root = temp_root("subscribe");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("home/events/*");

        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::Auth(AuthGate::Read))
        ));

        let world = ValidatedWorldPath::new("home/events/a").unwrap();
        put_text(&engine, &world, b"event").await;

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Read, Some(0))
            .expect("read tier subscribes");
        let replayed = subscription.recv().await.expect("replay event");
        assert_eq!(replayed.verb, ChangeVerb::Replace);
        assert_eq!(replayed.path.as_str(), "home/events/a");

        drop(subscription);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_enforces_slot_cap_at_entry() {
        let (engine, root) = test_engine("subscribe-cap");
        let pattern = SubscribePattern::new("*");

        let first = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("first subscription consumes the sole slot");
        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::SubscriptionLimit)
        ));

        drop(first);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_denied_auth_does_not_consume_slot() {
        let root = temp_root("subscribe-auth-slot");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_listen_connections(1)
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("*");

        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::Auth(AuthGate::Read))
        ));
        let subscription = engine
            .subscribe(&pattern, AccessTier::Read, None)
            .expect("failed auth must not consume the only slot");

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_closed_is_terminal() {
        let (engine, root) = test_engine("subscribe-closed");
        let pattern = SubscribePattern::new("*");
        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("subscription opens before shutdown");

        engine.shutdown();
        // `Closed` must be reported on every receive after shutdown, not just the first.
        for _ in 0..2 {
            assert!(matches!(
                subscription.recv().await,
                Err(SubscriptionRecvError::Closed)
            ));
        }

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_drains_replay_before_shutdown() {
        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
        let pattern = SubscribePattern::new("home/replay/*");
        for name in ["home/replay/a", "home/replay/b"] {
            let world = ValidatedWorldPath::new(name).unwrap();
            put_text(&engine, &world, b"event").await;
        }

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, Some(0))
            .expect("subscription opens before shutdown");
        engine.shutdown();

        // Both replayed events are still delivered, in order, before `Closed` is reported.
        for expected in ["home/replay/a", "home/replay/b"] {
            assert_eq!(subscription.recv().await.unwrap().path.as_str(), expected);
        }
        assert!(matches!(
            subscription.recv().await,
            Err(SubscriptionRecvError::Closed)
        ));

        drop(subscription);
        teardown(engine, root);
    }
}