1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
8
9use std::collections::VecDeque;
10
11use bytes::Bytes;
12
13use crate::{
14 auth,
15 delete_ops::{self, DeleteRequest, DeleteTraceHooks},
16 engine::{self, Engine, EngineError},
17 engine_types::{
18 AccessTier, ChangeEvent, EngineSubscription, EtagMatcher, Preconditions, ReadResult,
19 Representation, SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind,
20 WriteResult,
21 },
22 etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
23};
24
25pub(crate) struct EngineOps<'a> {
26 core: &'a Core,
27}
28
29struct SubscribePermit {
30 pattern: SubscribePattern,
31 slot: tokio::sync::OwnedSemaphorePermit,
32}
33
34impl<'a> EngineOps<'a> {
35 pub(crate) fn new(core: &'a Core) -> Self {
36 Self { core }
37 }
38
39 pub(crate) fn core(&self) -> &'a Core {
40 self.core
41 }
42
43 pub(crate) fn read(
44 &self,
45 world: &ValidatedWorldPath,
46 tier: auth::Tier,
47 ) -> Result<Option<ReadResult>, EngineError> {
48 let permit = world_ops::authorize_read(self.core, world, tier)?;
49 match world_ops::read_world(self.core, &permit)
50 .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
51 {
52 world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult {
53 representation: Representation {
54 body: Bytes::from(stage.body),
55 content_type: stage.content_type,
56 headers: stage.headers,
57 },
58 etag,
59 })),
60 world_ops::ReadOutcome::Missing => Ok(None),
61 }
62 }
63
64 pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
65 &self,
66 world: &ValidatedWorldPath,
67 representation: Representation,
68 preconditions: Preconditions,
69 tier: auth::Tier,
70 hooks: &H,
71 ) -> Result<WriteResult, EngineError> {
72 let permit = world_ops::authorize_write(world, tier)?;
73 let outcome = world_ops::replace_write(
74 self.core,
75 &permit,
76 world_ops::ReplaceRequest {
77 body: representation.body,
78 content_type: representation.content_type,
79 headers: representation.headers,
80 preconditions: preconditions.into(),
81 },
82 hooks,
83 )
84 .await
85 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
86 Ok(outcome.into())
87 }
88
89 pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
90 &self,
91 world: &ValidatedWorldPath,
92 body: Bytes,
93 preconditions: Preconditions,
94 tier: auth::Tier,
95 hooks: &H,
96 ) -> Result<WriteResult, EngineError> {
97 let permit = world_ops::authorize_write(world, tier)?;
98 let outcome = world_ops::append_write(
99 self.core,
100 &permit,
101 world_ops::AppendRequest {
102 body,
103 preconditions: preconditions.into(),
104 },
105 hooks,
106 )
107 .await
108 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
109 Ok(outcome.into())
110 }
111
112 pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
113 &self,
114 world: &ValidatedWorldPath,
115 req: DeleteRequest,
116 tier: auth::Tier,
117 hooks: &H,
118 ) -> Result<(), delete_ops::DeleteError> {
119 let permit = delete_ops::authorize_delete(world, tier)?;
120 delete_ops::delete(self.core, &permit, req, hooks).await
121 }
122
123 pub(crate) fn subscribe(
124 &self,
125 pattern: &SubscribePattern,
126 tier: auth::Tier,
127 since: Option<u64>,
128 ) -> Result<EngineSubscription, EngineError> {
129 let permit = self.authorize_subscribe(pattern, tier)?;
130 Ok(self.open_subscription(permit, since))
131 }
132
133 fn authorize_subscribe(
134 &self,
135 pattern: &SubscribePattern,
136 tier: auth::Tier,
137 ) -> Result<SubscribePermit, EngineError> {
138 if !crate::can_read(self.core, tier) {
139 return Err(EngineError::Auth(AuthGate::Read));
140 }
141 if *self.core.shutdown.borrow() {
142 return Err(EngineError::ShuttingDown);
143 }
144 let slot = self
145 .core
146 .listen_slots
147 .clone()
148 .try_acquire_owned()
149 .map_err(|_| EngineError::SubscriptionLimit)?;
150 Ok(SubscribePermit {
151 pattern: pattern.clone(),
152 slot,
153 })
154 }
155
156 fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
157 let rx = self.core.events.subscribe();
158 let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
159 let replay_mode = since.is_some();
160 let mut initial = VecDeque::new();
161 if let Some(skipped) = lag {
162 initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
163 }
164 initial.extend(replay.into_iter().map(Ok));
165 EngineSubscription::new(
166 permit.slot,
167 initial,
168 rx,
169 permit.pattern,
170 replay_mode,
171 live_floor,
172 self.core.shutdown.clone(),
173 )
174 }
175}
176
177impl Engine {
178 pub fn read(
191 &self,
192 world: &ValidatedWorldPath,
193 tier: AccessTier,
194 ) -> Result<Option<ReadResult>, EngineError> {
195 EngineOps::new(self.core()).read(world, tier.into())
196 }
197
198 pub async fn replace(
216 &self,
217 world: &ValidatedWorldPath,
218 representation: Representation,
219 preconditions: Preconditions,
220 tier: AccessTier,
221 ) -> Result<WriteResult, EngineError> {
222 EngineOps::new(self.core())
223 .replace(
224 world,
225 representation,
226 preconditions,
227 tier.into(),
228 &NoopWriteTrace,
229 )
230 .await
231 }
232
233 pub async fn append(
241 &self,
242 world: &ValidatedWorldPath,
243 body: Bytes,
244 preconditions: Preconditions,
245 tier: AccessTier,
246 ) -> Result<WriteResult, EngineError> {
247 EngineOps::new(self.core())
248 .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
249 .await
250 }
251
252 pub async fn delete(
269 &self,
270 world: &ValidatedWorldPath,
271 preconditions: Preconditions,
272 tier: AccessTier,
273 ) -> Result<(), EngineError> {
274 EngineOps::new(self.core())
275 .delete(
276 world,
277 DeleteRequest {
278 preconditions,
279 content_type: String::new(),
280 headers: Vec::new(),
281 },
282 tier.into(),
283 &NoopDeleteTrace,
284 )
285 .await
286 .map_err(Into::into)
287 }
288
289 pub fn subscribe(
307 &self,
308 pattern: &SubscribePattern,
309 tier: AccessTier,
310 since: Option<u64>,
311 ) -> Result<EngineSubscription, EngineError> {
312 EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
313 }
314}
315
316struct NoopWriteTrace;
317
318impl world_ops::WriteTraceHooks for NoopWriteTrace {}
319
320struct NoopDeleteTrace;
321
322impl DeleteTraceHooks for NoopDeleteTrace {}
323
324pub(crate) fn replay_after(
325 core: &Core,
326 since: Option<u64>,
327 pattern: &SubscribePattern,
328) -> (Option<u64>, Vec<ChangeEvent>, u64) {
329 let Some(last_id) = since else {
330 return (None, Vec::new(), 0);
331 };
332 let log = core
333 .event_log
334 .lock()
335 .unwrap_or_else(|poison| poison.into_inner());
336 let gap = log.front().and_then(|oldest| {
337 let expected_next = last_id.saturating_add(1);
338 if expected_next < oldest.id {
339 Some(oldest.id - expected_next)
340 } else {
341 None
342 }
343 });
344 let replay: Vec<ChangeEvent> = log
345 .iter()
346 .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
347 .cloned()
348 .map(Into::into)
349 .collect();
350 let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
351 (gap, replay, live_floor)
352}
353
354impl From<Preconditions> for etag::Preconditions {
355 fn from(value: Preconditions) -> Self {
356 etag::Preconditions::new(
357 value.if_match.into_iter().map(Into::into).collect(),
358 value.if_none_match.into_iter().map(Into::into).collect(),
359 )
360 }
361}
362
363impl From<etag::Preconditions> for Preconditions {
364 fn from(value: etag::Preconditions) -> Self {
365 let (if_match, if_none_match) = value.into_parts();
366 Self {
367 if_match: if_match.into_iter().map(Into::into).collect(),
368 if_none_match: if_none_match.into_iter().map(Into::into).collect(),
369 }
370 }
371}
372
373impl From<EtagMatcher> for etag::EtagMatcher {
374 fn from(value: EtagMatcher) -> Self {
375 match value {
376 EtagMatcher::Any => Self::Any,
377 EtagMatcher::Strong(value) => Self::Strong(value),
378 EtagMatcher::Weak(value) => Self::Weak(value),
379 EtagMatcher::Invalid => Self::Invalid,
380 }
381 }
382}
383
384impl From<etag::EtagMatcher> for EtagMatcher {
385 fn from(value: etag::EtagMatcher) -> Self {
386 match value {
387 etag::EtagMatcher::Any => Self::Any,
388 etag::EtagMatcher::Strong(value) => Self::Strong(value),
389 etag::EtagMatcher::Weak(value) => Self::Weak(value),
390 etag::EtagMatcher::Invalid => Self::Invalid,
391 }
392 }
393}
394
395impl From<world_ops::WriteOutcome> for WriteResult {
396 fn from(value: world_ops::WriteOutcome) -> Self {
397 Self {
398 kind: match value.status_kind {
399 world_ops::WriteStatusKind::Created => WriteKind::Created,
400 world_ops::WriteStatusKind::Updated => WriteKind::Updated,
401 },
402 etag: value.etag,
403 }
404 }
405}
406
407impl From<AccessTier> for auth::Tier {
408 fn from(value: AccessTier) -> Self {
409 match value {
410 AccessTier::Anon => Self::Anon,
411 AccessTier::Read => Self::Read,
412 AccessTier::Write => Self::Write,
413 AccessTier::Approve => Self::Approve,
414 }
415 }
416}
417
418impl From<world_ops::ReadError> for EngineError {
419 fn from(value: world_ops::ReadError) -> Self {
420 read_error_to_engine(value, None)
421 }
422}
423
424impl From<world_ops::WriteError> for EngineError {
425 fn from(value: world_ops::WriteError) -> Self {
426 write_error_to_engine(value, None)
427 }
428}
429
430fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
431 match op {
432 world_ops::StorageOp::Read => "read",
433 world_ops::StorageOp::WriteAudit => "write_audit",
434 }
435}
436
437fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
438 match value {
439 world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
440 world_ops::ReadError::TransientStorage { scope, err } => {
441 log_storage_error(scope, &err, "read", world);
442 EngineError::TransientStorage {
443 sqlite_code: engine::sqlite_code(&err),
444 }
445 }
446 world_ops::ReadError::InsufficientStorage { scope, err } => {
447 log_storage_error(scope, &err, "read", world);
448 EngineError::InsufficientStorage {
449 sqlite_code: engine::sqlite_code(&err),
450 }
451 }
452 world_ops::ReadError::StorageRead { scope, err } => {
453 log_storage_error(scope, &err, "read", world);
454 EngineError::Storage {
455 sqlite_code: engine::sqlite_code(&err),
456 }
457 }
458 world_ops::ReadError::PermitWorldMismatch => {
459 EngineError::InternalInvariant("read permit world mismatch")
460 }
461 }
462}
463
464fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
465 match value {
466 world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
467 world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
468 world_ops::WriteError::PreconditionFailed { message } => {
469 EngineError::PreconditionFailed { message }
470 }
471 world_ops::WriteError::NotFound => EngineError::NotFound,
472 world_ops::WriteError::QuotaExceeded {
473 used,
474 quota,
475 projected,
476 } => EngineError::QuotaExceeded {
477 used,
478 quota,
479 projected,
480 },
481 world_ops::WriteError::TransientStorage { scope, err, op } => {
482 log_storage_error(scope, &err, storage_op_label(op), world);
483 EngineError::TransientStorage {
484 sqlite_code: engine::sqlite_code(&err),
485 }
486 }
487 world_ops::WriteError::InsufficientStorage { scope, err, op } => {
488 log_storage_error(scope, &err, storage_op_label(op), world);
489 EngineError::InsufficientStorage {
490 sqlite_code: engine::sqlite_code(&err),
491 }
492 }
493 world_ops::WriteError::StorageRead { scope, err } => {
494 log_storage_error(scope, &err, "read", world);
495 EngineError::Storage {
496 sqlite_code: engine::sqlite_code(&err),
497 }
498 }
499 world_ops::WriteError::StorageWriteAudit { scope, err } => {
500 log_storage_error(scope, &err, "write_audit", world);
501 EngineError::Storage {
502 sqlite_code: engine::sqlite_code(&err),
503 }
504 }
505 world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
506 }
507}
508
509pub(crate) fn log_storage_error(
510 scope: &'static str,
511 err: &rusqlite::Error,
512 operation: &'static str,
513 world: Option<&str>,
514) {
515 #[cfg(feature = "unstable-engine")]
516 tracing::error!(
517 scope,
518 operation,
519 world = world.unwrap_or(""),
520 sqlite_code = ?engine::sqlite_code(err),
521 error = %err,
522 "engine storage error"
523 );
524
525 #[cfg(not(feature = "unstable-engine"))]
526 match world {
527 Some(world) => {
528 eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
529 }
530 None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
531 }
532}
533
534pub(crate) fn log_blocking_storage_error(
535 scope: &'static str,
536 err: &BlockingSqliteError,
537 operation: &'static str,
538 world: Option<&str>,
539) {
540 match err {
541 BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
542 BlockingSqliteError::Worker => {
543 #[cfg(feature = "unstable-engine")]
544 tracing::error!(
545 scope,
546 operation,
547 world = world.unwrap_or(""),
548 "engine storage worker failed"
549 );
550
551 #[cfg(not(feature = "unstable-engine"))]
552 match world {
553 Some(world) => {
554 eprintln!(
555 "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
556 );
557 }
558 None => {
559 eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
560 }
561 }
562 }
563 }
564}
565
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::*;
    use crate::engine_types::SecretBytes;

    /// Returns a fresh path under the system temp directory, made unique by
    /// the test `name`, the process id and the current time in nanoseconds.
    /// Any directory already at that path is removed (best effort).
    fn temp_root(name: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let dir_name = format!(
            "elastik-engine-ops-{name}-{}-{}",
            std::process::id(),
            since_epoch.as_nanos()
        );
        let root = std::env::temp_dir().join(dir_name);
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Builds an engine rooted at a fresh temp directory with a single
    /// listen slot, returning the engine together with its data root.
    fn test_engine(name: &str) -> (Engine, PathBuf) {
        let root = temp_root(name);
        let builder = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .max_listen_connections(1);
        let engine = builder.build().unwrap();
        (engine, root)
    }

    /// Replaces `world` with a `text/plain` body at write tier, without
    /// preconditions, panicking if the write fails.
    async fn put_text(engine: &Engine, world: &ValidatedWorldPath, body: &'static [u8]) {
        let representation = Representation {
            body: Bytes::from_static(body),
            content_type: "text/plain".to_owned(),
            headers: Vec::new(),
        };
        engine
            .replace(
                world,
                representation,
                Preconditions::none(),
                AccessTier::Write,
            )
            .await
            .unwrap();
    }

    /// Drops the engine and then removes its data root (best effort).
    fn teardown(engine: Engine, root: PathBuf) {
        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }

    #[tokio::test]
    async fn engine_delete_requires_approve_and_removes_world() {
        let (engine, root) = test_engine("delete");
        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
        put_text(&engine, &world, b"alive").await;

        // Write tier is not sufficient to delete.
        assert!(matches!(
            engine
                .delete(&world, Preconditions::none(), AccessTier::Write)
                .await,
            Err(EngineError::Auth(AuthGate::Delete))
        ));

        // Approve tier deletes, after which the world reads as missing.
        engine
            .delete(&world, Preconditions::none(), AccessTier::Approve)
            .await
            .unwrap();
        let after_delete = engine.read(&world, AccessTier::Read).unwrap();
        assert!(after_delete.is_none());

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
        let root = temp_root("subscribe");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("home/events/*");

        // With a read token configured, an anonymous subscribe is rejected.
        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::Auth(AuthGate::Read))
        ));

        let world = ValidatedWorldPath::new("home/events/a").unwrap();
        put_text(&engine, &world, b"event").await;

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Read, Some(0))
            .expect("read tier subscribes");
        let event = subscription.recv().await.expect("replay event");
        assert_eq!(event.method, "PUT");
        assert_eq!(event.path.as_str(), "home/events/a");

        drop(subscription);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_enforces_slot_cap_at_entry() {
        let (engine, root) = test_engine("subscribe-cap");
        let pattern = SubscribePattern::new("*");
        let first = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("first subscription consumes the sole slot");

        // The engine was built with one listen slot, which `first` holds.
        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::SubscriptionLimit)
        ));

        drop(first);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_denied_auth_does_not_consume_slot() {
        let root = temp_root("subscribe-auth-slot");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_listen_connections(1)
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("*");

        assert!(matches!(
            engine.subscribe(&pattern, AccessTier::Anon, None),
            Err(EngineError::Auth(AuthGate::Read))
        ));
        // The single slot must still be free after the rejected attempt.
        let subscription = engine
            .subscribe(&pattern, AccessTier::Read, None)
            .expect("failed auth must not consume the only slot");

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_closed_is_terminal() {
        let (engine, root) = test_engine("subscribe-closed");
        let pattern = SubscribePattern::new("*");
        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("subscription opens before shutdown");

        engine.shutdown();
        // `Closed` is reported on every receive after shutdown.
        let first_recv = subscription.recv().await;
        assert!(matches!(first_recv, Err(SubscriptionRecvError::Closed)));
        let second_recv = subscription.recv().await;
        assert!(matches!(second_recv, Err(SubscriptionRecvError::Closed)));

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_drains_replay_before_shutdown() {
        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
        let pattern = SubscribePattern::new("home/replay/*");
        for name in ["home/replay/a", "home/replay/b"] {
            let world = ValidatedWorldPath::new(name).unwrap();
            put_text(&engine, &world, b"event").await;
        }

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, Some(0))
            .expect("subscription opens before shutdown");
        engine.shutdown();

        // Both replayed events arrive, in order, before `Closed`.
        let first = subscription.recv().await.unwrap();
        assert_eq!(first.path.as_str(), "home/replay/a");
        let second = subscription.recv().await.unwrap();
        assert_eq!(second.path.as_str(), "home/replay/b");
        let after_replay = subscription.recv().await;
        assert!(matches!(after_replay, Err(SubscriptionRecvError::Closed)));

        drop(subscription);
        teardown(engine, root);
    }
}