1extern crate either;
2
3use std::boxed::Box;
4use std::error::Error;
5use std::marker;
6use std::mem;
7use std::sync::mpsc::{Sender, Receiver};
8use std::sync::mpsc;
9use either::Either;
10
/// Session type of a finished protocol: nothing is left to send or receive.
///
/// `End` holds no channel and is its own dual; `close` consumes it.
#[derive(Debug)]
pub struct End;
15
16#[must_use]
17#[derive(Debug)]
18pub struct Send<T, S: Session> {
19 channel: Sender<(T, S::Dual)>,
20}
21
22#[must_use]
23#[derive(Debug)]
24pub struct Recv<T, S: Session> {
25 channel: Receiver<(T, S)>,
26}
27
/// One endpoint of a two-party protocol.
///
/// Every session type is sized and can be moved to another thread. `Dual` names the
/// type of the opposite endpoint, and taking the dual twice gives back `Self`.
pub trait Session
where
    Self: marker::Sized + marker::Send,
{
    /// Type of the endpoint held by the other party.
    type Dual: Session<Dual = Self>;

    /// Builds a connected pair: this endpoint and its dual.
    #[doc(hidden)]
    fn new() -> (Self, Self::Dual);
}
34
35impl Session for End {
36 type Dual = End;
37
38 fn new() -> (Self, Self::Dual) {
39 return (End, End);
40 }
41}
42
43impl<T: marker::Send, S: Session> Session for Send<T, S> {
44 type Dual = Recv<T, S::Dual>;
45
46 fn new() -> (Self, Self::Dual) {
47 let (sender, receiver) = mpsc::channel::<(T, S::Dual)>();
48 return (Send { channel: sender }, Recv { channel: receiver });
49 }
50}
51
52impl<T: marker::Send, S: Session> Session for Recv<T, S> {
53 type Dual = Send<T, S::Dual>;
54
55 fn new() -> (Self, Self::Dual) {
56 let (there, here) = Self::Dual::new();
57 return (here, there);
58 }
59}
60
61
62pub fn send<'a, T: marker::Send + 'a, S: Session + 'a>(x: T, s: Send<T, S>)
65 -> Result<S, Box<Error + 'a>> {
66 let (here, there) = S::new();
67 s.channel.send((x, there))?;
68 Ok(here)
69}
70
71pub fn recv<'a, T: marker::Send, S: Session>(s: Recv<T, S>)
72 -> Result<(T, S), Box<Error + 'a>> {
73 let (v, s) = s.channel.recv()?;
74 Ok((v, s))
75}
76
77pub fn close(s: End) -> Result<(), Box<Error>> {
78 let End = s;
79 Ok(())
80}
81
82pub fn cancel<T>(x: T) -> Result<(), Box<Error>> {
83 mem::drop(x);
84 Ok(())
85}
86
87#[macro_export]
88macro_rules! fork_with_thread_id {
89
90 ($fn_name:ident) => {{
92 let (there, here) = $crate::Session::new();
93 let other_thread = ::std::thread::spawn(move || {
94 let r = $fn_name(there);
95 match r {
96 Ok(_) => (),
97 Err(e) => panic!("{}", e.description()),
98 }
99 });
100 (other_thread, here)
101 }};
102
103 (move | $session_name:ident : $session_type:ty | $forked_process:block ) => {{
105 let ($session_name, here) = <$session_type as $crate::Session>::new();
106 let other_thread = ::std::thread::spawn(move || {
107 let r = (move || -> Result<_, Box<Error>> {
108 $forked_process
109 })();
110 match r {
111 Ok(_) => (),
112 Err(e) => panic!("{}", e.description()),
113 }
114 });
115 (other_thread, here)
116 }};
117}
118
119
/// Like `fork_with_thread_id!`, but evaluates to the caller's endpoint only;
/// the thread handle (element `.0`) is discarded.
#[macro_export]
macro_rules! fork {

    ($process:ident) => {
        fork_with_thread_id!($process).1
    };

    (move | $endpoint:ident : $protocol:ty | $body:block ) => {
        fork_with_thread_id!(move | $endpoint : $protocol | $body ).1
    };
}
133
/// Receiving side of a binary choice: receives either an `S1` or an `S2` endpoint
/// and then ends.
pub type Offer<S1, S2> = Recv<Either<S1, S2>, End>;
/// Sending side of a binary choice: sends the dual of `S1` or the dual of `S2`
/// and then ends.
pub type Select<S1, S2> = Send<Either<<S1 as Session>::Dual, <S2 as Session>::Dual>, End>;
136
137pub fn offer_either<'a, S1: Session, S2: Session, F, G, R>(s: Offer<S1, S2>, f: F, g: G)
138 -> Result<R, Box<Error + 'a>>
139where
140 F: FnOnce(S1) -> Result<R, Box<Error + 'a>>,
141 G: FnOnce(S2) -> Result<R, Box<Error + 'a>>,
142{
143 let (e, End) = recv(s)?;
144 e.either(f, g)
145}
146
147pub fn select_left<'a, S1: Session + 'a, S2: Session + 'a>(s: Select<S1, S2>)
148 -> Result<S1, Box<Error + 'a>> {
149 let (here, there) = S1::new();
150 let End = send(Either::Left(there), s)?;
151 Ok(here)
152}
153
154pub fn select_right<'a, S1: Session + 'a, S2: Session + 'a>(s: Select<S1, S2>)
155 -> Result<S2, Box<Error + 'a>> {
156 let (here, there) = S2::new();
157 let End = send(Either::Right(there), s)?;
158 Ok(here)
159}
160
/// Receives a label on `$session` and dispatches on it with `match`-style arms.
///
/// Usage: `offer!(session, { Pattern => expression, ... })`. Every arm must end
/// with a comma. The expansion is an immediately-invoked closure returning a
/// `Result`, so arms may use `?`; a failing receive is returned as `Err`.
///
/// `recv` and `End` are referenced through `$crate`, so the caller does not have
/// to import them. An unqualified `End` in the pattern would silently become a
/// variable binding wherever `End` is not in scope.
#[macro_export]
macro_rules! offer {
    ($session:expr, { $($pat:pat => $result:expr,)* }) => {
        (move || -> Result<_, _> {
            let (l, $crate::End) = $crate::recv($session)?;
            match l {
                $(
                    $pat => $result,
                )*
            }
        })()
    };
}
174
175#[macro_export]
176macro_rules! select {
177 ($label:path, $session:expr) => {
178 (move || -> Result<_, Box<Error>> {
179 let (here, there) = <_ as Session>::new();
180 let End = send($label(there), $session)?;
181 Ok(here)
182 })()
183 };
184}
185
#[cfg(test)]
mod tests {
    extern crate rand;

