hermes_async_runtime_components/subscription/impls/
multiplex.rs

1use alloc::boxed::Box;
2use alloc::sync::Arc;
3use alloc::vec::Vec;
4use core::marker::PhantomData;
5use core::ops::DerefMut;
6use core::pin::Pin;
7
8use cgp::prelude::*;
9use futures_core::stream::Stream;
10use futures_util::stream::StreamExt;
11use hermes_runtime_components::traits::channel::{
12    CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes,
13};
14use hermes_runtime_components::traits::mutex::{HasMutex, MutexOf};
15use hermes_runtime_components::traits::spawn::CanSpawnTask;
16use hermes_runtime_components::traits::stream::HasStreamType;
17use hermes_runtime_components::traits::task::Task;
18
19use crate::stream::traits::boxed::HasBoxedStreamType;
20use crate::subscription::traits::subscription::Subscription;
21
22/**
23   Multiplex the incoming [`Stream`] provided by an underlying [`Subscription`]
24   into multiple outgoing [`Stream`]s through a background task. This is
25   an auto trait implemented by all runtime contexts that implement
26   `HasSpawner`, [`HasMutex`], [`CanCreateChannels`], [`CanUseChannels`],
27   and [`CanStreamReceiver`].
28
29   This can be used to improve the efficiency of naive subscriptions created from
30   `CanCreateClosureSubscription`.
31   For example, one can first create a subscription closure that establishes
32   new network connection each time `subscribe` is called. The subscription
33   closure is then passed to [`multiplex_subscription`](Self::multiplex_subscription),
34   which would return a wrapped subscription which would only create one
35   network connection at a time.
36
37   The multiplexed subscription also attempts to recover by calling the
38   [`subscribe`](Subscription::subscribe) method of the underlying subsciption
39   again, if a given [`Stream`] terminates. This would allow for auto recovery
40   from underlying errors such as network disconnection. The multiplexed
41   subscription would only terminate if the underlying
42   [`subscribe`](Subscription::subscribe) returns `None`.
43
44   The streams returned from the [`subscribe`](Subscription::subscribe) of
45   the multiplexed subscription will automatically resume streaming from
46   a new underlying stream, if the original underlying stream is terminated.
47   However, since a consumer cannot know if a [`Subscription`] implementation
48   is multiplexed or not, it should always retry calling
49   [`subscribe`](Subscription::subscribe) in case a [`Stream`] ends.
50*/
51pub trait CanMultiplexSubscription {
52    /**
53       Multiplex a given subscription, with a mapper function that maps the
54       item coming from the underlying subscription from `T` to `U`. Returns
55       a new multiplexed subscription that shares the same underlying [`Stream`].
56    */
57    fn multiplex_subscription<T, U>(
58        &self,
59        subscription: impl Subscription<Item = T>,
60        map_item: impl Fn(T) -> U + Async,
61    ) -> Arc<dyn Subscription<Item = U>>
62    where
63        T: Async + Clone,
64        U: Async + Clone;
65}
66
67pub struct MultiplexSubscriptionTask<Runtime, S, M, T, U>
68where
69    Runtime: HasMutex + HasChannelTypes,
70    S: Subscription<Item = T>,
71    M: Fn(T) -> U + Async,
72    T: Async,
73    U: Async,
74{
75    pub subscription: S,
76    pub mapper: M,
77    pub task_senders: Arc<Runtime::Mutex<Option<Vec<Runtime::Sender<U>>>>>,
78    pub phantom: PhantomData<Runtime>,
79}
80
81impl<Runtime, S, M, T, U> Task for MultiplexSubscriptionTask<Runtime, S, M, T, U>
82where
83    Runtime: HasMutex + CanUseChannels,
84    S: Subscription<Item = T>,
85    M: Fn(T) -> U + Async,
86    T: Async,
87    U: Async + Clone,
88{
89    async fn run(self) {
90        loop {
91            let m_stream = self.subscription.subscribe().await;
92
93            match m_stream {
94                Some(stream) => {
95                    let task_senders = &self.task_senders;
96                    let map_item = &self.mapper;
97
98                    stream
99                        .for_each(|item| async move {
100                            let mapped = map_item(item);
101                            let mut m_senders = Runtime::acquire_mutex(task_senders).await;
102
103                            if let Some(senders) = m_senders.deref_mut() {
104                                let mut new_senders = Vec::new();
105
106                                for sender in senders.drain(..) {
107                                    let send_result = Runtime::send(&sender, mapped.clone()).await;
108                                    // Remove senders where the receiver side has been dropped,
109                                    // i.e. keep the ones where sending is successful
110                                    if send_result.is_ok() {
111                                        new_senders.push(sender);
112                                    }
113                                }
114
115                                *senders = new_senders;
116                            }
117                        })
118                        .await;
119                }
120                None => {
121                    // If the underlying subscription returns `None` from `subscribe`, clears the senders
122                    // queue inside the mutex and set it to `None` and return. This will cause subsequent
123                    // calls to `subscribe` for the multiplexed subscription to also return `None`.
124                    let mut senders = Runtime::acquire_mutex(&self.task_senders).await;
125                    *senders = None;
126                    return;
127                }
128            }
129        }
130    }
131}
132
133impl<Runtime> CanMultiplexSubscription for Runtime
134where
135    Runtime: CanSpawnTask
136        + HasMutex
137        + CanCreateChannels
138        + CanUseChannels
139        + CanStreamReceiver
140        + HasBoxedStreamType,
141{
142    fn multiplex_subscription<T, U>(
143        &self,
144        subscription: impl Subscription<Item = T>,
145        mapper: impl Fn(T) -> U + Async,
146    ) -> Arc<dyn Subscription<Item = U>>
147    where
148        T: Async + Clone,
149        U: Async + Clone,
150    {
151        let stream_senders = Arc::new(Runtime::new_mutex(Some(Vec::new())));
152
153        let task = MultiplexSubscriptionTask {
154            subscription,
155            mapper,
156            task_senders: stream_senders.clone(),
157            phantom: PhantomData::<Runtime>,
158        };
159
160        self.spawn_task(task);
161
162        let subscription: MultiplexingSubscription<Runtime, U> =
163            MultiplexingSubscription { stream_senders };
164
165        Arc::new(subscription)
166    }
167}
168
169type StreamSender<Runtime, T> = <Runtime as HasChannelTypes>::Sender<T>;
170
171type StreamSenders<Runtime, T> = Arc<MutexOf<Runtime, Option<Vec<StreamSender<Runtime, T>>>>>;
172
173pub struct MultiplexingSubscription<Runtime, T>
174where
175    T: Async,
176    Runtime: HasChannelTypes + HasMutex,
177{
178    pub stream_senders: StreamSenders<Runtime, T>,
179}
180
181impl<Runtime, T> Clone for MultiplexingSubscription<Runtime, T>
182where
183    T: Async,
184    Runtime: HasChannelTypes
185        + HasMutex
186        + HasStreamType<Stream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>,
187{
188    fn clone(&self) -> Self {
189        Self {
190            stream_senders: self.stream_senders.clone(),
191        }
192    }
193}
194
195#[async_trait::async_trait]
196impl<Runtime, T> Subscription for MultiplexingSubscription<Runtime, T>
197where
198    T: Async,
199    Runtime: HasMutex + CanCreateChannels + CanStreamReceiver + HasBoxedStreamType,
200{
201    type Item = T;
202
203    async fn subscribe(&self) -> Option<Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>>
204    where
205        T: Async,
206    {
207        let mut m_senders = Runtime::acquire_mutex(&self.stream_senders).await;
208
209        match m_senders.as_mut() {
210            Some(senders) => {
211                let (sender, receiver) = Runtime::new_channel();
212                senders.push(sender);
213
214                let stream = Runtime::receiver_to_stream(receiver);
215
216                Some(Runtime::to_boxed_stream(stream))
217            }
218            None => None,
219        }
220    }
221}