hermes_async_runtime_components/subscription/impls/
closure.rs

1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use core::future::Future;
4use core::pin::Pin;
5
6use cgp::prelude::*;
7use futures_core::stream::Stream;
8use hermes_runtime_components::traits::mutex::{HasMutex, MutexOf};
9
10use crate::subscription::traits::subscription::Subscription;
11
12/**
13   An auto trait that is implemented by all runtime contexts that implement
14   [`HasMutex`]. This allows simple creation of [`Subscription`] values by
15   wrapping an async closure that returns the same thing as
16   [`subscribe`](Subscription::subscribe).
17
18   The returned [`Subscription`] also implements guard to skip calling the
19   underlying closure once the closure returns `None`.
20*/
21pub trait CanCreateClosureSubscription {
22    fn new_closure_subscription<T: Async>(
23        subscribe: impl Fn() -> Pin<
24                Box<
25                    dyn Future<
26                            Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
27                        > + Send
28                        + 'static,
29                >,
30            > + Send
31            + Sync
32            + 'static,
33    ) -> Arc<dyn Subscription<Item = T>>;
34}
35
36impl<Runtime> CanCreateClosureSubscription for Runtime
37where
38    Runtime: HasMutex,
39{
40    fn new_closure_subscription<T: Async>(
41        subscribe: impl Fn() -> Pin<
42                Box<
43                    dyn Future<
44                            Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
45                        > + Send
46                        + 'static,
47                >,
48            > + Send
49            + Sync
50            + 'static,
51    ) -> Arc<dyn Subscription<Item = T>> {
52        let subscription: SubscriptionClosure<Runtime, T> = SubscriptionClosure {
53            terminated: Runtime::new_mutex(false),
54            subscribe: Box::new(subscribe),
55        };
56
57        Arc::new(subscription)
58    }
59}
60
61struct SubscriptionClosure<Runtime, T>
62where
63    Runtime: HasMutex,
64{
65    terminated: MutexOf<Runtime, bool>,
66    subscribe: Box<
67        dyn Fn() -> Pin<
68                Box<
69                    dyn Future<
70                            Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
71                        > + Send
72                        + 'static,
73                >,
74            > + Send
75            + Sync
76            + 'static,
77    >,
78}
79
80#[async_trait::async_trait]
81impl<Runtime, T: Async> Subscription for SubscriptionClosure<Runtime, T>
82where
83    Runtime: HasMutex,
84{
85    type Item = T;
86
87    async fn subscribe(
88        &self,
89    ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
90        let mut terminated = Runtime::acquire_mutex(&self.terminated).await;
91
92        if *terminated {
93            // If a subscription is terminated, it always return `None` from
94            // that point onward.
95            None
96        } else {
97            let m_stream = (self.subscribe)().await;
98
99            if m_stream.is_none() {
100                *terminated = true;
101            }
102
103            m_stream
104        }
105    }
106}