hermes_async_runtime_components/channel/
impls.rs

1use alloc::boxed::Box;
2use alloc::sync::Arc;
3
4use cgp::prelude::*;
5use futures_channel::mpsc;
6use futures_util::lock::Mutex;
7use futures_util::stream::StreamExt;
8use hermes_runtime_components::traits::channel::{
9    ChannelCreator, ChannelUser, ProvideChannelType, ReceiverStreamer, SenderCloner,
10};
11
12use crate::channel::traits::{HasUnboundedChannelType, UnboundedChannelTypeProvider};
13use crate::channel::types::ChannelClosedError;
14use crate::stream::traits::boxed::HasBoxedStreamType;
15
16pub struct ProvideUnboundedChannelType;
17
18impl<Runtime> ProvideChannelType<Runtime> for ProvideUnboundedChannelType
19where
20    Runtime: Async,
21{
22    type Sender<T> = Arc<Mutex<mpsc::UnboundedSender<T>>>
23    where
24        T: Async;
25
26    type Receiver<T> = mpsc::UnboundedReceiver<T>
27    where
28        T: Async;
29}
30
31impl<Runtime> UnboundedChannelTypeProvider<Runtime> for ProvideUnboundedChannelType
32where
33    Runtime: Async,
34{
35    fn from_unbounded_sender<T>(sender: Arc<Mutex<mpsc::UnboundedSender<T>>>) -> Self::Sender<T>
36    where
37        T: Async,
38    {
39        sender
40    }
41
42    fn from_unbounded_receiver<T>(receiver: mpsc::UnboundedReceiver<T>) -> Self::Receiver<T>
43    where
44        T: Async,
45    {
46        receiver
47    }
48
49    fn to_unbounded_receiver<T>(receiver: Self::Receiver<T>) -> mpsc::UnboundedReceiver<T>
50    where
51        T: Async,
52    {
53        receiver
54    }
55    fn to_unbounded_sender_ref<T>(sender: &Self::Sender<T>) -> &Arc<Mutex<mpsc::UnboundedSender<T>>>
56    where
57        T: Async,
58    {
59        sender
60    }
61
62    fn to_unbounded_receiver_ref<T>(
63        receiver: &mut Self::Receiver<T>,
64    ) -> &mut mpsc::UnboundedReceiver<T>
65    where
66        T: Async,
67    {
68        receiver
69    }
70}
71
72impl<Runtime> ChannelCreator<Runtime> for ProvideUnboundedChannelType
73where
74    Runtime: HasUnboundedChannelType,
75{
76    fn new_channel<T>() -> (Runtime::Sender<T>, Runtime::Receiver<T>)
77    where
78        T: Async,
79    {
80        let (sender, receiver) = mpsc::unbounded();
81
82        (
83            Runtime::from_unbounded_sender(Arc::new(Mutex::new(sender))),
84            Runtime::from_unbounded_receiver(receiver),
85        )
86    }
87}
88
89impl<Runtime> ChannelUser<Runtime> for ProvideUnboundedChannelType
90where
91    Runtime: HasUnboundedChannelType + CanRaiseError<ChannelClosedError>,
92{
93    async fn send<T>(sender: &Runtime::Sender<T>, value: T) -> Result<(), Runtime::Error>
94    where
95        T: Async,
96    {
97        Runtime::to_unbounded_sender_ref(sender)
98            .lock()
99            .await
100            .unbounded_send(value)
101            .map_err(|_| Runtime::raise_error(ChannelClosedError))
102    }
103
104    async fn receive<T>(receiver: &mut Runtime::Receiver<T>) -> Result<T, Runtime::Error>
105    where
106        T: Async,
107    {
108        Runtime::to_unbounded_receiver_ref(receiver)
109            .next()
110            .await
111            .ok_or(Runtime::raise_error(ChannelClosedError))
112    }
113
114    fn try_receive<T>(receiver: &mut Runtime::Receiver<T>) -> Result<Option<T>, Runtime::Error>
115    where
116        T: Async,
117    {
118        let res = Runtime::to_unbounded_receiver_ref(receiver).try_next();
119
120        // The result semantics of the futures version of receiver is slightly different
121        match res {
122            Ok(Some(res)) => Ok(Some(res)),
123            // Ok(None) means that the channel is closed
124            Ok(None) => Err(Runtime::raise_error(ChannelClosedError)),
125            // Error means that there is no meesage currently available
126            Err(_) => Ok(None),
127        }
128    }
129}
130
131impl<Runtime> ReceiverStreamer<Runtime> for ProvideUnboundedChannelType
132where
133    Runtime: HasUnboundedChannelType + HasBoxedStreamType,
134{
135    fn receiver_to_stream<T>(receiver: Runtime::Receiver<T>) -> Runtime::Stream<T>
136    where
137        T: Async,
138    {
139        Runtime::from_boxed_stream(Box::pin(Runtime::to_unbounded_receiver(receiver)))
140    }
141}
142
143impl<Runtime> SenderCloner<Runtime> for ProvideUnboundedChannelType
144where
145    Runtime: HasUnboundedChannelType,
146{
147    fn clone_sender<T>(sender: &Runtime::Sender<T>) -> Runtime::Sender<T>
148    where
149        T: Async,
150    {
151        Runtime::from_unbounded_sender(Runtime::to_unbounded_sender_ref(sender).clone())
152    }
153}