hermes_async_runtime_components/subscription/impls/
stream.rs

1use alloc::sync::Arc;
2use alloc::vec::Vec;
3use core::ops::DerefMut;
4
5use cgp::prelude::*;
6use futures_core::stream::Stream;
7use futures_util::stream::StreamExt;
8use hermes_runtime_components::traits::channel::{
9    CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes,
10};
11use hermes_runtime_components::traits::mutex::HasMutex;
12use hermes_runtime_components::traits::spawn::CanSpawnTask;
13use hermes_runtime_components::traits::task::Task;
14
15use crate::stream::traits::boxed::HasBoxedStreamType;
16use crate::subscription::impls::multiplex::MultiplexingSubscription;
17use crate::subscription::traits::subscription::Subscription;
18
19/**
20   Allows multiplexing of a single [`Stream`] into a subscription.
21   This is an auto trait implemented by all runtime contexts that implement
22   `HasSpawner`, [`HasMutex`], [`CanCreateChannels`], [`CanUseChannels`],
23   and [`CanStreamReceiver`].
24
25   When the stream terminates, the subscription also terminates.
26*/
27pub trait CanStreamSubscription {
28    fn stream_subscription<S, T>(&self, stream: S) -> Arc<dyn Subscription<Item = T>>
29    where
30        S: Stream<Item = T> + Async,
31        T: Async + Clone;
32}
33
34pub struct StreamSubscriptionTask<Runtime, S, T>
35where
36    Runtime: HasMutex + HasChannelTypes,
37    T: Async,
38    S: Stream<Item = T> + Async,
39{
40    pub stream: S,
41    pub task_senders: Arc<Runtime::Mutex<Option<Vec<Runtime::Sender<T>>>>>,
42}
43
44impl<Runtime, S, T> Task for StreamSubscriptionTask<Runtime, S, T>
45where
46    Runtime: HasMutex + CanCreateChannels + CanUseChannels + CanStreamReceiver + HasBoxedStreamType,
47    T: Clone + Async,
48    S: Stream<Item = T> + Async,
49{
50    async fn run(self) {
51        let task_senders = &self.task_senders;
52
53        self.stream
54            .for_each(|item| async move {
55                let mut m_senders = Runtime::acquire_mutex(task_senders).await;
56
57                if let Some(senders) = m_senders.deref_mut() {
58                    let mut new_senders = Vec::new();
59
60                    for sender in senders.drain(..) {
61                        let send_result = Runtime::send(&sender, item.clone()).await;
62                        // Remove senders where the receiver side has been dropped,
63                        // i.e. keep the ones where sending is successful
64                        if send_result.is_ok() {
65                            new_senders.push(sender);
66                        }
67                    }
68
69                    *senders = new_senders;
70                }
71            })
72            .await;
73
74        let mut senders = Runtime::acquire_mutex(&self.task_senders).await;
75        *senders = None;
76    }
77}
78
79impl<Runtime> CanStreamSubscription for Runtime
80where
81    Runtime: CanSpawnTask
82        + HasMutex
83        + CanCreateChannels
84        + CanUseChannels
85        + CanStreamReceiver
86        + HasBoxedStreamType,
87{
88    fn stream_subscription<S, T>(&self, stream: S) -> Arc<dyn Subscription<Item = T>>
89    where
90        S: Stream<Item = T> + Async,
91        T: Async + Clone,
92    {
93        let stream_senders = Arc::new(Runtime::new_mutex(Some(Vec::new())));
94
95        let task: StreamSubscriptionTask<Runtime, S, T> = StreamSubscriptionTask {
96            stream,
97            task_senders: stream_senders.clone(),
98        };
99
100        self.spawn_task(task);
101
102        let subscription: MultiplexingSubscription<Runtime, T> =
103            MultiplexingSubscription { stream_senders };
104
105        Arc::new(subscription)
106    }
107}