Skip to main content

elastik_core/
engine_ops.rs

1//! Engine operation seam over protocol-neutral world transitions.
2//!
3//! HTTP and CoAP still live inside the library crate today, so they use the
4//! crate-private `EngineOps` view. Public `Engine` methods delegate here too,
5//! keeping one path from facade to `world_ops`.
6
7#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
8
9use std::collections::VecDeque;
10
11use bytes::Bytes;
12
13use crate::{
14    auth,
15    delete_ops::{self, DeleteRequest, DeleteTraceHooks},
16    engine::{self, Engine, EngineError},
17    engine_types::{
18        AccessTier, ChangeEvent, EngineSubscription, Preconditions, ReadResult, Representation,
19        SubscribePattern, SubscriptionRecvError, ValidatedWorldPath, WriteKind, WriteResult,
20    },
21    etag, event, world_ops, AuthGate, BlockingSqliteError, Core,
22};
23
24pub(crate) struct EngineOps<'a> {
25    core: &'a Core,
26}
27
28struct SubscribePermit {
29    pattern: SubscribePattern,
30    slot: tokio::sync::OwnedSemaphorePermit,
31}
32
33impl<'a> EngineOps<'a> {
34    pub(crate) fn new(core: &'a Core) -> Self {
35        Self { core }
36    }
37
38    pub(crate) fn core(&self) -> &'a Core {
39        self.core
40    }
41
42    pub(crate) fn read(
43        &self,
44        world: &ValidatedWorldPath,
45        tier: auth::Tier,
46    ) -> Result<Option<ReadResult>, EngineError> {
47        let permit = world_ops::authorize_read(self.core, world, tier)?;
48        match world_ops::read_world(self.core, &permit)
49            .map_err(|err| read_error_to_engine(err, Some(world.as_str())))?
50        {
51            world_ops::ReadOutcome::Found { stage, etag } => Ok(Some(ReadResult::new(
52                Representation::new(Bytes::from(stage.body), stage.content_type, stage.headers),
53                etag,
54            ))),
55            world_ops::ReadOutcome::Missing => Ok(None),
56        }
57    }
58
59    pub(crate) async fn replace<H: world_ops::WriteTraceHooks + ?Sized>(
60        &self,
61        world: &ValidatedWorldPath,
62        representation: Representation,
63        preconditions: Preconditions,
64        tier: auth::Tier,
65        hooks: &H,
66    ) -> Result<WriteResult, EngineError> {
67        let permit = world_ops::authorize_write(world, tier)?;
68        let outcome = world_ops::replace_write(
69            self.core,
70            &permit,
71            world_ops::ReplaceRequest {
72                body: representation.body,
73                content_type: representation.content_type,
74                headers: representation.headers,
75                preconditions: preconditions.into(),
76            },
77            hooks,
78        )
79        .await
80        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
81        Ok(outcome.into())
82    }
83
84    pub(crate) async fn append<H: world_ops::WriteTraceHooks + ?Sized>(
85        &self,
86        world: &ValidatedWorldPath,
87        body: Bytes,
88        preconditions: Preconditions,
89        tier: auth::Tier,
90        hooks: &H,
91    ) -> Result<WriteResult, EngineError> {
92        let permit = world_ops::authorize_write(world, tier)?;
93        let outcome = world_ops::append_write(
94            self.core,
95            &permit,
96            world_ops::AppendRequest {
97                body,
98                preconditions: preconditions.into(),
99            },
100            hooks,
101        )
102        .await
103        .map_err(|err| write_error_to_engine(err, Some(world.as_str())))?;
104        Ok(outcome.into())
105    }
106
107    pub(crate) async fn delete<H: DeleteTraceHooks + ?Sized>(
108        &self,
109        world: &ValidatedWorldPath,
110        req: DeleteRequest,
111        tier: auth::Tier,
112        hooks: &H,
113    ) -> Result<(), delete_ops::DeleteError> {
114        let permit = delete_ops::authorize_delete(world, tier)?;
115        delete_ops::delete(self.core, &permit, req, hooks).await
116    }
117
118    pub(crate) fn subscribe(
119        &self,
120        pattern: &SubscribePattern,
121        tier: auth::Tier,
122        since: Option<u64>,
123    ) -> Result<EngineSubscription, EngineError> {
124        let permit = self.authorize_subscribe(pattern, tier)?;
125        Ok(self.open_subscription(permit, since))
126    }
127
128    fn authorize_subscribe(
129        &self,
130        pattern: &SubscribePattern,
131        tier: auth::Tier,
132    ) -> Result<SubscribePermit, EngineError> {
133        if !crate::can_read(self.core, tier) {
134            return Err(EngineError::Auth(AuthGate::Read));
135        }
136        if *self.core.shutdown.borrow() {
137            return Err(EngineError::ShuttingDown);
138        }
139        let slot = self
140            .core
141            .listen_slots
142            .clone()
143            .try_acquire_owned()
144            .map_err(|_| EngineError::SubscriptionLimit)?;
145        Ok(SubscribePermit {
146            pattern: pattern.clone(),
147            slot,
148        })
149    }
150
151    fn open_subscription(&self, permit: SubscribePermit, since: Option<u64>) -> EngineSubscription {
152        let rx = self.core.events.subscribe();
153        let (lag, replay, live_floor) = replay_after(self.core, since, &permit.pattern);
154        let replay_mode = since.is_some();
155        let mut initial = VecDeque::new();
156        if let Some(skipped) = lag {
157            initial.push_back(Err(SubscriptionRecvError::Lagged { skipped }));
158        }
159        initial.extend(replay.into_iter().map(Ok));
160        EngineSubscription::new(
161            permit.slot,
162            initial,
163            rx,
164            permit.pattern,
165            replay_mode,
166            live_floor,
167            self.core.shutdown.clone(),
168        )
169    }
170}
171
172impl Engine {
173    /// Reads a world's full representation.
174    ///
175    /// # Returns
176    /// - `Ok(Some(ReadResult))` if the world exists.
177    /// - `Ok(None)` if the world does not exist (callers that want 404
178    ///   semantics handle this).
179    ///
180    /// # Errors
181    /// - [`EngineError::Auth`] if `tier` is below `Read`.
182    /// - [`EngineError::TransientStorage`] for SQLite `BUSY`/`LOCKED`.
183    /// - [`EngineError::InsufficientStorage`] for full-disk failures.
184    /// - [`EngineError::Storage`] for other storage errors.
185    pub fn read(
186        &self,
187        world: &ValidatedWorldPath,
188        tier: AccessTier,
189    ) -> Result<Option<ReadResult>, EngineError> {
190        EngineOps::new(self.core()).read(world, tier.into())
191    }
192
193    /// Replaces a world with the provided representation.
194    ///
195    /// Creates the world if it does not exist; otherwise overwrites the
196    /// body, content type, and headers, then advances the audit chain.
197    ///
198    /// # Errors
199    /// - [`EngineError::Auth`] if `tier` is below the namespace's write
200    ///   requirement (`Write` for `home/`, `Approve` for system
201    ///   namespaces).
202    /// - [`EngineError::PayloadTooLarge`] if the body exceeds the per-world
203    ///   cap.
204    /// - [`EngineError::PreconditionFailed`] if `preconditions` reject the
205    ///   write.
206    /// - [`EngineError::QuotaExceeded`] for durable-storage quota failures.
207    /// - [`EngineError::TransientStorage`] /
208    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
209    ///   for storage-layer failures.
210    pub async fn replace(
211        &self,
212        world: &ValidatedWorldPath,
213        representation: Representation,
214        preconditions: Preconditions,
215        tier: AccessTier,
216    ) -> Result<WriteResult, EngineError> {
217        EngineOps::new(self.core())
218            .replace(
219                world,
220                representation,
221                preconditions,
222                tier.into(),
223                &NoopWriteTrace,
224            )
225            .await
226    }
227
228    /// Appends bytes to a world's body and advances the audit chain.
229    ///
230    /// Same auth requirements and error variants as [`Engine::replace`].
231    /// The world's content type and metadata headers are unchanged.
232    ///
233    /// # Errors
234    /// Same as [`Engine::replace`].
235    pub async fn append(
236        &self,
237        world: &ValidatedWorldPath,
238        body: Bytes,
239        preconditions: Preconditions,
240        tier: AccessTier,
241    ) -> Result<WriteResult, EngineError> {
242        EngineOps::new(self.core())
243            .append(world, body, preconditions, tier.into(), &NoopWriteTrace)
244            .await
245    }
246
247    /// Deletes a world with default, empty audit metadata.
248    ///
249    /// Convenience wrapper around the DELETE protocol that records empty
250    /// content-type and headers in the audit intent. Adapters that need to
251    /// preserve the deleted representation's metadata in the audit log
252    /// should call [`Engine::delete_traced`] with a populated
253    /// [`crate::DeleteMetadata`].
254    ///
255    /// # Errors
256    /// - [`EngineError::Auth`] if `tier` is below `Approve`.
257    /// - [`EngineError::AppendOnly`] for append-only worlds (e.g.
258    ///   `var/log/deletes`).
259    /// - [`EngineError::PreconditionFailed`] / [`EngineError::NotFound`].
260    /// - [`EngineError::TransientStorage`] /
261    ///   [`EngineError::InsufficientStorage`] / [`EngineError::Storage`]
262    ///   for storage-layer failures.
263    pub async fn delete(
264        &self,
265        world: &ValidatedWorldPath,
266        preconditions: Preconditions,
267        tier: AccessTier,
268    ) -> Result<(), EngineError> {
269        EngineOps::new(self.core())
270            .delete(
271                world,
272                DeleteRequest {
273                    preconditions,
274                    content_type: String::new(),
275                    headers: Vec::new(),
276                },
277                tier.into(),
278                &NoopDeleteTrace,
279            )
280            .await
281            .map_err(Into::into)
282    }
283
284    /// Subscribes to change events matching `pattern`.
285    ///
286    /// If `since` is `Some(id)`, the subscription replays every event with
287    /// `id > since` from the in-memory ring before switching to the live
288    /// stream. Replay is bounded by the configured `listen_replay_max`; if
289    /// `since` is older than the ring's floor, the first `recv` call yields
290    /// a [`crate::SubscriptionRecvError::Lagged`] error.
291    ///
292    /// The returned [`EngineSubscription`] holds a subscription slot until
293    /// dropped; drop it promptly when finished so other subscribers can
294    /// join.
295    ///
296    /// # Errors
297    /// - [`EngineError::Auth`] if `tier` is below `Read`.
298    /// - [`EngineError::SubscriptionLimit`] if the slot pool is full.
299    /// - [`EngineError::ShuttingDown`] if [`Engine::shutdown`] has been
300    ///   called.
301    pub fn subscribe(
302        &self,
303        pattern: &SubscribePattern,
304        tier: AccessTier,
305        since: Option<u64>,
306    ) -> Result<EngineSubscription, EngineError> {
307        EngineOps::new(self.core()).subscribe(pattern, tier.into(), since)
308    }
309}
310
311struct NoopWriteTrace;
312
313impl world_ops::WriteTraceHooks for NoopWriteTrace {}
314
315struct NoopDeleteTrace;
316
317impl DeleteTraceHooks for NoopDeleteTrace {}
318
319pub(crate) fn replay_after(
320    core: &Core,
321    since: Option<u64>,
322    pattern: &SubscribePattern,
323) -> (Option<u64>, Vec<ChangeEvent>, u64) {
324    let Some(last_id) = since else {
325        return (None, Vec::new(), 0);
326    };
327    let log = core
328        .event_log
329        .lock()
330        .unwrap_or_else(|poison| poison.into_inner());
331    let gap = log.front().and_then(|oldest| {
332        let expected_next = last_id.saturating_add(1);
333        if expected_next < oldest.id {
334            Some(oldest.id - expected_next)
335        } else {
336            None
337        }
338    });
339    let replay: Vec<ChangeEvent> = log
340        .iter()
341        .filter(|change| change.id > last_id && event::matches(pattern.as_str(), &change.path))
342        .cloned()
343        .map(Into::into)
344        .collect();
345    let live_floor = replay.last().map(|change| change.id).unwrap_or(last_id);
346    (gap, replay, live_floor)
347}
348
349impl From<Preconditions> for etag::Preconditions {
350    fn from(value: Preconditions) -> Self {
351        etag::Preconditions::new(
352            value.if_match.into_iter().map(Into::into).collect(),
353            value.if_none_match.into_iter().map(Into::into).collect(),
354        )
355    }
356}
357
358impl From<etag::Preconditions> for Preconditions {
359    fn from(value: etag::Preconditions) -> Self {
360        let (if_match, if_none_match) = value.into_parts();
361        Self::new(
362            if_match.into_iter().map(Into::into).collect(),
363            if_none_match.into_iter().map(Into::into).collect(),
364        )
365    }
366}
367
368impl From<world_ops::WriteOutcome> for WriteResult {
369    fn from(value: world_ops::WriteOutcome) -> Self {
370        Self::new(
371            match value.status_kind {
372                world_ops::WriteStatusKind::Created => WriteKind::Created,
373                world_ops::WriteStatusKind::Updated => WriteKind::Updated,
374            },
375            value.etag,
376        )
377    }
378}
379
380impl From<AccessTier> for auth::Tier {
381    fn from(value: AccessTier) -> Self {
382        match value {
383            AccessTier::Anon => Self::Anon,
384            AccessTier::Read => Self::Read,
385            AccessTier::Write => Self::Write,
386            AccessTier::Approve => Self::Approve,
387        }
388    }
389}
390
391impl From<world_ops::ReadError> for EngineError {
392    fn from(value: world_ops::ReadError) -> Self {
393        read_error_to_engine(value, None)
394    }
395}
396
397impl From<world_ops::WriteError> for EngineError {
398    fn from(value: world_ops::WriteError) -> Self {
399        write_error_to_engine(value, None)
400    }
401}
402
403fn storage_op_label(op: world_ops::StorageOp) -> &'static str {
404    match op {
405        world_ops::StorageOp::Read => "read",
406        world_ops::StorageOp::WriteAudit => "write_audit",
407    }
408}
409
410fn read_error_to_engine(value: world_ops::ReadError, world: Option<&str>) -> EngineError {
411    match value {
412        world_ops::ReadError::Auth(gate) => EngineError::Auth(gate),
413        world_ops::ReadError::TransientStorage { scope, err } => {
414            log_storage_error(scope, &err, "read", world);
415            EngineError::TransientStorage {
416                sqlite_code: engine::sqlite_code(&err),
417            }
418        }
419        world_ops::ReadError::InsufficientStorage { scope, err } => {
420            log_storage_error(scope, &err, "read", world);
421            EngineError::InsufficientStorage {
422                sqlite_code: engine::sqlite_code(&err),
423            }
424        }
425        world_ops::ReadError::StorageRead { scope, err } => {
426            log_storage_error(scope, &err, "read", world);
427            EngineError::Storage {
428                sqlite_code: engine::sqlite_code(&err),
429            }
430        }
431        world_ops::ReadError::PermitWorldMismatch => {
432            EngineError::InternalInvariant("read permit world mismatch")
433        }
434    }
435}
436
437fn write_error_to_engine(value: world_ops::WriteError, world: Option<&str>) -> EngineError {
438    match value {
439        world_ops::WriteError::Auth(gate) => EngineError::Auth(gate),
440        world_ops::WriteError::PayloadTooLarge { max } => EngineError::PayloadTooLarge { max },
441        world_ops::WriteError::PreconditionFailed { message } => {
442            EngineError::PreconditionFailed { message }
443        }
444        world_ops::WriteError::NotFound => EngineError::NotFound,
445        world_ops::WriteError::QuotaExceeded {
446            used,
447            quota,
448            projected,
449        } => EngineError::QuotaExceeded {
450            used,
451            quota,
452            projected,
453        },
454        world_ops::WriteError::TransientStorage { scope, err, op } => {
455            log_storage_error(scope, &err, storage_op_label(op), world);
456            EngineError::TransientStorage {
457                sqlite_code: engine::sqlite_code(&err),
458            }
459        }
460        world_ops::WriteError::InsufficientStorage { scope, err, op } => {
461            log_storage_error(scope, &err, storage_op_label(op), world);
462            EngineError::InsufficientStorage {
463                sqlite_code: engine::sqlite_code(&err),
464            }
465        }
466        world_ops::WriteError::StorageRead { scope, err } => {
467            log_storage_error(scope, &err, "read", world);
468            EngineError::Storage {
469                sqlite_code: engine::sqlite_code(&err),
470            }
471        }
472        world_ops::WriteError::StorageWriteAudit { scope, err } => {
473            log_storage_error(scope, &err, "write_audit", world);
474            EngineError::Storage {
475                sqlite_code: engine::sqlite_code(&err),
476            }
477        }
478        world_ops::WriteError::Internal(message) => EngineError::InternalInvariant(message),
479    }
480}
481
482pub(crate) fn log_storage_error(
483    scope: &'static str,
484    err: &rusqlite::Error,
485    operation: &'static str,
486    world: Option<&str>,
487) {
488    #[cfg(feature = "unstable-engine")]
489    tracing::error!(
490        scope,
491        operation,
492        world = world.unwrap_or(""),
493        sqlite_code = ?engine::sqlite_code(err),
494        error = %err,
495        "engine storage error"
496    );
497
498    #[cfg(not(feature = "unstable-engine"))]
499    match world {
500        Some(world) => {
501            eprintln!("elastik-core internal {scope} ({operation}) world={world}: {err}");
502        }
503        None => eprintln!("elastik-core internal {scope} ({operation}): {err}"),
504    }
505}
506
507pub(crate) fn log_blocking_storage_error(
508    scope: &'static str,
509    err: &BlockingSqliteError,
510    operation: &'static str,
511    world: Option<&str>,
512) {
513    match err {
514        BlockingSqliteError::Sqlite(err) => log_storage_error(scope, err, operation, world),
515        BlockingSqliteError::Worker => {
516            #[cfg(feature = "unstable-engine")]
517            tracing::error!(
518                scope,
519                operation,
520                world = world.unwrap_or(""),
521                "engine storage worker failed"
522            );
523
524            #[cfg(not(feature = "unstable-engine"))]
525            match world {
526                Some(world) => {
527                    eprintln!(
528                        "elastik-core internal {scope} ({operation}) world={world}: sqlite worker failed"
529                    );
530                }
531                None => {
532                    eprintln!("elastik-core internal {scope} ({operation}): sqlite worker failed");
533                }
534            }
535        }
536    }
537}
538
539#[cfg(test)]
540mod tests {
541    use std::path::PathBuf;
542    use std::time::{SystemTime, UNIX_EPOCH};
543
544    use bytes::Bytes;
545
546    use super::*;
547    use crate::engine_types::SecretBytes;
548
549    fn temp_root(name: &str) -> PathBuf {
550        let nonce = SystemTime::now()
551            .duration_since(UNIX_EPOCH)
552            .unwrap()
553            .as_nanos();
554        let root = std::env::temp_dir().join(format!(
555            "elastik-engine-ops-{name}-{}-{nonce}",
556            std::process::id()
557        ));
558        let _ = std::fs::remove_dir_all(&root);
559        root
560    }
561
562    fn test_engine(name: &str) -> (Engine, PathBuf) {
563        let root = temp_root(name);
564        let engine = Engine::builder()
565            .data_root(root.clone())
566            .key(SecretBytes::try_from_slice(b"key").unwrap())
567            .max_listen_connections(1)
568            .build()
569            .unwrap();
570        (engine, root)
571    }
572
573    #[tokio::test]
574    async fn engine_delete_requires_approve_and_removes_world() {
575        let (engine, root) = test_engine("delete");
576        let world = ValidatedWorldPath::new("home/delete-me").unwrap();
577        engine
578            .replace(
579                &world,
580                Representation::new(Bytes::from_static(b"alive"), "text/plain", Vec::new()),
581                Preconditions::none(),
582                AccessTier::Write,
583            )
584            .await
585            .unwrap();
586
587        assert!(matches!(
588            engine
589                .delete(&world, Preconditions::none(), AccessTier::Write)
590                .await,
591            Err(EngineError::Auth(AuthGate::Delete))
592        ));
593
594        engine
595            .delete(&world, Preconditions::none(), AccessTier::Approve)
596            .await
597            .unwrap();
598        assert!(engine.read(&world, AccessTier::Read).unwrap().is_none());
599
600        drop(engine);
601        let _ = std::fs::remove_dir_all(root);
602    }
603
604    #[tokio::test]
605    async fn engine_subscribe_requires_read_tier_and_replays_since_id() {
606        let root = temp_root("subscribe");
607        let engine = Engine::builder()
608            .data_root(root.clone())
609            .key(SecretBytes::try_from_slice(b"key").unwrap())
610            .read_token(b"reader".to_vec())
611            .build()
612            .unwrap();
613        let pattern = SubscribePattern::new("home/events/*");
614
615        assert!(matches!(
616            engine.subscribe(&pattern, AccessTier::Anon, None),
617            Err(EngineError::Auth(AuthGate::Read))
618        ));
619
620        let world = ValidatedWorldPath::new("home/events/a").unwrap();
621        engine
622            .replace(
623                &world,
624                Representation::new(Bytes::from_static(b"event"), "text/plain", Vec::new()),
625                Preconditions::none(),
626                AccessTier::Write,
627            )
628            .await
629            .unwrap();
630
631        let mut subscription = engine
632            .subscribe(&pattern, AccessTier::Read, Some(0))
633            .expect("read tier subscribes");
634        let event = subscription.recv().await.expect("replay event");
635        assert_eq!(event.method, "PUT");
636        assert_eq!(event.path.as_str(), "home/events/a");
637
638        drop(subscription);
639        drop(engine);
640        let _ = std::fs::remove_dir_all(root);
641    }
642
643    #[test]
644    fn engine_subscribe_enforces_slot_cap_at_entry() {
645        let (engine, root) = test_engine("subscribe-cap");
646        let pattern = SubscribePattern::new("*");
647        let first = engine
648            .subscribe(&pattern, AccessTier::Anon, None)
649            .expect("first subscription consumes the sole slot");
650        assert!(matches!(
651            engine.subscribe(&pattern, AccessTier::Anon, None),
652            Err(EngineError::SubscriptionLimit)
653        ));
654
655        drop(first);
656        drop(engine);
657        let _ = std::fs::remove_dir_all(root);
658    }
659
660    #[test]
661    fn engine_subscribe_denied_auth_does_not_consume_slot() {
662        let root = temp_root("subscribe-auth-slot");
663        let engine = Engine::builder()
664            .data_root(root.clone())
665            .key(SecretBytes::try_from_slice(b"key").unwrap())
666            .read_token(b"reader".to_vec())
667            .max_listen_connections(1)
668            .build()
669            .unwrap();
670        let pattern = SubscribePattern::new("*");
671
672        assert!(matches!(
673            engine.subscribe(&pattern, AccessTier::Anon, None),
674            Err(EngineError::Auth(AuthGate::Read))
675        ));
676        let subscription = engine
677            .subscribe(&pattern, AccessTier::Read, None)
678            .expect("failed auth must not consume the only slot");
679
680        drop(subscription);
681        drop(engine);
682        let _ = std::fs::remove_dir_all(root);
683    }
684
685    #[tokio::test]
686    async fn engine_subscription_closed_is_terminal() {
687        let (engine, root) = test_engine("subscribe-closed");
688        let pattern = SubscribePattern::new("*");
689        let mut subscription = engine
690            .subscribe(&pattern, AccessTier::Anon, None)
691            .expect("subscription opens before shutdown");
692
693        engine.shutdown();
694        assert!(matches!(
695            subscription.recv().await,
696            Err(SubscriptionRecvError::Closed)
697        ));
698        assert!(matches!(
699            subscription.recv().await,
700            Err(SubscriptionRecvError::Closed)
701        ));
702
703        drop(subscription);
704        drop(engine);
705        let _ = std::fs::remove_dir_all(root);
706    }
707
708    #[tokio::test]
709    async fn engine_subscription_drains_replay_before_shutdown() {
710        let (engine, root) = test_engine("subscribe-replay-before-shutdown");
711        let pattern = SubscribePattern::new("home/replay/*");
712        for name in ["home/replay/a", "home/replay/b"] {
713            let world = ValidatedWorldPath::new(name).unwrap();
714            engine
715                .replace(
716                    &world,
717                    Representation::new(Bytes::from_static(b"event"), "text/plain", Vec::new()),
718                    Preconditions::none(),
719                    AccessTier::Write,
720                )
721                .await
722                .unwrap();
723        }
724
725        let mut subscription = engine
726            .subscribe(&pattern, AccessTier::Anon, Some(0))
727            .expect("subscription opens before shutdown");
728        engine.shutdown();
729        assert_eq!(
730            subscription.recv().await.unwrap().path.as_str(),
731            "home/replay/a"
732        );
733        assert_eq!(
734            subscription.recv().await.unwrap().path.as_str(),
735            "home/replay/b"
736        );
737        assert!(matches!(
738            subscription.recv().await,
739            Err(SubscriptionRecvError::Closed)
740        ));
741
742        drop(subscription);
743        drop(engine);
744        let _ = std::fs::remove_dir_all(root);
745    }
746}