hermes_async_runtime_components/subscription/impls/
multiplex.rs1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use alloc::vec::Vec;
4use core::marker::PhantomData;
5use core::ops::DerefMut;
6use core::pin::Pin;
7
8use cgp::prelude::*;
9use futures_core::stream::Stream;
10use futures_util::stream::StreamExt;
11use hermes_runtime_components::traits::channel::{
12 CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes,
13};
14use hermes_runtime_components::traits::mutex::{HasMutex, MutexOf};
15use hermes_runtime_components::traits::spawn::CanSpawnTask;
16use hermes_runtime_components::traits::stream::HasStreamType;
17use hermes_runtime_components::traits::task::Task;
18
19use crate::stream::traits::boxed::HasBoxedStreamType;
20use crate::subscription::traits::subscription::Subscription;
21
22pub trait CanMultiplexSubscription {
52 fn multiplex_subscription<T, U>(
58 &self,
59 subscription: impl Subscription<Item = T>,
60 map_item: impl Fn(T) -> U + Async,
61 ) -> Arc<dyn Subscription<Item = U>>
62 where
63 T: Async + Clone,
64 U: Async + Clone;
65}
66
67pub struct MultiplexSubscriptionTask<Runtime, S, M, T, U>
68where
69 Runtime: HasMutex + HasChannelTypes,
70 S: Subscription<Item = T>,
71 M: Fn(T) -> U + Async,
72 T: Async,
73 U: Async,
74{
75 pub subscription: S,
76 pub mapper: M,
77 pub task_senders: Arc<Runtime::Mutex<Option<Vec<Runtime::Sender<U>>>>>,
78 pub phantom: PhantomData<Runtime>,
79}
80
81impl<Runtime, S, M, T, U> Task for MultiplexSubscriptionTask<Runtime, S, M, T, U>
82where
83 Runtime: HasMutex + CanUseChannels,
84 S: Subscription<Item = T>,
85 M: Fn(T) -> U + Async,
86 T: Async,
87 U: Async + Clone,
88{
89 async fn run(self) {
90 loop {
91 let m_stream = self.subscription.subscribe().await;
92
93 match m_stream {
94 Some(stream) => {
95 let task_senders = &self.task_senders;
96 let map_item = &self.mapper;
97
98 stream
99 .for_each(|item| async move {
100 let mapped = map_item(item);
101 let mut m_senders = Runtime::acquire_mutex(task_senders).await;
102
103 if let Some(senders) = m_senders.deref_mut() {
104 let mut new_senders = Vec::new();
105
106 for sender in senders.drain(..) {
107 let send_result = Runtime::send(&sender, mapped.clone()).await;
108 if send_result.is_ok() {
111 new_senders.push(sender);
112 }
113 }
114
115 *senders = new_senders;
116 }
117 })
118 .await;
119 }
120 None => {
121 let mut senders = Runtime::acquire_mutex(&self.task_senders).await;
125 *senders = None;
126 return;
127 }
128 }
129 }
130 }
131}
132
133impl<Runtime> CanMultiplexSubscription for Runtime
134where
135 Runtime: CanSpawnTask
136 + HasMutex
137 + CanCreateChannels
138 + CanUseChannels
139 + CanStreamReceiver
140 + HasBoxedStreamType,
141{
142 fn multiplex_subscription<T, U>(
143 &self,
144 subscription: impl Subscription<Item = T>,
145 mapper: impl Fn(T) -> U + Async,
146 ) -> Arc<dyn Subscription<Item = U>>
147 where
148 T: Async + Clone,
149 U: Async + Clone,
150 {
151 let stream_senders = Arc::new(Runtime::new_mutex(Some(Vec::new())));
152
153 let task = MultiplexSubscriptionTask {
154 subscription,
155 mapper,
156 task_senders: stream_senders.clone(),
157 phantom: PhantomData::<Runtime>,
158 };
159
160 self.spawn_task(task);
161
162 let subscription: MultiplexingSubscription<Runtime, U> =
163 MultiplexingSubscription { stream_senders };
164
165 Arc::new(subscription)
166 }
167}
168
169type StreamSender<Runtime, T> = <Runtime as HasChannelTypes>::Sender<T>;
170
171type StreamSenders<Runtime, T> = Arc<MutexOf<Runtime, Option<Vec<StreamSender<Runtime, T>>>>>;
172
173pub struct MultiplexingSubscription<Runtime, T>
174where
175 T: Async,
176 Runtime: HasChannelTypes + HasMutex,
177{
178 pub stream_senders: StreamSenders<Runtime, T>,
179}
180
181impl<Runtime, T> Clone for MultiplexingSubscription<Runtime, T>
182where
183 T: Async,
184 Runtime: HasChannelTypes
185 + HasMutex
186 + HasStreamType<Stream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
187{
188 fn clone(&self) -> Self {
189 Self {
190 stream_senders: self.stream_senders.clone(),
191 }
192 }
193}
194
195#[async_trait::async_trait]
196impl<Runtime, T> Subscription for MultiplexingSubscription<Runtime, T>
197where
198 T: Async,
199 Runtime: HasMutex + CanCreateChannels + CanStreamReceiver + HasBoxedStreamType,
200{
201 type Item = T;
202
203 async fn subscribe(&self) -> Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>
204 where
205 T: Async,
206 {
207 let mut m_senders = Runtime::acquire_mutex(&self.stream_senders).await;
208
209 match m_senders.as_mut() {
210 Some(senders) => {
211 let (sender, receiver) = Runtime::new_channel();
212 senders.push(sender);
213
214 let stream = Runtime::receiver_to_stream(receiver);
215
216 Some(Runtime::to_boxed_stream(stream))
217 }
218 None => None,
219 }
220 }
221}