ferrite_session/internal/base/channel/
impls.rs

1use serde::{
2  self,
3  Deserialize,
4  Serialize,
5};
6use tokio::task;
7
8use super::{
9  functions::*,
10  traits::ForwardChannel,
11  types::*,
12};
13use crate::internal::functional::*;
14
15impl<T> Clone for Sender<T>
16{
17  fn clone(&self) -> Sender<T>
18  {
19    Sender(self.0.clone())
20  }
21}
22
23impl<T> Clone for Receiver<T>
24{
25  fn clone(&self) -> Receiver<T>
26  {
27    Receiver(self.0.clone())
28  }
29}
30
31impl<T> Sender<T>
32{
33  pub fn send(
34    &self,
35    msg: T,
36  ) -> Result<(), SendError>
37  {
38    self
39      .0
40      .send(msg)
41      .map_err(|_| SendError(String::from("failed to send")))
42  }
43}
44
45impl ForwardChannel for ()
46{
47  fn forward_to(
48    self,
49    _: OpaqueSender,
50    _: OpaqueReceiver,
51  )
52  {
53  }
54
55  fn forward_from(
56    _: OpaqueSender,
57    _: OpaqueReceiver,
58  ) -> Self
59  {
60  }
61}
62
63impl<T> ForwardChannel for SenderOnce<T>
64where
65  T: ForwardChannel,
66{
67  fn forward_to(
68    self,
69    sender: OpaqueSender,
70    receiver: OpaqueReceiver,
71  )
72  {
73    task::spawn_blocking(move || {
74      receiver.recv::<()>().unwrap();
75
76      let payload = T::forward_from(sender, receiver);
77
78      self.send(payload).unwrap();
79    });
80  }
81
82  fn forward_from(
83    sender1: OpaqueSender,
84    receiver1: OpaqueReceiver,
85  ) -> Self
86  {
87    let (sender2, receiver2) = once_channel();
88
89    task::spawn(async move {
90      let payload: T = receiver2.recv().await.unwrap();
91
92      task::spawn_blocking(move || {
93        sender1.send(());
94
95        payload.forward_to(sender1, receiver1);
96      });
97    });
98
99    sender2
100  }
101}
102
103impl<T> ForwardChannel for ReceiverOnce<T>
104where
105  T: ForwardChannel,
106{
107  fn forward_to(
108    self,
109    sender1: OpaqueSender,
110    receiver1: OpaqueReceiver,
111  )
112  {
113    task::spawn(async move {
114      let channel = self.recv().await.unwrap();
115
116      task::spawn_blocking(move || {
117        sender1.send(());
118
119        channel.forward_to(sender1, receiver1);
120      });
121    });
122  }
123
124  fn forward_from(
125    sender1: OpaqueSender,
126    receiver1: OpaqueReceiver,
127  ) -> Self
128  {
129    let (sender2, receiver2) = once_channel();
130
131    task::spawn_blocking(move || {
132      receiver1.recv::<()>().unwrap();
133
134      let channel = T::forward_from(sender1, receiver1);
135
136      sender2.send(channel).unwrap();
137    });
138
139    receiver2
140  }
141}
142
143impl<T> ForwardChannel for Value<T>
144where
145  T: Send + 'static,
146  T: Serialize + for<'de> Deserialize<'de>,
147{
148  fn forward_to(
149    self,
150    sender1: OpaqueSender,
151    _receiver1: OpaqueReceiver,
152  )
153  {
154    let Value(payload) = self;
155
156    sender1.send(payload);
157  }
158
159  fn forward_from(
160    _sender1: OpaqueSender,
161    receiver1: OpaqueReceiver,
162  ) -> Self
163  {
164    let payload = receiver1.recv().unwrap();
165
166    Value(payload)
167  }
168}
169
170impl<T, C> ForwardChannel for (Value<T>, C)
171where
172  T: Send + 'static,
173  T: Serialize + for<'de> Deserialize<'de>,
174  C: ForwardChannel,
175{
176  fn forward_to(
177    self,
178    sender1: OpaqueSender,
179    receiver1: OpaqueReceiver,
180  )
181  {
182    let (Value(payload), channel) = self;
183
184    task::spawn_blocking(move || {
185      sender1.send(payload);
186
187      channel.forward_to(sender1, receiver1)
188    });
189  }
190
191  fn forward_from(
192    sender1: OpaqueSender,
193    receiver1: OpaqueReceiver,
194  ) -> Self
195  {
196    let payload = receiver1.recv().unwrap();
197
198    let channel = C::forward_from(sender1, receiver1);
199
200    (Value(payload), channel)
201  }
202}
203
204impl<F, X, T> ForwardChannel for App<'static, F, X>
205where
206  X: 'static,
207  F: 'static,
208  F: TypeApp<'static, X, Applied = T>,
209  T: ForwardChannel,
210{
211  fn forward_to(
212    self,
213    sender: OpaqueSender,
214    receiver: OpaqueReceiver,
215  )
216  {
217    self.get_applied().forward_to(sender, receiver)
218  }
219
220  fn forward_from(
221    sender: OpaqueSender,
222    receiver: OpaqueReceiver,
223  ) -> Self
224  {
225    App::new(T::forward_from(sender, receiver))
226  }
227}
228
229impl<Row, F, T> ForwardChannel for AppSum<'static, Row, F>
230where
231  F: TyCon,
232  F: Send + 'static,
233  Row: 'static,
234  Row: SumApp<'static, F, Applied = T>,
235  T: ForwardChannel,
236{
237  fn forward_to(
238    self,
239    sender: OpaqueSender,
240    receiver: OpaqueReceiver,
241  )
242  {
243    self.get_sum().forward_to(sender, receiver)
244  }
245
246  fn forward_from(
247    sender: OpaqueSender,
248    receiver: OpaqueReceiver,
249  ) -> Self
250  {
251    AppSum::new(T::forward_from(sender, receiver))
252  }
253}
254
255impl<A, B> ForwardChannel for Sum<A, B>
256where
257  A: ForwardChannel,
258  B: ForwardChannel,
259{
260  fn forward_to(
261    self,
262    sender1: OpaqueSender,
263    receiver1: OpaqueReceiver,
264  )
265  {
266    match self {
267      Sum::Inl(a) => {
268        sender1.send(true);
269
270        a.forward_to(sender1, receiver1)
271      }
272      Sum::Inr(b) => {
273        sender1.send(false);
274
275        b.forward_to(sender1, receiver1)
276      }
277    }
278  }
279
280  fn forward_from(
281    sender1: OpaqueSender,
282    receiver1: OpaqueReceiver,
283  ) -> Self
284  {
285    if receiver1.recv().unwrap() {
286      Sum::Inl(A::forward_from(sender1, receiver1))
287    } else {
288      Sum::Inr(B::forward_from(sender1, receiver1))
289    }
290  }
291}
292
293impl ForwardChannel for Bottom
294{
295  fn forward_to(
296    self,
297    _: OpaqueSender,
298    _: OpaqueReceiver,
299  )
300  {
301    match self {}
302  }
303
304  fn forward_from(
305    _: OpaqueSender,
306    receiver1: OpaqueReceiver,
307  ) -> Self
308  {
309    receiver1.recv().unwrap()
310  }
311}