Skip to main content

elastik_core/
engine_ops.rs

//! Engine operation seam over protocol-neutral world transitions.
//!
//! Public `Engine` methods delegate here, keeping one path from facade to
//! `world_ops`.
5
6#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use std::collections::VecDeque;
9
10use bytes::Bytes;
11
12use crate::{
13    auth,
14    delete_ops::{self, DeleteRequest, DeleteTraceHooks},
15    engine::{self, Engine, EngineError},
16    engine_types::{
17        AccessTier, ChangeEvent, EngineSubscription, Preconditions, ReadResult, Representation,
18        SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
19    },
20    etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
21};
22
/// Borrowed view over a [`Core`] through which the engine operations in this
/// module are issued.
pub(crate) struct EngineOps<'a> {
    /// Engine state every operation in this module is executed against.
    core: &'a Core,
}
26
/// Outcome of a successful `authorize_subscribe` call: the pattern to filter
/// on together with the listen slot reserved for the subscription.
struct SubscribePermit {
    /// Pattern cloned from the caller's request.
    pattern: SubscribePattern,
    /// Permit acquired from the core's `listen_slots`; it is handed to the
    /// `EngineSubscription` when the subscription is opened.
    slot: tokio::sync::OwnedSemaphorePermit,
}
31
32impl<'a> EngineOps<'a> {
33    pub(crate) fn new(core: &'a Core) -> Self {
34        Self { core }
35    }
36
37    pub(crate) fn core(&self) -> &'a Core {
38        self.core
39    }
40
41    pub(crate) fn read(
42        &self,
43        world: &ValidatedWorldPath,
44        tier: auth::Tier,
45    ) -> Result<Option<ReadResult>, EngineError> {
46        let permit = world_ops::authorize_read(self.core, world, tier)?;
47        match world_ops::read_world(self.core, &permit)
48            .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
49        {
50            world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult::new(
51                Representation::new(Bytes::from(stage.body), stage.content_type, stage.headers),
52                etag,
53            ))),
54            world_ops::ReadOutcome::Missing => Ok(None),
55        }
56    }
57
58    pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
59        &self,
60        world: &ValidatedWorldPath,
61        representation: Representation,
62        preconditions: Preconditions,
63        tier: auth::Tier,
64        hooks: &H,
65    ) -> Result<WriteResult, EngineError> {
66        let permit = world_ops::authorize_write(world, tier)?;
67        let outcome = world_ops::replace_write(
68            self.core,
69            &permit,
70            world_ops::ReplaceRequest {
71                body: representation.body,
72                content_type: representation.content_type,
73                headers: representation.headers,
74                preconditions: preconditions.into(),
75            },
76            hooks,
77        )
78        .await
79        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
80        Ok(outcome.into())
81    }
82
83    pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
84        &self,
85        world: &ValidatedWorldPath,
86        body: Bytes,
87        preconditions: Preconditions,
88        tier: auth::Tier,
89        hooks: &H,
90    ) -> Result<WriteResult, EngineError> {
91        let permit = world_ops::authorize_write(world, tier)?;
92        let outcome = world_ops::append_write(
93            self.core,
94            &permit,
95            world_ops::AppendRequest {
96                body,
97                preconditions: preconditions.into(),
98            },
99            hooks,
100        )
101        .await
102        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
103        Ok(outcome.into())
104    }
105
106    pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
107        &self,
108        world: &ValidatedWorldPath,
109        req: DeleteRequest,
110        tier: auth::Tier,
111        hooks: &H,
112    ) -> Result<(), delete_ops::DeleteError> {
113        let permit = delete_ops::authorize_delete(world, tier)?;
114        delete_ops::delete(self.core, &permit, req, hooks).await
115    }
116
117    pub(crate) fn subscribe(
118        &self,
119        pattern: &SubscribePattern,
120        tier: auth::Tier,
121        since: Option<u64>,
122    ) -> Result<EngineSubscription, EngineError> {
123        let permit = self.authorize_subscribe(pattern, tier)?;
124        Ok(self.open_subscription(permit, since))
125    }
126
127    fn authorize_subscribe(
128        &self,
129        pattern: &SubscribePattern,
130        tier: auth::Tier,
131    ) -> Result<SubscribePermit, EngineError> {
132        if !crate::can_read(self.core, tier) {
133            return Err(EngineError::Auth(AuthGate::Read));
134        }
135        if *self.core.shutdown.borrow() {
136            return Err(EngineError::ShuttingDown);
137        }
138        let slot = self
139            .core
140            .listen_slots
141            .clone()
142            .try_acquire_owned()
143            .map_err(|_| EngineError::SubscriptionLimit)?;
144        Ok(SubscribePermit {
145            pattern: pattern.clone(),
146            slot,
147        })
148    }
149
150    fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
151        let rx = self.core.events.subscribe();
152        let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
153        let replay_mode = since.is_some();
154        let mut initial = VecDeque::new();
155        if let Some(skipped) = lag {
156            initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
157        }
158        initial.extend(replay.into_iter().map(Ok));
159        EngineSubscription::new(
160            permit.slot,
161            initial,
162            rx,
163            permit.pattern,
164            replay_mode,
165            live_floor,
166            self.core.shutdown.clone(),
167        )
168    }
169}
170
171impl Engine {
172    /// Reads a world's full representation.
173    ///
174    /// # Returns
175    /// - `Ok(Some(ReadResult))` if the world exists.
176    /// - `Ok(None)` if the world does not exist (callers that want 404
177    ///   semantics handle this).
178    ///
179    /// # Errors
180    /// - [`EngineError::Auth`] if `tier` is below `Read`.
181    /// - [`EngineError::TransientStorage`] for SQLite `BUSY`/`LOCKED`.
182    /// - [`EngineError::InsufficientStorage`] for full-disk failures.
183    /// - [`EngineError::Storage`] for other storage errors.
184    pub fn read(
185        &self,
186        world: &ValidatedWorldPath,
187        tier: AccessTier,
188    ) -> Result<Option<ReadResult>, EngineError> {
189        EngineOps::new(self.core()).read(world, tier.into())
190    }
191
192    /// Replaces a world with the provided representation.
193    ///
194    /// Creates the world if it does not exist; otherwise overwrites the
195    /// body, content type, and headers, then advances the audit chain.
196    ///
197    /// # Errors
198    /// - [`EngineError::Auth`] if `tier` is below the namespace's write
199    ///   requirement (`Write` for `home/`, `Approve` for system
200    ///   namespaces).
201    /// - [`EngineError::PayloadTooLarge`] if the body exceeds the per-world
202    ///   cap.
203    /// - [`EngineError::PreconditionFailed`] if `preconditions` reject the
204    ///   write.
205    /// - [`EngineError::QuotaExceeded`] for durable-storage quota failures.
206    /// - [`EngineError::TransientStorage`] /
207    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
208    ///   for storage-layer failures.
209    pub async fn replace(
210        &self,
211        world: &ValidatedWorldPath,
212        representation: Representation,
213        preconditions: Preconditions,
214        tier: AccessTier,
215    ) -> Result<WriteResult, EngineError> {
216        EngineOps::new(self.core())
217            .replace(
218                world,
219                representation,
220                preconditions,
221                tier.into(),
222                &NoopWriteTrace,
223            )
224            .await
225    }
226
227    /// Appends bytes to a world's body and advances the audit chain.
228    ///
229    /// Same auth requirements and error variants as [`Engine::replace`].
230    /// The world's content type and metadata headers are unchanged.
231    ///
232    /// # Errors
233    /// Same as [`Engine::replace`].
234    pub async fn append(
235        &self,
236        world: &ValidatedWorldPath,
237        body: Bytes,
238        preconditions: Preconditions,
239        tier: AccessTier,
240    ) -> Result<WriteResult, EngineError> {
241        EngineOps::new(self.core())
242            .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
243            .await
244    }
245
246    /// Deletes a world with default, empty audit metadata.
247    ///
248    /// Convenience wrapper around the delete transition that records empty
249    /// content-type and headers in the audit intent. Adapters that need to
250    /// preserve the deleted representation's metadata in the audit log
251    /// should call [`Engine::delete_traced`] with a populated
252    /// [`crate::DeleteMetadata`].
253    ///
254    /// # Errors
255    /// - [`EngineError::Auth`] if `tier` is below `Approve`.
256    /// - [`EngineError::AppendOnly`] for append-only worlds (e.g.
257    ///   `var/log/deletes`).
258    /// - [`EngineError::PreconditionFailed`] / [`EngineError::NotFound`].
259    /// - [`EngineError::TransientStorage`] /
260    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
261    ///   for storage-layer failures.
262    pub async fn delete(
263        &self,
264        world: &ValidatedWorldPath,
265        preconditions: Preconditions,
266        tier: AccessTier,
267    ) -> Result<(), EngineError> {
268        EngineOps::new(self.core())
269            .delete(
270                world,
271                DeleteRequest {
272                    preconditions,
273                    content_type: String::new(),
274                    headers: Vec::new(),
275                },
276                tier.into(),
277                &NoopDeleteTrace,
278            )
279            .await
280            .map_err(Into::into)
281    }
282
283    /// Subscribes to change events matching `pattern`.
284    ///
285    /// If `since` is `Some(id)`, the subscription replays every event with
286    /// `id > since` from the in-memory ring before switching to the live
287    /// stream. Replay is bounded by the configured `listen_replay_max`; if
288    /// `since` is older than the ring's floor, the first `recv` call yields
289    /// a [`crate::SubscriptionRecvError::Lagged`] error.
290    ///
291    /// The returned [`EngineSubscription`] holds a subscription slot until
292    /// dropped; drop it promptly when finished so other subscribers can
293    /// join.
294    ///
295    /// # Errors
296    /// - [`EngineError::Auth`] if `tier` is below `Read`.
297    /// - [`EngineError::SubscriptionLimit`] if the slot pool is full.
298    /// - [`EngineError::ShuttingDown`] if [`Engine::shutdown`] has been
299    ///   called.
300    pub fn subscribe(
301        &self,
302        pattern: &SubscribePattern,
303        tier: AccessTier,
304        since: Option<u64>,
305    ) -> Result<EngineSubscription, EngineError> {
306        EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
307    }
308}
309
/// Write-trace hook set passed by `Engine::replace` and `Engine::append`.
/// It overrides none of the `WriteTraceHooks` methods.
struct NoopWriteTrace;

