rusty_variation/
lib.rs

1extern crate either;
2
3use std::boxed::Box;
4use std::error::Error;
5use std::marker;
6use std::mem;
7use std::sync::mpsc::{Sender, Receiver};
8use std::sync::mpsc;
9use either::Either;
10
// The session types supported.

/// Session type of a finished protocol.
///
/// `End` is a unit struct: it carries no channel. Its dual is `End` itself.
#[derive(Debug)]
pub struct End;
15
/// Session type of an endpoint that sends one `T` and then continues as `S`.
///
/// Marked `#[must_use]`, so the compiler warns when a returned endpoint is ignored.
#[must_use]
#[derive(Debug)]
pub struct Send<T, S: Session> {
    /// Carries the payload together with the peer's continuation endpoint (`S::Dual`).
    channel: Sender<(T, S::Dual)>,
}
21
/// Session type of an endpoint that receives one `T` and then continues as `S`.
///
/// Marked `#[must_use]`, so the compiler warns when a returned endpoint is ignored.
#[must_use]
#[derive(Debug)]
pub struct Recv<T, S: Session> {
    /// Delivers each payload paired with this side's continuation endpoint (`S`).
    channel: Receiver<(T, S)>,
}
27
/// A session type: one endpoint of a two-party protocol.
///
/// Implementors must be `Sized` and movable to another thread (`marker::Send`).
pub trait Session: marker::Sized + marker::Send {
    /// Session type of the opposite endpoint. The bound forces duality to be
    /// an involution: the dual of `Dual` is `Self` again.
    type Dual: Session<Dual=Self>;

