hermes_async_runtime_components/subscription/impls/
stream.rs1use alloc::sync::Arc;
2use alloc::vec::Vec;
3use core::ops::DerefMut;
4
5use cgp::prelude::*;
6use futures_core::stream::Stream;
7use futures_util::stream::StreamExt;
8use hermes_runtime_components::traits::channel::{
9 CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes,
10};
11use hermes_runtime_components::traits::mutex::HasMutex;
12use hermes_runtime_components::traits::spawn::CanSpawnTask;
13use hermes_runtime_components::traits::task::Task;
14
15use crate::stream::traits::boxed::HasBoxedStreamType;
16use crate::subscription::impls::multiplex::MultiplexingSubscription;
17use crate::subscription::traits::subscription::Subscription;
18
19pub trait CanStreamSubscription {
28 fn stream_subscription<S, T>(&self, stream: S) -> Arc<dyn Subscription<Item = T>>
29 where
30 S: Stream<Item = T> + Async,
31 T: Async + Clone;
32}
33
34pub struct StreamSubscriptionTask<Runtime, S, T>
35where
36 Runtime: HasMutex + HasChannelTypes,
37 T: Async,
38 S: Stream<Item = T> + Async,
39{
40 pub stream: S,
41 pub task_senders: Arc<Runtime::Mutex<Option<Vec<Runtime::Sender<T>>>>>,
42}
43
44impl<Runtime, S, T> Task for StreamSubscriptionTask<Runtime, S, T>
45where
46 Runtime: HasMutex + CanCreateChannels + CanUseChannels + CanStreamReceiver + HasBoxedStreamType,
47 T: Clone + Async,
48 S: Stream<Item = T> + Async,
49{
50 async fn run(self) {
51 let task_senders = &self.task_senders;
52
53 self.stream
54 .for_each(|item| async move {
55 let mut m_senders = Runtime::acquire_mutex(task_senders).await;
56
57 if let Some(senders) = m_senders.deref_mut() {
58 let mut new_senders = Vec::new();
59
60 for sender in senders.drain(..) {
61 let send_result = Runtime::send(&sender, item.clone()).await;
62 if send_result.is_ok() {
65 new_senders.push(sender);
66 }
67 }
68
69 *senders = new_senders;
70 }
71 })
72 .await;
73
74 let mut senders = Runtime::acquire_mutex(&self.task_senders).await;
75 *senders = None;
76 }
77}
78
79impl<Runtime> CanStreamSubscription for Runtime
80where
81 Runtime: CanSpawnTask
82 + HasMutex
83 + CanCreateChannels
84 + CanUseChannels
85 + CanStreamReceiver
86 + HasBoxedStreamType,
87{
88 fn stream_subscription<S, T>(&self, stream: S) -> Arc<dyn Subscription<Item = T>>
89 where
90 S: Stream<Item = T> + Async,
91 T: Async + Clone,
92 {
93 let stream_senders = Arc::new(Runtime::new_mutex(Some(Vec::new())));
94
95 let task: StreamSubscriptionTask<Runtime, S, T> = StreamSubscriptionTask {
96 stream,
97 task_senders: stream_senders.clone(),
98 };
99
100 self.spawn_task(task);
101
102 let subscription: MultiplexingSubscription<Runtime, T> =
103 MultiplexingSubscription { stream_senders };
104
105 Arc::new(subscription)
106 }
107}