impl world_ops::WriteTraceHooks for NoopWriteTrace {}
313
/// Delete-trace hook set passed by `Engine::delete`.
/// It overrides none of the `DeleteTraceHooks` methods.
struct NoopDeleteTrace;

impl DeleteTraceHooks for NoopDeleteTrace {}
317
318pub(crate) fn replay_after(
319    core: &Core,
320    since: Option<u64>,
321    pattern: &SubscribePattern,
322) -> (Option<u64>, Vec<ChangeEvent>, u64) {
323    let Some(last_id) = since else {
324        return (None, Vec::new(), 0);
325    };
326    let log = core
327        .event_log
328        .lock()
329        .unwrap_or_else(|poison| poison.into_inner());
330    let gap = log.front().and_then(|oldest| {
331        let expected_next = last_id.saturating_add(1);
332        if expected_next < oldest.id {
333            Some(oldest.id - expected_next)
334        } else {
335            None
336        }
337    });
338    let replay: Vec<ChangeEvent> = log
339        .iter()
340        .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
341        .cloned()
342        .map(Into::into)
343        .collect();
344    let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
345    (gap, replay, live_floor)
346}
347
348impl From<Preconditions> for etag::Preconditions {
349    fn from(value: Preconditions) -> Self {
350        etag::Preconditions::new(
351            value.if_match.into_iter().map(Into::into).collect(),
352            value.if_none_match.into_iter().map(Into::into).collect(),
353        )
354    }
355}
356
357impl From<etag::Preconditions> for Preconditions {
358    fn from(value: etag::Preconditions) -> Self {
359        let (if_match, if_none_match) = value.into_parts();
360        Self::new(
361            if_match.into_iter().map(Into::into).collect(),
362            if_none_match.into_iter().map(Into::into).collect(),
363        )
364    }
365}
366
367impl From<world_ops::WriteOutcome> for WriteResult {
368    fn from(value: world_ops::WriteOutcome) -> Self {
369        Self::new(
370            match value.status_kind {
371                world_ops::WriteStatusKind::Created => WriteKind::Created,
372                world_ops::WriteStatusKind::Updated => WriteKind::Updated,
373            },
374            value.etag,
375        )
376    }
377}
378
379impl From<AccessTier> for auth::Tier {
380    fn from(value: AccessTier) -> Self {
381        match value {
382            AccessTier::Anon => Self::Anon,
383            AccessTier::Read => Self::Read,
384            AccessTier::Write => Self::Write,
385            AccessTier::Approve => Self::Approve,
386        }
387    }
388}
389
390impl From<world_ops::ReadError> for EngineError {
391    fn from(value: world_ops::ReadError) -> Self {
392        read_error_to_engine(value, None)
393    }
394}
395
396impl From<world_ops::WriteError> for EngineError {
397    fn from(value: world_ops::WriteError) -> Self {
398        write_error_to_engine(value, None)
399    }
400}
401
402fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
403    match op {
404        world_ops::StorageOp::Read => "read",
405        world_ops::StorageOp::WriteAudit => "write_audit",
406    }
407}
408
409fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
410    match value {
411        world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
412        world_ops::ReadError::TransientStorage { scope, err } => {
413            log_storage_error(scope, &err, "read", world);
414            EngineError::TransientStorage {
415                sqlite_code: engine::sqlite_code(&err),
416            }
417        }
418        world_ops::ReadError::InsufficientStorage { scope, err } => {
419            log_storage_error(scope, &err, "read", world);
420            EngineError::InsufficientStorage {
421                sqlite_code: engine::sqlite_code(&err),
422            }
423        }
424        world_ops::ReadError::StorageRead { scope, err } => {
425            log_storage_error(scope, &err, "read", world);
426            EngineError::Storage {
427                sqlite_code: engine::sqlite_code(&err),
428            }
429        }
430        world_ops::ReadError::PermitWorldMismatch => {
431            EngineError::InternalInvariant("read permit world mismatch")
432        }
433    }
434}
435
436fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
437    match value {
438        world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
439        world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
440        world_ops::WriteError::PreconditionFailed { message } => {
441            EngineError::PreconditionFailed { message }
442        }
443        world_ops::WriteError::NotFound => EngineError::NotFound,
444        world_ops::WriteError::QuotaExceeded {
445            used,
446            quota,
447            projected,
448        } => EngineError::QuotaExceeded {
449            used,
450            quota,
451            projected,
452        },
453        world_ops::WriteError::TransientStorage { scope, err, op } => {
454            log_storage_error(scope, &err, storage_op_label(op), world);
455            EngineError::TransientStorage {
456                sqlite_code: engine::sqlite_code(&err),
457            }
458        }
459        world_ops::WriteError::InsufficientStorage { scope, err, op } => {
460            log_storage_error(scope, &err, storage_op_label(op), world);
461            EngineError::InsufficientStorage {
462                sqlite_code: engine::sqlite_code(&err),
463            }
464        }
465        world_ops::WriteError::StorageRead { scope, err } => {
466            log_storage_error(scope, &err, "read", world);
467            EngineError::Storage {
468                sqlite_code: engine::sqlite_code(&err),
469            }
470        }
471        world_ops::WriteError::StorageWriteAudit { scope, err } => {
472            log_storage_error(scope, &err, "write_audit", world);
473            EngineError::Storage {
474                sqlite_code: engine::sqlite_code(&err),
475            }
476        }
477        world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
478    }
479}
480
481pub(crate) fn log_storage_error(
482    scope: &'static str,
483    err: &rusqlite::Error,
484    operation: &'static str,
485    world: Option<&str>,
486) {
487    #[cfg(feature = "unstable-engine")]
488    tracing::error!(
489        scope,
490        operation,
491        world = world.unwrap_or(""),
492        sqlite_code = ?engine::sqlite_code(err),
493        error = %err,
494        "engine storage error"
495    );
496
497    #[cfg(not(feature = "unstable-engine"))]
498    match world {
499        Some(world) => {
500            eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
501        }
502        None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
503    }
504}
505
506pub(crate) fn log_blocking_storage_error(
507    scope: &'static str,
508    err: &BlockingSqliteError,
509    operation: &'static str,
510    world: Option<&str>,
511) {
512    match err {
513        BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
514        BlockingSqliteError::Worker => {
515            #[cfg(feature = "unstable-engine")]
516            tracing::error!(
517                scope,
518                operation,
519                world = world.unwrap_or(""),
520                "engine storage worker failed"
521            );
522
523            #[cfg(not(feature = "unstable-engine"))]
524            match world {
525                Some(world) => {
526                    eprintln!(
527                        "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
528                    );
529                }
530                None => {
531                    eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
532                }
533            }
534        }
535    }
536}
537
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    use bytes::Bytes;

    use super::*;
    use crate::engine_types::{ChangeVerb, SecretBytes};

    /// Returns a scratch directory path made unique by the process id and the
    /// current time, removing any leftover directory at that path first.
    fn temp_root(name: &str) -> PathBuf {
        let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let nonce = since_epoch.as_nanos();
        let dir_name = format!(
            "elastik-engine-ops-{name}-{}-{nonce}",
            std::process::id()
        );
        let root = std::env::temp_dir().join(dir_name);
        let _ = std::fs::remove_dir_all(&root);
        root
    }

    /// Builds an engine over a fresh scratch directory with a single listen
    /// slot and no read token.
    fn test_engine(name: &str) -> (Engine, PathBuf) {
        let root = temp_root(name);
        let built = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .max_listen_connections(1)
            .build();
        (built.unwrap(), root)
    }

    /// Drops the engine before removing its data directory.
    fn teardown(engine: Engine, root: PathBuf) {
        drop(engine);
        let _ = std::fs::remove_dir_all(root);
    }

    /// Builds a `Replace` change event for seeding the event log directly.
    fn replace_event(id: u64, path: String, etag: String) -> event::ChangeEvent {
        event::ChangeEvent {
            id,
            verb: ChangeVerb::Replace,
            path,
            etag,
        }
    }

    /// Writes `body` as `text/plain` to `world` using the `Write` tier.
    async fn put_text(engine: &Engine, world: &ValidatedWorldPath, body: &'static [u8]) {
        let representation = Representation::new(Bytes::from_static(body), "text/plain", Vec::new());
        engine
            .replace(world, representation, Preconditions::none(), AccessTier::Write)
            .await
            .unwrap();
    }

    #[test]
    fn replay_after_reports_ring_gap_and_replays_available_events() {
        let (engine, root) = test_engine("replay-gap");
        {
            let mut log = engine.core().event_log.lock().unwrap();
            for id in 10..=12 {
                log.push_back(replace_event(
                    id,
                    format!("/home/task/{id}"),
                    format!("hmac-{id}"),
                ));
            }
        }

        let pattern = SubscribePattern::new("home/task/*");
        let (gap, replay, floor) = replay_after(engine.core(), Some(5), &pattern);

        // Ids 6 through 9 are missing between the cursor and the oldest entry.
        assert_eq!(gap, Some(4));
        assert_eq!(replay.len(), 3);
        let first = &replay[0];
        assert_eq!(first.id, 10);
        assert_eq!(first.path.as_str(), "home/task/10");
        assert_eq!(floor, 12);

        teardown(engine, root);
    }

    #[test]
    fn replay_after_handles_max_last_event_id_without_overflow() {
        let (engine, root) = test_engine("replay-max-last-id");
        {
            let mut log = engine.core().event_log.lock().unwrap();
            log.push_back(replace_event(
                u64::MAX,
                "/home/task/max".to_string(),
                "hmac-max".to_string(),
            ));
        }

        let pattern = SubscribePattern::new("home/task/*");
        let (gap, replay, floor) = replay_after(engine.core(), Some(u64::MAX), &pattern);

        assert_eq!(gap, None);
        assert!(replay.is_empty());
        assert_eq!(floor, u64::MAX);

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_delete_requires_approve_and_removes_world() {
        let (engine, root) = test_engine("delete");
        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
        put_text(&engine, &world, b"alive").await;

        // The write tier is not enough to delete.
        let refused = engine
            .delete(&world, Preconditions::none(), AccessTier::Write)
            .await;
        assert!(matches!(refused, Err(EngineError::Auth(AuthGate::Delete))));

        engine
            .delete(&world, Preconditions::none(), AccessTier::Approve)
            .await
            .unwrap();
        let after_delete = engine.read(&world, AccessTier::Read).unwrap();
        assert!(after_delete.is_none());

        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
        let root = temp_root("subscribe");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("home/events/*");

        let anonymous = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(anonymous, Err(EngineError::Auth(AuthGate::Read))));

        let world = ValidatedWorldPath::new("home/events/a").unwrap();
        put_text(&engine, &world, b"event").await;

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Read, Some(0))
            .expect("read tier subscribes");
        let event = subscription.recv().await.expect("replay event");
        assert_eq!(event.verb, ChangeVerb::Replace);
        assert_eq!(event.path.as_str(), "home/events/a");

        drop(subscription);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_enforces_slot_cap_at_entry() {
        let (engine, root) = test_engine("subscribe-cap");
        let pattern = SubscribePattern::new("*");
        let first = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("first subscription consumes the sole slot");

        let second = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(second, Err(EngineError::SubscriptionLimit)));

        drop(first);
        teardown(engine, root);
    }

    #[test]
    fn engine_subscribe_denied_auth_does_not_consume_slot() {
        let root = temp_root("subscribe-auth-slot");
        let engine = Engine::builder()
            .data_root(root.clone())
            .key(SecretBytes::try_from_slice(b"key").unwrap())
            .read_token(b"reader".to_vec())
            .max_listen_connections(1)
            .build()
            .unwrap();
        let pattern = SubscribePattern::new("*");

        let denied = engine.subscribe(&pattern, AccessTier::Anon, None);
        assert!(matches!(denied, Err(EngineError::Auth(AuthGate::Read))));

        let subscription = engine
            .subscribe(&pattern, AccessTier::Read, None)
            .expect("failed auth must not consume the only slot");

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_closed_is_terminal() {
        let (engine, root) = test_engine("subscribe-closed");
        let pattern = SubscribePattern::new("*");
        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, None)
            .expect("subscription opens before shutdown");

        engine.shutdown();

        // `Closed` must be reported on every call after shutdown, not just once.
        let first = subscription.recv().await;
        assert!(matches!(first, Err(SubscriptionRecvError::Closed)));
        let second = subscription.recv().await;
        assert!(matches!(second, Err(SubscriptionRecvError::Closed)));

        drop(subscription);
        teardown(engine, root);
    }

    #[tokio::test]
    async fn engine_subscription_drains_replay_before_shutdown() {
        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
        let pattern = SubscribePattern::new("home/replay/*");
        for name in ["home/replay/a", "home/replay/b"] {
            let world = ValidatedWorldPath::new(name).unwrap();
            put_text(&engine, &world, b"event").await;
        }

        let mut subscription = engine
            .subscribe(&pattern, AccessTier::Anon, Some(0))
            .expect("subscription opens before shutdown");
        engine.shutdown();

        // Replayed events are delivered first; only then is `Closed` seen.
        let first = subscription.recv().await.unwrap();
        assert_eq!(first.path.as_str(), "home/replay/a");
        let second = subscription.recv().await.unwrap();
        assert_eq!(second.path.as_str(), "home/replay/b");
        let closed = subscription.recv().await;
        assert!(matches!(closed, Err(SubscriptionRecvError::Closed)));

        drop(subscription);
        teardown(engine, root);
    }
}