    /// Creates a connected pair of endpoints: this side and its dual.
    ///
    /// Hidden from the generated documentation.
    #[doc(hidden)]
    fn new() -> (Self, Self::Dual);
}
34
35impl Session for End {
36    type Dual = End;
37
38    fn new() -> (Self, Self::Dual) {
39        return (End, End);
40    }
41}
42
43impl<T: marker::Send, S: Session> Session for Send<T, S> {
44    type Dual = Recv<T, S::Dual>;
45
46    fn new() -> (Self, Self::Dual) {
47        let (sender, receiver) = mpsc::channel::<(T, S::Dual)>();
48        return (Send { channel: sender }, Recv { channel: receiver });
49    }
50}
51
52impl<T: marker::Send, S: Session> Session for Recv<T, S> {
53    type Dual = Send<T, S::Dual>;
54
55    fn new() -> (Self, Self::Dual) {
56        let (there, here) = Self::Dual::new();
57        return (here, there);
58    }
59}
60
61
62/// The communication primitives.
63
64pub fn send<'a, T: marker::Send + 'a, S: Session + 'a>(x: T, s: Send<T, S>)
65                                                       -> Result<S, Box<Error + 'a>> {
66    let (here, there) = S::new();
67    s.channel.send((x, there))?;
68    Ok(here)
69}
70
71pub fn recv<'a, T: marker::Send, S: Session>(s: Recv<T, S>)
72                                                -> Result<(T, S), Box<Error + 'a>> {
73    let (v, s) = s.channel.recv()?;
74    Ok((v, s))
75}
76
77pub fn close(s: End) -> Result<(), Box<Error>> {
78    let End = s;
79    Ok(())
80}
81
82pub fn cancel<T>(x: T) -> Result<(), Box<Error>> {
83    mem::drop(x);
84    Ok(())
85}
86
/// Forks a thread that runs one endpoint of a freshly created session.
///
/// Evaluates to `(join_handle, endpoint)`: the handle of the spawned thread
/// and the dual endpoint, which stays with the caller. If the forked code
/// returns `Err(e)`, the spawned thread panics with `e`'s `Display` text, so
/// the failure is observable through `join()`.
#[macro_export]
macro_rules! fork_with_thread_id {

    // Syntax `fork_with_thread_id!(nice_calc_server)`
    ($fn_name:ident) => {{
        let (there, here) = $crate::Session::new();
        let other_thread = ::std::thread::spawn(move || {
            let r = $fn_name(there);
            match r {
                Ok(_) => (),
                // `Display` is used instead of the deprecated
                // `Error::description`, which no longer carries the message.
                Err(e) => panic!("{}", e),
            }
        });
        (other_thread, here)
    }};

    // Syntax `fork_with_thread_id!(move |s: NiceCalcServer<i32>| { ... })`
    (move | $session_name:ident : $session_type:ty | $forked_process:block ) => {{
        let ($session_name, here) = <$session_type as $crate::Session>::new();
        let other_thread = ::std::thread::spawn(move || {
            // Absolute paths keep the expansion independent of what the
            // calling module happens to have imported.
            let r = (move || -> ::std::result::Result<_, ::std::boxed::Box<::std::error::Error>> {
                $forked_process
            })();
            match r {
                Ok(_) => (),
                Err(e) => panic!("{}", e),
            }
        });
        (other_thread, here)
    }};
}
118
119
/// Like `fork_with_thread_id!`, but evaluates to the caller's endpoint only;
/// the spawned thread's join handle (element `.0` of the pair) is discarded.
#[macro_export]
macro_rules! fork {

    // Syntax `fork!(nice_calc_server)`
    ($fn_name:ident) => {
        fork_with_thread_id!($fn_name).1
    };

    // Syntax `fork!(move |s: NiceCalcServer<i32>| { ... })`
    (move | $session_name:ident : $session_type:ty | $forked_process:block ) => {
        fork_with_thread_id!(move | $session_name : $session_type | $forked_process ).1
    };
}
133
/// Binary offer: receives either an `S1` or an `S2` endpoint, then ends.
pub type Offer<S1, S2> = Recv<Either<S1, S2>, End>;
/// Binary selection: sends the peer the dual of `S1` or the dual of `S2`, then ends.
pub type Select<S1, S2> = Send<Either<<S1 as Session>::Dual, <S2 as Session>::Dual>, End>;
136
137pub fn offer_either<'a, S1: Session, S2: Session, F, G, R>(s: Offer<S1, S2>, f: F, g: G)
138                                                           -> Result<R, Box<Error + 'a>>
139where
140    F: FnOnce(S1) -> Result<R, Box<Error + 'a>>,
141    G: FnOnce(S2) -> Result<R, Box<Error + 'a>>,
142{
143    let (e, End) = recv(s)?;
144    e.either(f, g)
145}
146
147pub fn select_left<'a, S1: Session + 'a, S2: Session + 'a>(s: Select<S1, S2>)
148                                                           -> Result<S1, Box<Error + 'a>> {
149    let (here, there) = S1::new();
150    let End = send(Either::Left(there), s)?;
151    Ok(here)
152}
153
154pub fn select_right<'a, S1: Session + 'a, S2: Session + 'a>(s: Select<S1, S2>)
155                                                            -> Result<S2, Box<Error + 'a>> {
156    let (here, there) = S2::new();
157    let End = send(Either::Right(there), s)?;
158    Ok(here)
159}
160
/// Receives a label from `$session` and dispatches on it with `match`-like arms.
///
/// Syntax: `offer!(s, { Label::A(s) => expr, Label::B(s) => expr, })`.
/// The comma after the last arm is optional. The arms are evaluated inside a
/// `move` closure returning `Result`, so `?` may be used in them; the macro
/// evaluates to that `Result`.
#[macro_export]
macro_rules! offer {
    ($session:expr, { $($pat:pat => $result:expr),* $(,)* }) => {
        (move || -> ::std::result::Result<_, _> {
            // `$crate::` paths make the expansion work even when the caller
            // has not imported `recv` and `End`.
            let (l, $crate::End) = $crate::recv($session)?;
            match l {
                $(
                    $pat => $result,
                )*
            }
        })()
    };
}
174
/// Selects the branch `$label` on `$session`.
///
/// Creates a fresh session, wraps the peer's endpoint with `$label` (a path
/// to a one-argument constructor such as an enum variant), sends it over
/// `$session`, and evaluates to `Result` holding the local endpoint.
#[macro_export]
macro_rules! select {
    ($label:path, $session:expr) => {
        // `$crate::` and `::std::` paths make the expansion work even when the
        // caller has not imported `Session`, `send`, `End` or `Error`.
        (move || -> ::std::result::Result<_, ::std::boxed::Box<::std::error::Error>> {
            let (here, there) = <_ as $crate::Session>::new();
            let $crate::End = $crate::send($label(there), $session)?;
            Ok(here)
        })()
    };
}
185
186#[cfg(test)]
187mod tests {
188    extern crate rand;
189
190    use std::marker;
191    use super::*;
192    use self::rand::{Rng, thread_rng};
193
194    // Test sending a ping across threads.
195
196    #[test]
197    fn ping_works() {
198        assert!(|| -> Result<(), Box<Error>> {
199
200            let s = fork!(move |s: Send<(), End>| {
201                let s = send((), s)?;
202                close(s)
203            });
204            let ((), s) = recv(s)?;
205            close(s)
206
207        }().is_ok());
208    }
209
    // Test a simple calculator server, implemented using binary choice.

    /// Negation protocol, server side: receive one number, send one back, end.
    type NegServer<N> = Recv<N, Send<N, End>>;
    /// Client side of the negation protocol (the dual of `NegServer`).
    type NegClient<N> = <NegServer<N> as Session>::Dual;

    /// Addition protocol, server side: receive two numbers, send one back, end.
    type AddServer<N> = Recv<N, Recv<N, Send<N, End>>>;
    /// Client side of the addition protocol (the dual of `AddServer`).
    type AddClient<N> = <AddServer<N> as Session>::Dual;

    /// Server that offers a binary choice: negation (left) or addition (right).
    type SimpleCalcServer<N> = Offer<NegServer<N>, AddServer<N>>;
    /// Client side of the binary-choice calculator (the dual of `SimpleCalcServer`).
    type SimpleCalcClient<N> = <SimpleCalcServer<N> as Session>::Dual;
220
221    fn simple_calc_server(s: SimpleCalcServer<i32>) -> Result<(), Box<Error>> {
222        offer_either(s,
223                     |s: NegServer<i32>| {
224                         let (x, s) = recv(s)?;
225                         let s = send(-x, s)?;
226                         close(s)
227                     },
228                     |s: AddServer<i32>| {
229                         let (x, s) = recv(s)?;
230                         let (y, s) = recv(s)?;
231                         let s = send(x.wrapping_add(y), s)?;
232                         close(s)
233                     })
234    }
235
236    #[test]
237    fn simple_calc_works() {
238        assert!(|| -> Result<(), Box<Error>> {
239
240            let mut rng = thread_rng();
241
242            // Test the negation function.
243            {
244                let s: SimpleCalcClient<i32> = fork!(simple_calc_server);
245                let x: i32 = rng.gen();
246                let s = select_left::<_, AddClient<i32>>(s)?;
247                let s = send(x, s)?;
248                let (y, End) = recv(s)?;
249                assert_eq!(-x, y);
250            }
251
252            // Test the addition function.
253            {
254                let s: SimpleCalcClient<i32> = fork!(simple_calc_server);
255                let x: i32 = rng.gen();
256                let y: i32 = rng.gen();
257                let s = select_right::<NegClient<i32>, _>(s)?;
258                let s = send(x, s)?;
259                let s = send(y, s)?;
260                let (z, End) = recv(s)?;
261                assert_eq!(x.wrapping_add(y), z);
262            }
263
264            Ok(())
265
266        }().is_ok());
267    }
268
    // Test a nice calculator server, implemented using variant types.

    /// Label sent by the client to pick an operation; each variant carries the
    /// server-side endpoint for that operation's protocol.
    enum CalcOp<N: marker::Send> {
        /// Negation: the server continues as `NegServer`.
        Neg(NegServer<N>),
        /// Addition: the server continues as `AddServer`.
        Add(AddServer<N>),
    }
    /// Server that receives a `CalcOp` label and then ends this session.
    type NiceCalcServer<N> = Recv<CalcOp<N>, End>;
    /// Client side of the variant-based calculator (the dual of `NiceCalcServer`).
    type NiceCalcClient<N> = <NiceCalcServer<N> as Session>::Dual;
277
278    fn nice_calc_server(s: NiceCalcServer<i32>) -> Result<(), Box<Error>> {
279        offer!(s, {
280            CalcOp::Neg(s) => {
281                let (x, s) = recv(s)?;
282                let s = send(-x, s)?;
283                close(s)
284            },
285            CalcOp::Add(s) => {
286                let (x, s) = recv(s)?;
287                let (y, s) = recv(s)?;
288                let s = send(x.wrapping_add(y), s)?;
289                close(s)
290            },
291        })
292    }
293
294    #[test]
295    fn nice_calc_works() {
296        assert!(|| -> Result<(), Box<Error>> {
297
298            // Pick some random numbers.
299            let mut rng = thread_rng();
300
301            // Test the negation function.
302            {
303                let s: NiceCalcClient<i32> = fork!(nice_calc_server);
304                let x: i32 = rng.gen();
305                let s = select!(CalcOp::Neg, s)?;
306                let s = send(x, s)?;
307                let (y, s) = recv(s)?;
308                close(s)?;
309                assert_eq!(-x, y);
310            }
311
312            // Test the addition function.
313            {
314                let s: NiceCalcClient<i32> = fork!(nice_calc_server);
315                let x: i32 = rng.gen();
316                let y: i32 = rng.gen();
317                let s = select!(CalcOp::Add, s)?;
318                let s = send(x, s)?;
319                let s = send(y, s)?;
320                let (z, s) = recv(s)?;
321                close(s)?;
322                assert_eq!(x.wrapping_add(y), z);
323            }
324
325            Ok(())
326
327        }().is_ok());
328    }
329
330    #[test]
331    fn cancel_send_works() {
332
333        let (other_thread, s) = fork_with_thread_id!(nice_calc_server);
334
335        assert!(|| -> Result<(), Box<Error>> {
336
337            cancel(s)?;
338            Ok(())
339
340        }().is_ok());
341
342        assert!(other_thread.join().is_err());
343    }
344
345    #[test]
346    fn cancel_recv_works() {
347
348        // Pick some random numbers.
349        let mut rng = thread_rng();
350        let x: i32 = rng.gen();
351        let y: i32 = rng.gen();
352
353        let (other_thread, s) = fork_with_thread_id!(
354            move |s: NiceCalcServer<i32>| {cancel(s)});
355
356        assert!(|| -> Result<(), Box<Error>> {
357
358            let s = select!(CalcOp::Add, s)?;
359            let s = send(x, s)?;
360            let s = send(y, s)?;
361            let (z, s) = recv(s)?;
362            close(s)?;
363            assert_eq!(x.wrapping_add(y), z);
364            Ok(())
365
366        }().is_err());
367
368        assert!(other_thread.join().is_ok());
369    }
370
371    #[test]
372    fn delegation_works() {
373        let (other_thread1, s) = fork_with_thread_id!(nice_calc_server);
374        let (other_thread2, u) = fork_with_thread_id!(
375            move |u: Recv<NiceCalcClient<i32>, End>| {cancel(u)});
376
377        assert!(|| -> Result<(), Box<Error>> {
378
379            let u = send(s, u)?;
380            close(u)?;
381            Ok(())
382
383        }().is_ok());
384
385        assert!(other_thread1.join().is_err());
386        assert!(other_thread2.join().is_ok());
387    }
388
389    #[test]
390    fn closure_works() {
391        let (other_thread, s) = fork_with_thread_id!(nice_calc_server);
392
393        assert!(|| -> Result<i32, Box<Error>> {
394
395            // Create a closure which uses the session.
396            let f = move |x: i32| -> Result<i32, Box<Error>> {
397                let s = select!(CalcOp::Neg, s)?;
398                let s = send(x, s)?;
399                let (y, s) = recv(s)?;
400                close(s)?;
401                Ok(y)
402            };
403
404            // Let the closure go out of scope.
405            Err(Box::new(mpsc::RecvError))?;
406            f(5)
407
408        }().is_err());
409
410        assert!(other_thread.join().is_err());
411    }
412
    /// Label sent by the client of the recursive summation protocol.
    enum SumOp<N: marker::Send> {
        /// Another number follows; afterwards the server offers the choice again.
        More(Recv<N, NiceSumServer<N>>),
        /// No more numbers; the server sends one number back and ends.
        Done(Send<N, End>),
    }
    /// Server that receives a `SumOp` label and then ends this session.
    type NiceSumServer<N> = Recv<SumOp<N>, End>;
    /// Client side of the summation protocol (the dual of `NiceSumServer`).
    type NiceSumClient<N> = <NiceSumServer<N> as Session>::Dual;
419
420    fn nice_sum_server(s: NiceSumServer<i32>) -> Result<(), Box<Error>> {
421        nice_sum_server_accum(s, 0)
422    }
423
424    fn nice_sum_server_accum(s: NiceSumServer<i32>, x: i32) -> Result<(), Box<Error>> {
425        offer!(s, {
426            SumOp::More(s) => {
427                let (y, s) = recv(s)?;
428                nice_sum_server_accum(s, x.wrapping_add(y))
429            },
430            SumOp::Done(s) => {
431                let s = send(x, s)?;
432                close(s)
433            },
434        })
435    }
436
437    fn nice_sum_client_accum(s: NiceSumClient<i32>, mut xs: Vec<i32>)
438                             -> Result<i32, Box<Error>> {
439        match xs.pop() {
440            Option::Some(x) => {
441                let s = select!(SumOp::More, s)?;
442                let s = send(x, s)?;
443                nice_sum_client_accum(s, xs)
444            },
445            Option::None => {
446                let s = select!(SumOp::Done, s)?;
447                let (sum, s) = recv(s)?;
448                close(s)?;
449                Ok(sum)
450            },
451        }
452    }
453
454    #[test]
455    fn recursion_works() {
456
457        // Pick some random numbers.
458        let mut rng = thread_rng();
459        let xs: Vec<i32> = (1..100).map(|_| rng.gen()).collect();
460        let sum1: i32 = xs.iter().fold(0, |sum, &x| sum.wrapping_add(x));
461
462        let (other_thread, s) = fork_with_thread_id!(nice_sum_server);
463
464        assert!(|| -> Result<(), Box<Error>> {
465
466            let sum2 = nice_sum_client_accum(s, xs)?;
467            assert_eq!(sum1, sum2);
468            Ok(())
469
470        }().is_ok());
471
472        assert!(other_thread.join().is_ok());
473    }
474
    /// Not a test: a deliberately non-terminating example. It is not called
    /// anywhere in this file, hence `allow(dead_code)`.
    ///
    /// The forked thread spins forever before it reaches its `send`, so the
    /// `recv` below would never return.
    #[allow(dead_code)]
    fn deadlock_loop() {

        let s = fork!(move |s: Send<(), End>| {
            loop {
                // Let's trick the reachability checker
                if false { break; }
            }
            let End = send((), s)?;
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            let ((), End) = recv(s)?;
            Ok(())
        }().unwrap();
    }
492
    /// Not a test: a deliberately non-terminating example. It is not called
    /// anywhere in this file, hence `allow(dead_code)`.
    ///
    /// The forked thread leaks its sending endpoint with `mem::forget`, which
    /// skips the destructor; nothing is ever sent and the sender is never
    /// dropped, so the `recv` below would wait forever.
    #[allow(dead_code)]
    fn deadlock_forget() {

        let s = fork!(move |s: Send<(), End>| {
            mem::forget(s);
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            let ((), End) = recv(s)?;
            Ok(())
        }().unwrap();
    }
506
    /// Not a test: a deliberately non-terminating example. It is not called
    /// anywhere in this file, hence `allow(dead_code)`.
    ///
    /// Two sessions are wired into a cycle: each thread first waits to receive
    /// on one session and only afterwards sends on the other, so neither
    /// `recv` can ever complete.
    #[allow(dead_code)]
    fn deadlock_new() {

        // Created by hand so that both endpoints can be handed out freely.
        let (s1, r1) = <Send<(), End>>::new();
        let r2 = fork!(move |s2: Send<(), End>| {
            // Waits for `s1`, which the main thread only uses after `recv(r2)`.
            let (x, End) = recv(r1)?;
            let End = send(x, s2)?;
            Ok(())
        });

        || -> Result<(), Box<Error>> {
            // Waits for `s2`, which the forked thread only uses after `recv(r1)`.
            let (x, End) = recv(r2)?;
            let End = send(x, s1)?;
            Ok(())
        }().unwrap();
    }
523}
524
525// */
526// */
527// */
528// */
529// */