Skip to main content

elastik_core/
engine_ops.rs

1//! Engine operation seam over protocol-neutral world transitions.
2//!
3//! HTTP and CoAP still live inside the library crate today, so they use the
4//! crate-private `EngineOps` view. Public `Engine` methods delegate here too,
5//! keeping one path from facade to `world_ops`.
6
7#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
8
9use std::collections::VecDeque;
10
11use bytes::Bytes;
12
13use crate::{
14    auth,
15    delete_ops::{self, DeleteRequest, DeleteTraceHooks},
16    engine::{self, Engine, EngineError},
17    engine_types::{
18        AccessTier, ChangeEvent, EngineSubscription, EtagMatcher, Preconditions, ReadResult,
19        Representation, SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind,
20        WriteResult,
21    },
22    etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
23};
24
25pub(crate) struct EngineOps<'a> {
26    core: &'a Core,
27}
28
29struct SubscribePermit {
30    pattern: SubscribePattern,
31    slot: tokio::sync::OwnedSemaphorePermit,
32}
33
34impl<'a> EngineOps<'a> {
35    pub(crate) fn new(core: &'a Core) -> Self {
36        Self { core }
37    }
38
39    pub(crate) fn core(&self) -> &'a Core {
40        self.core
41    }
42
43    pub(crate) fn read(
44        &self,
45        world: &ValidatedWorldPath,
46        tier: auth::Tier,
47    ) -> Result<Option<ReadResult>, EngineError> {
48        let permit = world_ops::authorize_read(self.core, world, tier)?;
49        match world_ops::read_world(self.core, &permit)
50            .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
51        {
52            world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult {
53                representation: Representation {
54                    body: Bytes::from(stage.body),
55                    content_type: stage.content_type,
56                    headers: stage.headers,
57                },
58                etag,
59            })),
60            world_ops::ReadOutcome::Missing => Ok(None),
61        }
62    }
63
64    pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
65        &self,
66        world: &ValidatedWorldPath,
67        representation: Representation,
68        preconditions: Preconditions,
69        tier: auth::Tier,
70        hooks: &H,
71    ) -> Result<WriteResult, EngineError> {
72        let permit = world_ops::authorize_write(world, tier)?;
73        let outcome = world_ops::replace_write(
74            self.core,
75            &permit,
76            world_ops::ReplaceRequest {
77                body: representation.body,
78                content_type: representation.content_type,
79                headers: representation.headers,
80                preconditions: preconditions.into(),
81            },
82            hooks,
83        )
84        .await
85        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
86        Ok(outcome.into())
87    }
88
89    pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
90        &self,
91        world: &ValidatedWorldPath,
92        body: Bytes,
93        preconditions: Preconditions,
94        tier: auth::Tier,
95        hooks: &H,
96    ) -> Result<WriteResult, EngineError> {
97        let permit = world_ops::authorize_write(world, tier)?;
98        let outcome = world_ops::append_write(
99            self.core,
100            &permit,
101            world_ops::AppendRequest {
102                body,
103                preconditions: preconditions.into(),
104            },
105            hooks,
106        )
107        .await
108        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
109        Ok(outcome.into())
110    }
111
112    pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
113        &self,
114        world: &ValidatedWorldPath,
115        req: DeleteRequest,
116        tier: auth::Tier,
117        hooks: &H,
118    ) -> Result<(), delete_ops::DeleteError> {
119        let permit = delete_ops::authorize_delete(world, tier)?;
120        delete_ops::delete(self.core, &permit, req, hooks).await
121    }
122
123    pub(crate) fn subscribe(
124        &self,
125        pattern: &SubscribePattern,
126        tier: auth::Tier,
127        since: Option<u64>,
128    ) -> Result<EngineSubscription, EngineError> {
129        let permit = self.authorize_subscribe(pattern, tier)?;
130        Ok(self.open_subscription(permit, since))
131    }
132
133    fn authorize_subscribe(
134        &self,
135        pattern: &SubscribePattern,
136        tier: auth::Tier,
137    ) -> Result<SubscribePermit, EngineError> {
138        if !crate::can_read(self.core, tier) {
139            return Err(EngineError::Auth(AuthGate::Read));
140        }
141        if *self.core.shutdown.borrow() {
142            return Err(EngineError::ShuttingDown);
143        }
144        let slot = self
145            .core
146            .listen_slots
147            .clone()
148            .try_acquire_owned()
149            .map_err(|_| EngineError::SubscriptionLimit)?;
150        Ok(SubscribePermit {
151            pattern: pattern.clone(),
152            slot,
153        })
154    }
155
156    fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
157        let rx = self.core.events.subscribe();
158        let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
159        let replay_mode = since.is_some();
160        let mut initial = VecDeque::new();
161        if let Some(skipped) = lag {
162            initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
163        }
164        initial.extend(replay.into_iter().map(Ok));
165        EngineSubscription::new(
166            permit.slot,
167            initial,
168            rx,
169            permit.pattern,
170            replay_mode,
171            live_floor,
172            self.core.shutdown.clone(),
173        )
174    }
175}
176
177impl Engine {
178    /// Reads a world's full representation.
179    ///
180    /// # Returns
181    /// - `Ok(Some(ReadResult))` if the world exists.
182    /// - `Ok(None)` if the world does not exist (callers that want 404
183    ///   semantics handle this).
184    ///
185    /// # Errors
186    /// - [`EngineError::Auth`] if `tier` is below `Read`.
187    /// - [`EngineError::TransientStorage`] for SQLite `BUSY`/`LOCKED`.
188    /// - [`EngineError::InsufficientStorage`] for full-disk failures.
189    /// - [`EngineError::Storage`] for other storage errors.
190    pub fn read(
191        &self,
192        world: &ValidatedWorldPath,
193        tier: AccessTier,
194    ) -> Result<Option<ReadResult>, EngineError> {
195        EngineOps::new(self.core()).read(world, tier.into())
196    }
197
198    /// Replaces a world with the provided representation.
199    ///
200    /// Creates the world if it does not exist; otherwise overwrites the
201    /// body, content type, and headers, then advances the audit chain.
202    ///
203    /// # Errors
204    /// - [`EngineError::Auth`] if `tier` is below the namespace's write
205    ///   requirement (`Write` for `home/`, `Approve` for system
206    ///   namespaces).
207    /// - [`EngineError::PayloadTooLarge`] if the body exceeds the per-world
208    ///   cap.
209    /// - [`EngineError::PreconditionFailed`] if `preconditions` reject the
210    ///   write.
211    /// - [`EngineError::QuotaExceeded`] for durable-storage quota failures.
212    /// - [`EngineError::TransientStorage`] /
213    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
214    ///   for storage-layer failures.
215    pub async fn replace(
216        &self,
217        world: &ValidatedWorldPath,
218        representation: Representation,
219        preconditions: Preconditions,
220        tier: AccessTier,
221    ) -> Result<WriteResult, EngineError> {
222        EngineOps::new(self.core())
223            .replace(
224                world,
225                representation,
226                preconditions,
227                tier.into(),
228                &NoopWriteTrace,
229            )
230            .await
231    }
232
233    /// Appends bytes to a world's body and advances the audit chain.
234    ///
235    /// Same auth requirements and error variants as [`Engine::replace`].
236    /// The world's content type and metadata headers are unchanged.
237    ///
238    /// # Errors
239    /// Same as [`Engine::replace`].
240    pub async fn append(
241        &self,
242        world: &ValidatedWorldPath,
243        body: Bytes,
244        preconditions: Preconditions,
245        tier: AccessTier,
246    ) -> Result<WriteResult, EngineError> {
247        EngineOps::new(self.core())
248            .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
249            .await
250    }
251
252    /// Deletes a world with default, empty audit metadata.
253    ///
254    /// Convenience wrapper around the DELETE protocol that records empty
255    /// content-type and headers in the audit intent. Adapters that need to
256    /// preserve the deleted representation's metadata in the audit log
257    /// should call [`Engine::delete_traced`] with a populated
258    /// [`crate::DeleteMetadata`].
259    ///
260    /// # Errors
261    /// - [`EngineError::Auth`] if `tier` is below `Approve`.
262    /// - [`EngineError::AppendOnly`] for append-only worlds (e.g.
263    ///   `var/log/deletes`).
264    /// - [`EngineError::PreconditionFailed`] / [`EngineError::NotFound`].
265    /// - [`EngineError::TransientStorage`] /
266    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
267    ///   for storage-layer failures.
268    pub async fn delete(
269        &self,
270        world: &ValidatedWorldPath,
271        preconditions: Preconditions,
272        tier: AccessTier,
273    ) -> Result<(), EngineError> {
274        EngineOps::new(self.core())
275            .delete(
276                world,
277                DeleteRequest {
278                    preconditions,
279                    content_type: String::new(),
280                    headers: Vec::new(),
281                },
282                tier.into(),
283                &NoopDeleteTrace,
284            )
285            .await
286            .map_err(Into::into)
287    }
288
289    /// Subscribes to change events matching `pattern`.
290    ///
291    /// If `since` is `Some(id)`, the subscription replays every event with
292    /// `id > since` from the in-memory ring before switching to the live
293    /// stream. Replay is bounded by the configured `listen_replay_max`; if
294    /// `since` is older than the ring's floor, the first `recv` call yields
295    /// a [`crate::SubscriptionRecvError::Lagged`] error.
296    ///
297    /// The returned [`EngineSubscription`] holds a subscription slot until
298    /// dropped; drop it promptly when finished so other subscribers can
299    /// join.
300    ///
301    /// # Errors
302    /// - [`EngineError::Auth`] if `tier` is below `Read`.
303    /// - [`EngineError::SubscriptionLimit`] if the slot pool is full.
304    /// - [`EngineError::ShuttingDown`] if [`Engine::shutdown`] has been
305    ///   called.
306    pub fn subscribe(
307        &self,
308        pattern: &SubscribePattern,
309        tier: AccessTier,
310        since: Option<u64>,
311    ) -> Result<EngineSubscription, EngineError> {
312        EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
313    }
314}
315
316struct NoopWriteTrace;
317
318impl world_ops::WriteTraceHooks for NoopWriteTrace {}
319
320struct NoopDeleteTrace;
321
322impl DeleteTraceHooks for NoopDeleteTrace {}
323
324pub(crate) fn replay_after(
325    core: &Core,
326    since: Option<u64>,
327    pattern: &SubscribePattern,
328) -> (Option<u64>, Vec<ChangeEvent>, u64) {
329    let Some(last_id) = since else {
330        return (None, Vec::new(), 0);
331    };
332    let log = core
333        .event_log
334        .lock()
335        .unwrap_or_else(|poison| poison.into_inner());
336    let gap = log.front().and_then(|oldest| {
337        let expected_next = last_id.saturating_add(1);
338        if expected_next < oldest.id {
339            Some(oldest.id - expected_next)
340        } else {
341            None
342        }
343    });
344    let replay: Vec<ChangeEvent> = log
345        .iter()
346        .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
347        .cloned()
348        .map(Into::into)
349        .collect();
350    let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
351    (gap, replay, live_floor)
352}
353
354impl From<Preconditions> for etag::Preconditions {
355    fn from(value: Preconditions) -> Self {
356        etag::Preconditions::new(
357            value.if_match.into_iter().map(Into::into).collect(),
358            value.if_none_match.into_iter().map(Into::into).collect(),
359        )
360    }
361}
362
363impl From<etag::Preconditions> for Preconditions {
364    fn from(value: etag::Preconditions) -> Self {
365        let (if_match, if_none_match) = value.into_parts();
366        Self {
367            if_match: if_match.into_iter().map(Into::into).collect(),
368            if_none_match: if_none_match.into_iter().map(Into::into).collect(),
369        }
370    }
371}
372
373impl From<EtagMatcher> for etag::EtagMatcher {
374    fn from(value: EtagMatcher) -> Self {
375        match value {
376            EtagMatcher::Any => Self::Any,
377            EtagMatcher::Strong(value) => Self::Strong(value),
378            EtagMatcher::Weak(value) => Self::Weak(value),
379            EtagMatcher::Invalid => Self::Invalid,
380        }
381    }
382}
383
384impl From<etag::EtagMatcher> for EtagMatcher {
385    fn from(value: etag::EtagMatcher) -> Self {
386        match value {
387            etag::EtagMatcher::Any => Self::Any,
388            etag::EtagMatcher::Strong(value) => Self::Strong(value),
389            etag::EtagMatcher::Weak(value) => Self::Weak(value),
390            etag::EtagMatcher::Invalid => Self::Invalid,
391        }
392    }
393}
394
395impl From<world_ops::WriteOutcome> for WriteResult {
396    fn from(value: world_ops::WriteOutcome) -> Self {
397        Self {
398            kind: match value.status_kind {
399                world_ops::WriteStatusKind::Created => WriteKind::Created,
400                world_ops::WriteStatusKind::Updated => WriteKind::Updated,
401            },
402            etag: value.etag,
403        }
404    }
405}
406
407impl From<AccessTier> for auth::Tier {
408    fn from(value: AccessTier) -> Self {
409        match value {
410            AccessTier::Anon => Self::Anon,
411            AccessTier::Read => Self::Read,
412            AccessTier::Write => Self::Write,
413            AccessTier::Approve => Self::Approve,
414        }
415    }
416}
417
418impl From<world_ops::ReadError> for EngineError {
419    fn from(value: world_ops::ReadError) -> Self {
420        read_error_to_engine(value, None)
421    }
422}
423
424impl From<world_ops::WriteError> for EngineError {
425    fn from(value: world_ops::WriteError) -> Self {
426        write_error_to_engine(value, None)
427    }
428}
429
430fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
431    match op {
432        world_ops::StorageOp::Read => "read",
433        world_ops::StorageOp::WriteAudit => "write_audit",
434    }
435}
436
437fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
438    match value {
439        world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
440        world_ops::ReadError::TransientStorage { scope, err } => {
441            log_storage_error(scope, &err, "read", world);
442            EngineError::TransientStorage {
443                sqlite_code: engine::sqlite_code(&err),
444            }
445        }
446        world_ops::ReadError::InsufficientStorage { scope, err } => {
447            log_storage_error(scope, &err, "read", world);
448            EngineError::InsufficientStorage {
449                sqlite_code: engine::sqlite_code(&err),
450            }
451        }
452        world_ops::ReadError::StorageRead { scope, err } => {
453            log_storage_error(scope, &err, "read", world);
454            EngineError::Storage {
455                sqlite_code: engine::sqlite_code(&err),
456            }
457        }
458        world_ops::ReadError::PermitWorldMismatch => {
459            EngineError::InternalInvariant("read permit world mismatch")
460        }
461    }
462}
463
464fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
465    match value {
466        world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
467        world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
468        world_ops::WriteError::PreconditionFailed { message } => {
469            EngineError::PreconditionFailed { message }
470        }
471        world_ops::WriteError::NotFound => EngineError::NotFound,
472        world_ops::WriteError::QuotaExceeded {
473            used,
474            quota,
475            projected,
476        } => EngineError::QuotaExceeded {
477            used,
478            quota,
479            projected,
480        },
481        world_ops::WriteError::TransientStorage { scope, err, op } => {
482            log_storage_error(scope, &err, storage_op_label(op), world);
483            EngineError::TransientStorage {
484                sqlite_code: engine::sqlite_code(&err),
485            }
486        }
487        world_ops::WriteError::InsufficientStorage { scope, err, op } => {
488            log_storage_error(scope, &err, storage_op_label(op), world);
489            EngineError::InsufficientStorage {
490                sqlite_code: engine::sqlite_code(&err),
491            }
492        }
493        world_ops::WriteError::StorageRead { scope, err } => {
494            log_storage_error(scope, &err, "read", world);
495            EngineError::Storage {
496                sqlite_code: engine::sqlite_code(&err),
497            }
498        }
499        world_ops::WriteError::StorageWriteAudit { scope, err } => {
500            log_storage_error(scope, &err, "write_audit", world);
501            EngineError::Storage {
502                sqlite_code: engine::sqlite_code(&err),
503            }
504        }
505        world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
506    }
507}
508
509pub(crate) fn log_storage_error(
510    scope: &'static str,
511    err: &rusqlite::Error,
512    operation: &'static str,
513    world: Option<&str>,
514) {
515    #[cfg(feature = "unstable-engine")]
516    tracing::error!(
517        scope,
518        operation,
519        world = world.unwrap_or(""),
520        sqlite_code = ?engine::sqlite_code(err),
521        error = %err,
522        "engine storage error"
523    );
524
525    #[cfg(not(feature = "unstable-engine"))]
526    match world {
527        Some(world) => {
528            eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
529        }
530        None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
531    }
532}
533
534pub(crate) fn log_blocking_storage_error(
535    scope: &'static str,
536    err: &BlockingSqliteError,
537    operation: &'static str,
538    world: Option<&str>,
539) {
540    match err {
541        BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
542        BlockingSqliteError::Worker => {
543            #[cfg(feature = "unstable-engine")]
544            tracing::error!(
545                scope,
546                operation,
547                world = world.unwrap_or(""),
548                "engine storage worker failed"
549            );
550
551            #[cfg(not(feature = "unstable-engine"))]
552            match world {
553                Some(world) => {
554                    eprintln!(
555                        "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
556                    );
557                }
558                None => {
559                    eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
560                }
561            }
562        }
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use std::path::PathBuf;
569    use std::time::{SystemTime, UNIX_EPOCH};
570
571    use bytes::Bytes;
572
573    use super::*;
574    use crate::engine_types::SecretBytes;
575
576    fn temp_root(name: &str) -> PathBuf {
577        let nonce = SystemTime::now()
578            .duration_since(UNIX_EPOCH)
579            .unwrap()
580            .as_nanos();
581        let root = std::env::temp_dir().join(format!(
582            "elastik-engine-ops-{name}-{}-{nonce}",
583            std::process::id()
584        ));
585        let _ = std::fs::remove_dir_all(&root);
586        root
587    }
588
589    fn test_engine(name: &str) -> (Engine, PathBuf) {
590        let root = temp_root(name);
591        let engine = Engine::builder()
592            .data_root(root.clone())
593            .key(SecretBytes::try_from_slice(b"key").unwrap())
594            .max_listen_connections(1)
595            .build()
596            .unwrap();
597        (engine, root)
598    }
599
600    #[tokio::test]
601    async fn engine_delete_requires_approve_and_removes_world() {
602        let (engine, root) = test_engine("delete");
603        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
604        engine
605            .replace(
606                &world,
607                Representation {
608                    body: Bytes::from_static(b"alive"),
609                    content_type: "text/plain".to_owned(),
610                    headers: Vec::new(),
611                },
612                Preconditions::none(),
613                AccessTier::Write,
614            )
615            .await
616            .unwrap();
617
618        assert!(matches!(
619            engine
620                .delete(&world, Preconditions::none(), AccessTier::Write)
621                .await,
622            Err(EngineError::Auth(AuthGate::Delete))
623        ));
624
625        engine
626            .delete(&world, Preconditions::none(), AccessTier::Approve)
627            .await
628            .unwrap();
629        assert!(engine.read(&world, AccessTier::Read).unwrap().is_none());
630
631        drop(engine);
632        let _ = std::fs::remove_dir_all(root);
633    }
634
635    #[tokio::test]
636    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
637        let root = temp_root("subscribe");
638        let engine = Engine::builder()
639            .data_root(root.clone())
640            .key(SecretBytes::try_from_slice(b"key").unwrap())
641            .read_token(b"reader".to_vec())
642            .build()
643            .unwrap();
644        let pattern = SubscribePattern::new("home/events/*");
645
646        assert!(matches!(
647            engine.subscribe(&pattern, AccessTier::Anon, None),
648            Err(EngineError::Auth(AuthGate::Read))
649        ));
650
651        let world = ValidatedWorldPath::new("home/events/a").unwrap();
652        engine
653            .replace(
654                &world,
655                Representation {
656                    body: Bytes::from_static(b"event"),
657                    content_type: "text/plain".to_owned(),
658                    headers: Vec::new(),
659                },
660                Preconditions::none(),
661                AccessTier::Write,
662            )
663            .await
664            .unwrap();
665
666        let mut subscription = engine
667            .subscribe(&pattern, AccessTier::Read, Some(0))
668            .expect("read tier subscribes");
669        let event = subscription.recv().await.expect("replay event");
670        assert_eq!(event.method, "PUT");
671        assert_eq!(event.path.as_str(), "home/events/a");
672
673        drop(subscription);
674        drop(engine);
675        let _ = std::fs::remove_dir_all(root);
676    }
677
678    #[test]
679    fn engine_subscribe_enforces_slot_cap_at_entry() {
680        let (engine, root) = test_engine("subscribe-cap");
681        let pattern = SubscribePattern::new("*");
682        let first = engine
683            .subscribe(&pattern, AccessTier::Anon, None)
684            .expect("first subscription consumes the sole slot");
685        assert!(matches!(
686            engine.subscribe(&pattern, AccessTier::Anon, None),
687            Err(EngineError::SubscriptionLimit)
688        ));
689
690        drop(first);
691        drop(engine);
692        let _ = std::fs::remove_dir_all(root);
693    }
694
695    #[test]
696    fn engine_subscribe_denied_auth_does_not_consume_slot() {
697        let root = temp_root("subscribe-auth-slot");
698        let engine = Engine::builder()
699            .data_root(root.clone())
700            .key(SecretBytes::try_from_slice(b"key").unwrap())
701            .read_token(b"reader".to_vec())
702            .max_listen_connections(1)
703            .build()
704            .unwrap();
705        let pattern = SubscribePattern::new("*");
706
707        assert!(matches!(
708            engine.subscribe(&pattern, AccessTier::Anon, None),
709            Err(EngineError::Auth(AuthGate::Read))
710        ));
711        let subscription = engine
712            .subscribe(&pattern, AccessTier::Read, None)
713            .expect("failed auth must not consume the only slot");
714
715        drop(subscription);
716        drop(engine);
717        let _ = std::fs::remove_dir_all(root);
718    }
719
720    #[tokio::test]
721    async fn engine_subscription_closed_is_terminal() {
722        let (engine, root) = test_engine("subscribe-closed");
723        let pattern = SubscribePattern::new("*");
724        let mut subscription = engine
725            .subscribe(&pattern, AccessTier::Anon, None)
726            .expect("subscription opens before shutdown");
727
728        engine.shutdown();
729        assert!(matches!(
730            subscription.recv().await,
731            Err(SubscriptionRecvError::Closed)
732        ));
733        assert!(matches!(
734            subscription.recv().await,
735            Err(SubscriptionRecvError::Closed)
736        ));
737
738        drop(subscription);
739        drop(engine);
740        let _ = std::fs::remove_dir_all(root);
741    }
742
743    #[tokio::test]
744    async fn engine_subscription_drains_replay_before_shutdown() {
745        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
746        let pattern = SubscribePattern::new("home/replay/*");
747        for name in ["home/replay/a", "home/replay/b"] {
748            let world = ValidatedWorldPath::new(name).unwrap();
749            engine
750                .replace(
751                    &world,
752                    Representation {
753                        body: Bytes::from_static(b"event"),
754                        content_type: "text/plain".to_owned(),
755                        headers: Vec::new(),
756                    },
757                    Preconditions::none(),
758                    AccessTier::Write,
759                )
760                .await
761                .unwrap();
762        }
763
764        let mut subscription = engine
765            .subscribe(&pattern, AccessTier::Anon, Some(0))
766            .expect("subscription opens before shutdown");
767        engine.shutdown();
768        assert_eq!(
769            subscription.recv().await.unwrap().path.as_str(),
770            "home/replay/a"
771        );
772        assert_eq!(
773            subscription.recv().await.unwrap().path.as_str(),
774            "home/replay/b"
775        );
776        assert!(matches!(
777            subscription.recv().await,
778            Err(SubscriptionRecvError::Closed)
779        ));
780
781        drop(subscription);
782        drop(engine);
783        let _ = std::fs::remove_dir_all(root);
784    }
785}