    use std::marker;
    use super::*;
    use self::rand::{Rng, thread_rng};

    /// A forked peer sends `()`, the main thread receives it, and both sides close.
    #[test]
    fn ping_works() {
        assert!(|| -> Result<(), Box<Error>> {

            let s = fork!(move |s: Send<(), End>| {
                let s = send((), s)?;
                close(s)
            });
            let ((), s) = recv(s)?;
            close(s)

        }().is_ok());
    }

    // Negation protocol: receive one number, send back its negation.
    type NegServer<N> = Recv<N, Send<N, End>>;
    type NegClient<N> = <NegServer<N> as Session>::Dual;

    // Addition protocol: receive two numbers, send back their sum.
    type AddServer<N> = Recv<N, Recv<N, Send<N, End>>>;
    type AddClient<N> = <AddServer<N> as Session>::Dual;

    // Binary choice between the two protocols above.
    type SimpleCalcServer<N> = Offer<NegServer<N>, AddServer<N>>;
    type SimpleCalcClient<N> = <SimpleCalcServer<N> as Session>::Dual;

    /// Calculator server built on `offer_either`: left branch negates, right branch adds.
    fn simple_calc_server(s: SimpleCalcServer<i32>) -> Result<(), Box<Error>> {
        offer_either(s,
                     |s: NegServer<i32>| {
                         let (x, s) = recv(s)?;
                         // Wrapping negation: `-i32::MIN` would overflow and panic in debug builds.
                         let s = send(x.wrapping_neg(), s)?;
                         close(s)
                     },
                     |s: AddServer<i32>| {
                         let (x, s) = recv(s)?;
                         let (y, s) = recv(s)?;
                         let s = send(x.wrapping_add(y), s)?;
                         close(s)
                     })
    }

    /// Exercises both branches of `simple_calc_server` with random inputs.
    #[test]
    fn simple_calc_works() {
        assert!(|| -> Result<(), Box<Error>> {

            let mut rng = thread_rng();

            // Left branch: negation.
            {
                let s: SimpleCalcClient<i32> = fork!(simple_calc_server);
                let x: i32 = rng.gen();
                let s = select_left::<_, AddClient<i32>>(s)?;
                let s = send(x, s)?;
                let (y, End) = recv(s)?;
                assert_eq!(x.wrapping_neg(), y);
            }

            // Right branch: addition.
            {
                let s: SimpleCalcClient<i32> = fork!(simple_calc_server);
                let x: i32 = rng.gen();
                let y: i32 = rng.gen();
                let s = select_right::<NegClient<i32>, _>(s)?;
                let s = send(x, s)?;
                let s = send(y, s)?;
                let (z, End) = recv(s)?;
                assert_eq!(x.wrapping_add(y), z);
            }

            Ok(())

        }().is_ok());
    }

