hermes_async_runtime_components/subscription/impls/
closure.rs1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use core::future::Future;
4use core::pin::Pin;
5
6use cgp::prelude::*;
7use futures_core::stream::Stream;
8use hermes_runtime_components::traits::mutex::{HasMutex, MutexOf};
9
10use crate::subscription::traits::subscription::Subscription;
11
12pub trait CanCreateClosureSubscription {
22 fn new_closure_subscription<T: Async>(
23 subscribe: impl Fn() -> Pin<
24 Box<
25 dyn Future<
26 Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
27 > + Send
28 + 'static,
29 >,
30 > + Send
31 + Sync
32 + 'static,
33 ) -> Arc<dyn Subscription<Item = T>>;
34}
35
36impl<Runtime> CanCreateClosureSubscription for Runtime
37where
38 Runtime: HasMutex,
39{
40 fn new_closure_subscription<T: Async>(
41 subscribe: impl Fn() -> Pin<
42 Box<
43 dyn Future<
44 Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
45 > + Send
46 + 'static,
47 >,
48 > + Send
49 + Sync
50 + 'static,
51 ) -> Arc<dyn Subscription<Item = T>> {
52 let subscription: SubscriptionClosure<Runtime, T> = SubscriptionClosure {
53 terminated: Runtime::new_mutex(false),
54 subscribe: Box::new(subscribe),
55 };
56
57 Arc::new(subscription)
58 }
59}
60
61struct SubscriptionClosure<Runtime, T>
62where
63 Runtime: HasMutex,
64{
65 terminated: MutexOf<Runtime, bool>,
66 subscribe: Box<
67 dyn Fn() -> Pin<
68 Box<
69 dyn Future<
70 Output = Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
71 > + Send
72 + 'static,
73 >,
74 > + Send
75 + Sync
76 + 'static,
77 >,
78}
79
80#[async_trait::async_trait]
81impl<Runtime, T: Async> Subscription for SubscriptionClosure<Runtime, T>
82where
83 Runtime: HasMutex,
84{
85 type Item = T;
86
87 async fn subscribe(
88 &self,
89 ) -> Option<Pin<Box<dyn Stream<Item = Self::Item> + Send + Sync + 'static>>> {
90 let mut terminated = Runtime::acquire_mutex(&self.terminated).await;
91
92 if *terminated {
93 None
96 } else {
97 let m_stream = (self.subscribe)().await;
98
99 if m_stream.is_none() {
100 *terminated = true;
101 }
102
103 m_stream
104 }
105 }
106}