ferrite_session/internal/base/channel/
impls.rs1use serde::{
2 self,
3 Deserialize,
4 Serialize,
5};
6use tokio::task;
7
8use super::{
9 functions::*,
10 traits::ForwardChannel,
11 types::*,
12};
13use crate::internal::functional::*;
14
15impl<T> Clone for Sender<T>
16{
17 fn clone(&self) -> Sender<T>
18 {
19 Sender(self.0.clone())
20 }
21}
22
23impl<T> Clone for Receiver<T>
24{
25 fn clone(&self) -> Receiver<T>
26 {
27 Receiver(self.0.clone())
28 }
29}
30
31impl<T> Sender<T>
32{
33 pub fn send(
34 &self,
35 msg: T,
36 ) -> Result<(), SendError>
37 {
38 self
39 .0
40 .send(msg)
41 .map_err(|_| SendError(String::from("failed to send")))
42 }
43}
44
45impl ForwardChannel for ()
46{
47 fn forward_to(
48 self,
49 _: OpaqueSender,
50 _: OpaqueReceiver,
51 )
52 {
53 }
54
55 fn forward_from(
56 _: OpaqueSender,
57 _: OpaqueReceiver,
58 ) -> Self
59 {
60 }
61}
62
63impl<T> ForwardChannel for SenderOnce<T>
64where
65 T: ForwardChannel,
66{
67 fn forward_to(
68 self,
69 sender: OpaqueSender,
70 receiver: OpaqueReceiver,
71 )
72 {
73 task::spawn_blocking(move || {
74 receiver.recv::<()>().unwrap();
75
76 let payload = T::forward_from(sender, receiver);
77
78 self.send(payload).unwrap();
79 });
80 }
81
82 fn forward_from(
83 sender1: OpaqueSender,
84 receiver1: OpaqueReceiver,
85 ) -> Self
86 {
87 let (sender2, receiver2) = once_channel();
88
89 task::spawn(async move {
90 let payload: T = receiver2.recv().await.unwrap();
91
92 task::spawn_blocking(move || {
93 sender1.send(());
94
95 payload.forward_to(sender1, receiver1);
96 });
97 });
98
99 sender2
100 }
101}
102
103impl<T> ForwardChannel for ReceiverOnce<T>
104where
105 T: ForwardChannel,
106{
107 fn forward_to(
108 self,
109 sender1: OpaqueSender,
110 receiver1: OpaqueReceiver,
111 )
112 {
113 task::spawn(async move {
114 let channel = self.recv().await.unwrap();
115
116 task::spawn_blocking(move || {
117 sender1.send(());
118
119 channel.forward_to(sender1, receiver1);
120 });
121 });
122 }
123
124 fn forward_from(
125 sender1: OpaqueSender,
126 receiver1: OpaqueReceiver,
127 ) -> Self
128 {
129 let (sender2, receiver2) = once_channel();
130
131 task::spawn_blocking(move || {
132 receiver1.recv::<()>().unwrap();
133
134 let channel = T::forward_from(sender1, receiver1);
135
136 sender2.send(channel).unwrap();
137 });
138
139 receiver2
140 }
141}
142
143impl<T> ForwardChannel for Value<T>
144where
145 T: Send + 'static,
146 T: Serialize + for<'de> Deserialize<'de>,
147{
148 fn forward_to(
149 self,
150 sender1: OpaqueSender,
151 _receiver1: OpaqueReceiver,
152 )
153 {
154 let Value(payload) = self;
155
156 sender1.send(payload);
157 }
158
159 fn forward_from(
160 _sender1: OpaqueSender,
161 receiver1: OpaqueReceiver,
162 ) -> Self
163 {
164 let payload = receiver1.recv().unwrap();
165
166 Value(payload)
167 }
168}
169
170impl<T, C> ForwardChannel for (Value<T>, C)
171where
172 T: Send + 'static,
173 T: Serialize + for<'de> Deserialize<'de>,
174 C: ForwardChannel,
175{
176 fn forward_to(
177 self,
178 sender1: OpaqueSender,
179 receiver1: OpaqueReceiver,
180 )
181 {
182 let (Value(payload), channel) = self;
183
184 task::spawn_blocking(move || {
185 sender1.send(payload);
186
187 channel.forward_to(sender1, receiver1)
188 });
189 }
190
191 fn forward_from(
192 sender1: OpaqueSender,
193 receiver1: OpaqueReceiver,
194 ) -> Self
195 {
196 let payload = receiver1.recv().unwrap();
197
198 let channel = C::forward_from(sender1, receiver1);
199
200 (Value(payload), channel)
201 }
202}
203
204impl<F, X, T> ForwardChannel for App<'static, F, X>
205where
206 X: 'static,
207 F: 'static,
208 F: TypeApp<'static, X, Applied = T>,
209 T: ForwardChannel,
210{
211 fn forward_to(
212 self,
213 sender: OpaqueSender,
214 receiver: OpaqueReceiver,
215 )
216 {
217 self.get_applied().forward_to(sender, receiver)
218 }
219
220 fn forward_from(
221 sender: OpaqueSender,
222 receiver: OpaqueReceiver,
223 ) -> Self
224 {
225 App::new(T::forward_from(sender, receiver))
226 }
227}
228
229impl<Row, F, T> ForwardChannel for AppSum<'static, Row, F>
230where
231 F: TyCon,
232 F: Send + 'static,
233 Row: 'static,
234 Row: SumApp<'static, F, Applied = T>,
235 T: ForwardChannel,
236{
237 fn forward_to(
238 self,
239 sender: OpaqueSender,
240 receiver: OpaqueReceiver,
241 )
242 {
243 self.get_sum().forward_to(sender, receiver)
244 }
245
246 fn forward_from(
247 sender: OpaqueSender,
248 receiver: OpaqueReceiver,
249 ) -> Self
250 {
251 AppSum::new(T::forward_from(sender, receiver))
252 }
253}
254
255impl<A, B> ForwardChannel for Sum<A, B>
256where
257 A: ForwardChannel,
258 B: ForwardChannel,
259{
260 fn forward_to(
261 self,
262 sender1: OpaqueSender,
263 receiver1: OpaqueReceiver,
264 )
265 {
266 match self {
267 Sum::Inl(a) => {
268 sender1.send(true);
269
270 a.forward_to(sender1, receiver1)
271 }
272 Sum::Inr(b) => {
273 sender1.send(false);
274
275 b.forward_to(sender1, receiver1)
276 }
277 }
278 }
279
280 fn forward_from(
281 sender1: OpaqueSender,
282 receiver1: OpaqueReceiver,
283 ) -> Self
284 {
285 if receiver1.recv().unwrap() {
286 Sum::Inl(A::forward_from(sender1, receiver1))
287 } else {
288 Sum::Inr(B::forward_from(sender1, receiver1))
289 }
290 }
291}
292
293impl ForwardChannel for Bottom
294{
295 fn forward_to(
296 self,
297 _: OpaqueSender,
298 _: OpaqueReceiver,
299 )
300 {
301 match self {}
302 }
303
304 fn forward_from(
305 _: OpaqueSender,
306 receiver1: OpaqueReceiver,
307 ) -> Self
308 {
309 receiver1.recv().unwrap()
310 }
311}