    /// Labelled choice used with the `offer!` / `select!` macros; each variant carries
    /// the server-side endpoint of the chosen protocol.
    enum CalcOp<N: marker::Send> {
        Neg(NegServer<N>),
        Add(AddServer<N>),
    }
    type NiceCalcServer<N> = Recv<CalcOp<N>, End>;
    type NiceCalcClient<N> = <NiceCalcServer<N> as Session>::Dual;

    /// Calculator server built on `offer!`, dispatching on `CalcOp`.
    fn nice_calc_server(s: NiceCalcServer<i32>) -> Result<(), Box<Error>> {
        offer!(s, {
            CalcOp::Neg(s) => {
                let (x, s) = recv(s)?;
                // Wrapping negation: `-i32::MIN` would overflow and panic in debug builds.
                let s = send(x.wrapping_neg(), s)?;
                close(s)
            },
            CalcOp::Add(s) => {
                let (x, s) = recv(s)?;
                let (y, s) = recv(s)?;
                let s = send(x.wrapping_add(y), s)?;
                close(s)
            },
        })
    }

    /// Exercises both `CalcOp` branches of `nice_calc_server` with random inputs.
    #[test]
    fn nice_calc_works() {
        assert!(|| -> Result<(), Box<Error>> {

            let mut rng = thread_rng();

            // Negation branch.
            {
                let s: NiceCalcClient<i32> = fork!(nice_calc_server);
                let x: i32 = rng.gen();
                let s = select!(CalcOp::Neg, s)?;
                let s = send(x, s)?;
                let (y, s) = recv(s)?;
                close(s)?;
                assert_eq!(x.wrapping_neg(), y);
            }

            // Addition branch.
            {
                let s: NiceCalcClient<i32> = fork!(nice_calc_server);
                let x: i32 = rng.gen();
                let y: i32 = rng.gen();
                let s = select!(CalcOp::Add, s)?;
                let s = send(x, s)?;
                let s = send(y, s)?;
                let (z, s) = recv(s)?;
                close(s)?;
                assert_eq!(x.wrapping_add(y), z);
            }

            Ok(())

        }().is_ok());
    }

    /// Cancelling the client's endpoint succeeds locally, while the server thread is
    /// expected to end in a panic (its `join` returns `Err`).
    #[test]
    fn cancel_send_works() {

        let (other_thread, s) = fork_with_thread_id!(nice_calc_server);

        assert!(|| -> Result<(), Box<Error>> {

            cancel(s)?;
            Ok(())

        }().is_ok());

        assert!(other_thread.join().is_err());
    }

    /// The forked side cancels its endpoint and finishes normally; the client's attempt
    /// to run the protocol is expected to return an error.
    #[test]
    fn cancel_recv_works() {

        let mut rng = thread_rng();
        let x: i32 = rng.gen();
        let y: i32 = rng.gen();

        let (other_thread, s) = fork_with_thread_id!(
            move |s: NiceCalcServer<i32>| {cancel(s)});

        assert!(|| -> Result<(), Box<Error>> {

            let s = select!(CalcOp::Add, s)?;
            let s = send(x, s)?;
            let s = send(y, s)?;
            let (z, s) = recv(s)?;
            close(s)?;
            assert_eq!(x.wrapping_add(y), z);
            Ok(())

        }().is_err());

        assert!(other_thread.join().is_ok());
    }

    /// Sends a session endpoint over another session to a thread that cancels what it
    /// holds; the calculator server thread is expected to end in a panic.
    #[test]
    fn delegation_works() {
        let (other_thread1, s) = fork_with_thread_id!(nice_calc_server);
        let (other_thread2, u) = fork_with_thread_id!(
            move |u: Recv<NiceCalcClient<i32>, End>| {cancel(u)});

        assert!(|| -> Result<(), Box<Error>> {

            let u = send(s, u)?;
            close(u)?;
            Ok(())

        }().is_ok());

        assert!(other_thread1.join().is_err());
        assert!(other_thread2.join().is_ok());
    }

