1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
8
9use std::collections::VecDeque;
10
11use bytes::Bytes;
12
13use crate::{
14 auth,
15 delete_ops::{self, DeleteRequest, DeleteTraceHooks},
16 engine::{self, Engine, EngineError},
17 engine_types::{
18 AccessTier, ChangeEvent, EngineSubscription, Preconditions, ReadResult, Representation,
19 SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
20 },
21 etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
22};
23
24pub(crate) struct EngineOps<'a> {
25 core: &'a Core,
26}
27
28struct SubscribePermit {
29 pattern: SubscribePattern,
30 slot: tokio::sync::OwnedSemaphorePermit,
31}
32
33impl<'a> EngineOps<'a> {
34 pub(crate) fn new(core: &'a Core) -> Self {
35 Self { core }
36 }
37
38 pub(crate) fn core(&self) -> &'a Core {
39 self.core
40 }
41
42 pub(crate) fn read(
43 &self,
44 world: &ValidatedWorldPath,
45 tier: auth::Tier,
46 ) -> Result<Option<ReadResult>, EngineError> {
47 let permit = world_ops::authorize_read(self.core, world, tier)?;
48 match world_ops::read_world(self.core, &permit)
49 .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
50 {
51 world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult::new(
52 Representation::new(Bytes::from(stage.body), stage.content_type, stage.headers),
53 etag,
54 ))),
55 world_ops::ReadOutcome::Missing => Ok(None),
56 }
57 }
58
59 pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
60 &self,
61 world: &ValidatedWorldPath,
62 representation: Representation,
63 preconditions: Preconditions,
64 tier: auth::Tier,
65 hooks: &H,
66 ) -> Result<WriteResult, EngineError> {
67 let permit = world_ops::authorize_write(world, tier)?;
68 let outcome = world_ops::replace_write(
69 self.core,
70 &permit,
71 world_ops::ReplaceRequest {
72 body: representation.body,
73 content_type: representation.content_type,
74 headers: representation.headers,
75 preconditions: preconditions.into(),
76 },
77 hooks,
78 )
79 .await
80 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
81 Ok(outcome.into())
82 }
83
84 pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
85 &self,
86 world: &ValidatedWorldPath,
87 body: Bytes,
88 preconditions: Preconditions,
89 tier: auth::Tier,
90 hooks: &H,
91 ) -> Result<WriteResult, EngineError> {
92 let permit = world_ops::authorize_write(world, tier)?;
93 let outcome = world_ops::append_write(
94 self.core,
95 &permit,
96 world_ops::AppendRequest {
97 body,
98 preconditions: preconditions.into(),
99 },
100 hooks,
101 )
102 .await
103 .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
104 Ok(outcome.into())
105 }
106
107 pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
108 &self,
109 world: &ValidatedWorldPath,
110 req: DeleteRequest,
111 tier: auth::Tier,
112 hooks: &H,
113 ) -> Result<(), delete_ops::DeleteError> {
114 let permit = delete_ops::authorize_delete(world, tier)?;
115 delete_ops::delete(self.core, &permit, req, hooks).await
116 }
117
118 pub(crate) fn subscribe(
119 &self,
120 pattern: &SubscribePattern,
121 tier: auth::Tier,
122 since: Option<u64>,
123 ) -> Result<EngineSubscription, EngineError> {
124 let permit = self.authorize_subscribe(pattern, tier)?;
125 Ok(self.open_subscription(permit, since))
126 }
127
128 fn authorize_subscribe(
129 &self,
130 pattern: &SubscribePattern,
131 tier: auth::Tier,
132 ) -> Result<SubscribePermit, EngineError> {
133 if !crate::can_read(self.core, tier) {
134 return Err(EngineError::Auth(AuthGate::Read));
135 }
136 if *self.core.shutdown.borrow() {
137 return Err(EngineError::ShuttingDown);
138 }
139 let slot = self
140 .core
141 .listen_slots
142 .clone()
143 .try_acquire_owned()
144 .map_err(|_| EngineError::SubscriptionLimit)?;
145 Ok(SubscribePermit {
146 pattern: pattern.clone(),
147 slot,
148 })
149 }
150
151 fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
152 let rx = self.core.events.subscribe();
153 let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
154 let replay_mode = since.is_some();
155 let mut initial = VecDeque::new();
156 if let Some(skipped) = lag {
157 initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
158 }
159 initial.extend(replay.into_iter().map(Ok));
160 EngineSubscription::new(
161 permit.slot,
162 initial,
163 rx,
164 permit.pattern,
165 replay_mode,
166 live_floor,
167 self.core.shutdown.clone(),
168 )
169 }
170}
171
172impl Engine {
173 pub fn read(
186 &self,
187 world: &ValidatedWorldPath,
188 tier: AccessTier,
189 ) -> Result<Option<ReadResult>, EngineError> {
190 EngineOps::new(self.core()).read(world, tier.into())
191 }
192
193 pub async fn replace(
211 &self,
212 world: &ValidatedWorldPath,
213 representation: Representation,
214 preconditions: Preconditions,
215 tier: AccessTier,
216 ) -> Result<WriteResult, EngineError> {
217 EngineOps::new(self.core())
218 .replace(
219 world,
220 representation,
221 preconditions,
222 tier.into(),
223 &NoopWriteTrace,
224 )
225 .await
226 }
227
228 pub async fn append(
236 &self,
237 world: &ValidatedWorldPath,
238 body: Bytes,
239 preconditions: Preconditions,
240 tier: AccessTier,
241 ) -> Result<WriteResult, EngineError> {
242 EngineOps::new(self.core())
243 .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
244 .await
245 }
246
247 pub async fn delete(
264 &self,
265 world: &ValidatedWorldPath,
266 preconditions: Preconditions,
267 tier: AccessTier,
268 ) -> Result<(), EngineError> {
269 EngineOps::new(self.core())
270 .delete(
271 world,
272 DeleteRequest {
273 preconditions,
274 content_type: String::new(),
275 headers: Vec::new(),
276 },
277 tier.into(),
278 &NoopDeleteTrace,
279 )
280 .await
281 .map_err(Into::into)
282 }
283
284 pub fn subscribe(
302 &self,
303 pattern: &SubscribePattern,
304 tier: AccessTier,
305 since: Option<u64>,
306 ) -> Result<EngineSubscription, EngineError> {
307 EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
308 }
309}
310
311struct NoopWriteTrace;
312
313impl world_ops::WriteTraceHooks for NoopWriteTrace {}
314
315struct NoopDeleteTrace;
316
317impl DeleteTraceHooks for NoopDeleteTrace {}
318
319pub(crate) fn replay_after(
320 core: &Core,
321 since: Option<u64>,
322 pattern: &SubscribePattern,
323) -> (Option<u64>, Vec<ChangeEvent>, u64) {
324 let Some(last_id) = since else {
325 return (None, Vec::new(), 0);
326 };
327 let log = core
328 .event_log
329 .lock()
330 .unwrap_or_else(|poison| poison.into_inner());
331 let gap = log.front().and_then(|oldest| {
332 let expected_next = last_id.saturating_add(1);
333 if expected_next < oldest.id {
334 Some(oldest.id - expected_next)
335 } else {
336 None
337 }
338 });
339 let replay: Vec<ChangeEvent> = log
340 .iter()
341 .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
342 .cloned()
343 .map(Into::into)
344 .collect();
345 let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
346 (gap, replay, live_floor)
347}
348
349impl From<Preconditions> for etag::Preconditions {
350 fn from(value: Preconditions) -> Self {
351 etag::Preconditions::new(
352 value.if_match.into_iter().map(Into::into).collect(),
353 value.if_none_match.into_iter().map(Into::into).collect(),
354 )
355 }
356}
357
358impl From<etag::Preconditions> for Preconditions {
359 fn from(value: etag::Preconditions) -> Self {
360 let (if_match, if_none_match) = value.into_parts();
361 Self::new(
362 if_match.into_iter().map(Into::into).collect(),
363 if_none_match.into_iter().map(Into::into).collect(),
364 )
365 }
366}
367
368impl From<world_ops::WriteOutcome> for WriteResult {
369 fn from(value: world_ops::WriteOutcome) -> Self {
370 Self::new(
371 match value.status_kind {
372 world_ops::WriteStatusKind::Created => WriteKind::Created,
373 world_ops::WriteStatusKind::Updated => WriteKind::Updated,
374 },
375 value.etag,
376 )
377 }
378}
379
380impl From<AccessTier> for auth::Tier {
381 fn from(value: AccessTier) -> Self {
382 match value {
383 AccessTier::Anon => Self::Anon,
384 AccessTier::Read => Self::Read,
385 AccessTier::Write => Self::Write,
386 AccessTier::Approve => Self::Approve,
387 }
388 }
389}
390
391impl From<world_ops::ReadError> for EngineError {
392 fn from(value: world_ops::ReadError) -> Self {
393 read_error_to_engine(value, None)
394 }
395}
396
397impl From<world_ops::WriteError> for EngineError {
398 fn from(value: world_ops::WriteError) -> Self {
399 write_error_to_engine(value, None)
400 }
401}
402
403fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
404 match op {
405 world_ops::StorageOp::Read => "read",
406 world_ops::StorageOp::WriteAudit => "write_audit",
407 }
408}
409
410fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
411 match value {
412 world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
413 world_ops::ReadError::TransientStorage { scope, err } => {
414 log_storage_error(scope, &err, "read", world);
415 EngineError::TransientStorage {
416 sqlite_code: engine::sqlite_code(&err),
417 }
418 }
419 world_ops::ReadError::InsufficientStorage { scope, err } => {
420 log_storage_error(scope, &err, "read", world);
421 EngineError::InsufficientStorage {
422 sqlite_code: engine::sqlite_code(&err),
423 }
424 }
425 world_ops::ReadError::StorageRead { scope, err } => {
426 log_storage_error(scope, &err, "read", world);
427 EngineError::Storage {
428 sqlite_code: engine::sqlite_code(&err),
429 }
430 }
431 world_ops::ReadError::PermitWorldMismatch => {
432 EngineError::InternalInvariant("read permit world mismatch")
433 }
434 }
435}
436
437fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
438 match value {
439 world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
440 world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
441 world_ops::WriteError::PreconditionFailed { message } => {
442 EngineError::PreconditionFailed { message }
443 }
444 world_ops::WriteError::NotFound => EngineError::NotFound,
445 world_ops::WriteError::QuotaExceeded {
446 used,
447 quota,
448 projected,
449 } => EngineError::QuotaExceeded {
450 used,
451 quota,
452 projected,
453 },
454 world_ops::WriteError::TransientStorage { scope, err, op } => {
455 log_storage_error(scope, &err, storage_op_label(op), world);
456 EngineError::TransientStorage {
457 sqlite_code: engine::sqlite_code(&err),
458 }
459 }
460 world_ops::WriteError::InsufficientStorage { scope, err, op } => {
461 log_storage_error(scope, &err, storage_op_label(op), world);
462 EngineError::InsufficientStorage {
463 sqlite_code: engine::sqlite_code(&err),
464 }
465 }
466 world_ops::WriteError::StorageRead { scope, err } => {
467 log_storage_error(scope, &err, "read", world);
468 EngineError::Storage {
469 sqlite_code: engine::sqlite_code(&err),
470 }
471 }
472 world_ops::WriteError::StorageWriteAudit { scope, err } => {
473 log_storage_error(scope, &err, "write_audit", world);
474 EngineError::Storage {
475 sqlite_code: engine::sqlite_code(&err),
476 }
477 }
478 world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
479 }
480}
481
482pub(crate) fn log_storage_error(
483 scope: &'static str,
484 err: &rusqlite::Error,
485 operation: &'static str,
486 world: Option<&str>,
487) {
488 #[cfg(feature = "unstable-engine")]
489 tracing::error!(
490 scope,
491 operation,
492 world = world.unwrap_or(""),
493 sqlite_code = ?engine::sqlite_code(err),
494 error = %err,
495 "engine storage error"
496 );
497
498 #[cfg(not(feature = "unstable-engine"))]
499 match world {
500 Some(world) => {
501 eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
502 }
503 None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
504 }
505}
506
507pub(crate) fn log_blocking_storage_error(
508 scope: &'static str,
509 err: &BlockingSqliteError,
510 operation: &'static str,
511 world: Option<&str>,
512) {
513 match err {
514 BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
515 BlockingSqliteError::Worker => {
516 #[cfg(feature = "unstable-engine")]
517 tracing::error!(
518 scope,
519 operation,
520 world = world.unwrap_or(""),
521 "engine storage worker failed"
522 );
523
524 #[cfg(not(feature = "unstable-engine"))]
525 match world {
526 Some(world) => {
527 eprintln!(
528 "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
529 );
530 }
531 None => {
532 eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
533 }
534 }
535 }
536 }
537}
538
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::*;
    use crate::engine_types::SecretBytes;

    /// Returns a scratch directory path under the system temp dir, made unique
    /// by the test name, process id and current time in nanoseconds. Any
    /// leftover directory at that path is removed on a best-effort basis.
    fn temp_root(name: &str) -> PathBuf {
        let nonce = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let pid = std::process::id();
        let mut root = std::env::temp_dir();
        root.push(format!("elastik-engine-ops-{name}-{pid}-{nonce}"));
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Builds an engine rooted in a fresh scratch directory with exactly one
    /// listen slot and no read token.
    fn test_engine(name: &str) -> (Engine, PathBuf) {
        let root = temp_root(name);
        let builder = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .max_listen_connections(1);
        (builder.build().unwrap(), root)
    }

    /// Writes `body` to `world` as `text/plain` at write tier, panicking on
    /// failure.
    async fn put_plain(engine: &Engine, world: &ValidatedWorldPath, body: &'static [u8]) {
        let representation =
            Representation::new(Bytes::from_static(body), "text/plain", Vec::new());
        engine
            .replace(
                world,
                representation,
                Preconditions::none(),
                AccessTier::Write,
            )
            .await
            .unwrap();
    }

    /// Drops the engine, then removes its scratch directory (best effort).
    fn teardown(engine: Engine, root: PathBuf) {
        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }

    #[tokio::test]
    async fn engine_delete_requires_approve_and_removes_world() {
        let (engine, root) = test_engine("delete");
        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
        put_plain(&engine, &world, b"alive").await;

        // Write tier is not enough to delete.
        let as_writer = engine
            .delete(&world, Preconditions::none(), AccessTier::Write)
            .await;
        assert!(matches!(
            as_writer,
            Err(EngineError::Auth(AuthGate::Delete))
        ));

        // Approve tier deletes, after which the world reads as missing.
        engine
            .delete(&world, Preconditions::none(), AccessTier::Approve)
            .await
            .unwrap();
        let after_delete = engine.read(&world, AccessTier::Read).unwrap();
        assert!(after_delete.is_none());

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
        let root = temp_root("subscribe");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("home/events/*");

        // With a read token configured, anonymous callers are refused.
        let anonymous = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(
            anonymous,
            Err(EngineError::Auth(AuthGate::Read))
        ));

        let world = ValidatedWorldPath::new("home/events/a").unwrap();
        put_plain(&engine, &world, b"event").await;

        // Subscribing from id 0 replays the write that just happened.
        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Read, Some(0))
            .expect("read tier subscribes");
        let event = subscription.recv().await.expect("replay event");
        assert_eq!(event.method, "PUT");
        assert_eq!(event.path.as_str(), "home/events/a");

        drop(subscription);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_enforces_slot_cap_at_entry() {
        let (engine, root) = test_engine("subscribe-cap");
        let pattern = SubscribePattern::new("*");

        let first = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("first subscription consumes the sole slot");
        let second = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(second, Err(EngineError::SubscriptionLimit)));

        drop(first);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_denied_auth_does_not_consume_slot() {
        let root = temp_root("subscribe-auth-slot");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_listen_connections(1)
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("*");

        let denied = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(denied, Err(EngineError::Auth(AuthGate::Read))));

        // The single slot must still be free for an authorized caller.
        let subscription = engine
            .subscribe(&pattern, AccessTier::Read, None)
            .expect("failed auth must not consume the only slot");

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_closed_is_terminal() {
        let (engine, root) = test_engine("subscribe-closed");
        let pattern = SubscribePattern::new("*");
        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("subscription opens before shutdown");

        engine.shutdown();

        // `Closed` must be reported on every receive after shutdown.
        for _ in 0..2 {
            let received = subscription.recv().await;
            assert!(matches!(received, Err(SubscriptionRecvError::Closed)));
        }

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_drains_replay_before_shutdown() {
        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
        let pattern = SubscribePattern::new("home/replay/*");
        for name in ["home/replay/a", "home/replay/b"] {
            let world = ValidatedWorldPath::new(name).unwrap();
            put_plain(&engine, &world, b"event").await;
        }

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, Some(0))
            .expect("subscription opens before shutdown");
        engine.shutdown();

        // Replayed events are still delivered, in order, after shutdown.
        for expected in ["home/replay/a", "home/replay/b"] {
            let event = subscription.recv().await.unwrap();
            assert_eq!(event.path.as_str(), expected);
        }
        let after_replay = subscription.recv().await;
        assert!(matches!(
            after_replay,
            Err(SubscriptionRecvError::Closed)
        ));

        drop(subscription);
        teardown(engine, root);
    }
}