hermes_async_runtime_components/channel/
impls.rs1use alloc::boxed::Box;
2use alloc::sync::Arc;
3
4use cgp::prelude::*;
5use futures_channel::mpsc;
6use futures_util::lock::Mutex;
7use futures_util::stream::StreamExt;
8use hermes_runtime_components::traits::channel::{
9 ChannelCreator, ChannelUser, ProvideChannelType, ReceiverStreamer, SenderCloner,
10};
11
12use crate::channel::traits::{HasUnboundedChannelType, UnboundedChannelTypeProvider};
13use crate::channel::types::ChannelClosedError;
14use crate::stream::traits::boxed::HasBoxedStreamType;
15
/// Unit-struct provider for the channel components implemented in this file.
///
/// Every implementation below is backed by the unbounded MPSC channel from
/// `futures_channel::mpsc`.
pub struct ProvideUnboundedChannelType;
17
18impl<Runtime> ProvideChannelType<Runtime> for ProvideUnboundedChannelType
19where
20 Runtime: Async,
21{
22 type Sender<T> = Arc<Mutex<mpsc::UnboundedSender<T>>>
23 where
24 T: Async;
25
26 type Receiver<T> = mpsc::UnboundedReceiver<T>
27 where
28 T: Async;
29}
30
31impl<Runtime> UnboundedChannelTypeProvider<Runtime> for ProvideUnboundedChannelType
32where
33 Runtime: Async,
34{
35 fn from_unbounded_sender<T>(sender: Arc<Mutex<mpsc::UnboundedSender<T>>>) -> Self::Sender<T>
36 where
37 T: Async,
38 {
39 sender
40 }
41
42 fn from_unbounded_receiver<T>(receiver: mpsc::UnboundedReceiver<T>) -> Self::Receiver<T>
43 where
44 T: Async,
45 {
46 receiver
47 }
48
49 fn to_unbounded_receiver<T>(receiver: Self::Receiver<T>) -> mpsc::UnboundedReceiver<T>
50 where
51 T: Async,
52 {
53 receiver
54 }
55 fn to_unbounded_sender_ref<T>(sender: &Self::Sender<T>) -> &Arc<Mutex<mpsc::UnboundedSender<T>>>
56 where
57 T: Async,
58 {
59 sender
60 }
61
62 fn to_unbounded_receiver_ref<T>(
63 receiver: &mut Self::Receiver<T>,
64 ) -> &mut mpsc::UnboundedReceiver<T>
65 where
66 T: Async,
67 {
68 receiver
69 }
70}
71
72impl<Runtime> ChannelCreator<Runtime> for ProvideUnboundedChannelType
73where
74 Runtime: HasUnboundedChannelType,
75{
76 fn new_channel<T>() -> (Runtime::Sender<T>, Runtime::Receiver<T>)
77 where
78 T: Async,
79 {
80 let (sender, receiver) = mpsc::unbounded();
81
82 (
83 Runtime::from_unbounded_sender(Arc::new(Mutex::new(sender))),
84 Runtime::from_unbounded_receiver(receiver),
85 )
86 }
87}
88
89impl<Runtime> ChannelUser<Runtime> for ProvideUnboundedChannelType
90where
91 Runtime: HasUnboundedChannelType + CanRaiseError<ChannelClosedError>,
92{
93 async fn send<T>(sender: &Runtime::Sender<T>, value: T) -> Result<(), Runtime::Error>
94 where
95 T: Async,
96 {
97 Runtime::to_unbounded_sender_ref(sender)
98 .lock()
99 .await
100 .unbounded_send(value)
101 .map_err(|_| Runtime::raise_error(ChannelClosedError))
102 }
103
104 async fn receive<T>(receiver: &mut Runtime::Receiver<T>) -> Result<T, Runtime::Error>
105 where
106 T: Async,
107 {
108 Runtime::to_unbounded_receiver_ref(receiver)
109 .next()
110 .await
111 .ok_or(Runtime::raise_error(ChannelClosedError))
112 }
113
114 fn try_receive<T>(receiver: &mut Runtime::Receiver<T>) -> Result<Option<T>, Runtime::Error>
115 where
116 T: Async,
117 {
118 let res = Runtime::to_unbounded_receiver_ref(receiver).try_next();
119
120 match res {
122 Ok(Some(res)) => Ok(Some(res)),
123 Ok(None) => Err(Runtime::raise_error(ChannelClosedError)),
125 Err(_) => Ok(None),
127 }
128 }
129}
130
131impl<Runtime> ReceiverStreamer<Runtime> for ProvideUnboundedChannelType
132where
133 Runtime: HasUnboundedChannelType + HasBoxedStreamType,
134{
135 fn receiver_to_stream<T>(receiver: Runtime::Receiver<T>) -> Runtime::Stream<T>
136 where
137 T: Async,
138 {
139 Runtime::from_boxed_stream(Box::pin(Runtime::to_unbounded_receiver(receiver)))
140 }
141}
142
143impl<Runtime> SenderCloner<Runtime> for ProvideUnboundedChannelType
144where
145 Runtime: HasUnboundedChannelType,
146{
147 fn clone_sender<T>(sender: &Runtime::Sender<T>) -> Runtime::Sender<T>
148 where
149 T: Async,
150 {
151 Runtime::from_unbounded_sender(Runtime::to_unbounded_sender_ref(sender).clone())
152 }
153}