    /// An endpoint moved into a closure that is never called: the outer closure returns
    /// early with an error, and the server thread is expected to end in a panic.
    #[test]
    fn closure_works() {
        let (other_thread, s) = fork_with_thread_id!(nice_calc_server);

        assert!(|| -> Result<i32, Box<Error>> {

            // `f` takes ownership of the endpoint `s`.
            let f = move |x: i32| -> Result<i32, Box<Error>> {
                let s = select!(CalcOp::Neg, s)?;
                let s = send(x, s)?;
                let (y, s) = recv(s)?;
                close(s)?;
                Ok(y)
            };

            // Always returns here, so `f(5)` below is never evaluated.
            Err(Box::new(mpsc::RecvError))?;
            f(5)

        }().is_err());

        assert!(other_thread.join().is_err());
    }

    /// Recursive protocol: any number of `More` rounds (one number each), then `Done`,
    /// after which the server sends the total.
    enum SumOp<N: marker::Send> {
        More(Recv<N, NiceSumServer<N>>),
        Done(Send<N, End>),
    }
    type NiceSumServer<N> = Recv<SumOp<N>, End>;
    type NiceSumClient<N> = <NiceSumServer<N> as Session>::Dual;

    /// Summing server; starts the accumulator at 0.
    fn nice_sum_server(s: NiceSumServer<i32>) -> Result<(), Box<Error>> {
        nice_sum_server_accum(s, 0)
    }

    /// Server loop: adds each received number to `x` (wrapping) until `Done`, then sends `x`.
    fn nice_sum_server_accum(s: NiceSumServer<i32>, x: i32) -> Result<(), Box<Error>> {
        offer!(s, {
            SumOp::More(s) => {
                let (y, s) = recv(s)?;
                nice_sum_server_accum(s, x.wrapping_add(y))
            },
            SumOp::Done(s) => {
                let s = send(x, s)?;
                close(s)
            },
        })
    }

    /// Client loop: sends the elements of `xs` one per round (popping from the back),
    /// then selects `Done` and returns the sum reported by the server.
    fn nice_sum_client_accum(s: NiceSumClient<i32>, mut xs: Vec<i32>)
                             -> Result<i32, Box<Error>> {
        match xs.pop() {
            Option::Some(x) => {
                let s = select!(SumOp::More, s)?;
                let s = send(x, s)?;
                nice_sum_client_accum(s, xs)
            },
            Option::None => {
                let s = select!(SumOp::Done, s)?;
                let (sum, s) = recv(s)?;
                close(s)?;
                Ok(sum)
            },
        }
    }

    /// Compares the server-computed sum of random numbers with a local wrapping sum.
    #[test]
    fn recursion_works() {

        let mut rng = thread_rng();
        let xs: Vec<i32> = (1..100).map(|_| rng.gen()).collect();
        let sum1: i32 = xs.iter().fold(0, |sum, &x| sum.wrapping_add(x));

        let (other_thread, s) = fork_with_thread_id!(nice_sum_server);

        assert!(|| -> Result<(), Box<Error>> {

            let sum2 = nice_sum_client_accum(s, xs)?;
            assert_eq!(sum1, sum2);
            Ok(())

        }().is_ok());

        assert!(other_thread.join().is_ok());
    }

    // The three functions below are not tests and are never called, hence
    // `allow(dead_code)`. Each one shows a way to make the receiving side wait forever.

    /// The forked thread spins in a loop that never breaks, so its `send` is never
    /// reached and the `recv` below never gets a message.
    #[allow(dead_code)]
    fn deadlock_loop() {

        let s = fork!(move |s: Send<(), End>| {
            loop {
                if false { break; }
            }
            let End = send((), s)?;
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            let ((), End) = recv(s)?;
            Ok(())
        }().unwrap();
    }

    /// The forked thread leaks its endpoint with `mem::forget` instead of dropping it,
    /// so nothing is ever sent.
    #[allow(dead_code)]
    fn deadlock_forget() {

        let s = fork!(move |s: Send<(), End>| {
            mem::forget(s);
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            let ((), End) = recv(s)?;
            Ok(())
        }().unwrap();
    }

    /// Two sessions wired into a cycle: each thread waits to receive from the other
    /// before it sends, so neither ever sends.
    #[allow(dead_code)]
    fn deadlock_new() {

        let (s1, r1) = <Send<(), End>>::new();
        let r2 = fork!(move |s2: Send<(), End>| {
            let (x, End) = recv(r1)?;
            let End = send(x, s2)?;
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            let (x, End) = recv(r2)?;
            let End = send(x, s1)?;
            Ok(())
        }().unwrap();
    }
}
524
525