loop_three/
loop_three.rs

1use anyhow::Result;
2use party_types::{
3    pair, prot, rec,
4    transports::Crossbeam,
5    Choice,
6    ChoiceResult::{One, Two},
7    End, Rx, Tx,
8};
9use std::thread::spawn;
10
11prot!(prot_q, Tx<A, u32, Rx<A, u32, Tx<A, String, End>>>);
12
13rec!(
14    RecA,
15    Choice<Q, u32, Tx<B, u32, Choice<C, u32, Tx<Q, u32, RecA>, B, u32, Tx<Q, u32, RecA>>>, Q, String, End>
16);
17prot!(prot_a, RecA);
18
19rec!(RecB, Rx<A, u32, (Tx<C, u32, RecB>, Tx<A, u32, RecB>)>);
20prot!(prot_b, RecB);
21
22rec!(RecC, Rx<B, u32, Tx<A, u32, RecC>>);
23prot!(prot_c, RecC);
24
25struct Q;
26struct A;
27struct B;
28struct C;
29
30fn main() -> Result<()> {
31    let (mut ch_qa, mut ch_aq) = pair::<Q, A, _>(&Crossbeam);
32    let (mut ch_ab, mut ch_ba) = pair::<A, B, _>(&Crossbeam);
33    let (mut ch_ac, mut ch_ca) = pair::<A, C, _>(&Crossbeam);
34    let (mut ch_bc, mut ch_cb) = pair::<B, C, _>(&Crossbeam);
35
36    let thread_a = spawn(move || -> Result<End> {
37        let mut prot = prot_a().rec();
38        let mut ch_aq2 = ch_aq.clone();
39        loop {
40            match prot.recv(&mut ch_aq, &mut ch_aq2)? {
41                One(value, cont) => {
42                    let cont = cont.send(&mut ch_ab, value)?;
43                    let (value, cont) = match cont.recv(&mut ch_ac, &mut ch_ab)? {
44                        One(value, cont) => (value, cont),
45                        Two(value, cont) => (value, cont),
46                    };
47                    let cont = cont.send(&mut ch_aq, value)?;
48                    prot = cont.rec();
49                }
50                Two(v, cont) => {
51                    println!("process A got string {}", v);
52                    return Ok(cont);
53                }
54            }
55        }
56    });
57
58    let thread_b = spawn(move || -> Result<End> {
59        let mut prot = prot_b().rec();
60        loop {
61            let (value, p) = prot.recv(&mut ch_ba)?;
62            if value > 100 {
63                let cont = p.0.send(&mut ch_bc, value)?;
64                prot = cont.rec();
65            } else {
66                let cont = p.1.send(&mut ch_ba, value)?;
67                prot = cont.rec();
68            }
69        }
70    });
71
72    let thread_c = spawn(move || -> Result<End> {
73        let mut prot = prot_c().rec();
74        loop {
75            let (value, p) = prot.recv(&mut ch_cb)?;
76            prot = p.send(&mut ch_ca, value)?.rec();
77        }
78    });
79
80    // use the current thread for role Q
81    let prot = prot_q();
82    let prot = prot.send(&mut ch_qa, 1)?;
83    let (value, prot) = prot.recv(&mut ch_qa)?;
84    println!("received {}", value);
85    let _prot: End = prot.send(&mut ch_qa, "stop".to_string())?;
86
87    // all threads end now because A shuts down, killing the channel to B (which then shuts down),
88    // killing the channel to C (which then shuts down)
89    println!("1 {:?}", thread_a.join().unwrap());
90    println!("2 {:?}", thread_b.join().unwrap());
91    println!("3 {:?}", thread_c.join().unwrap());
92
93    Ok(())
94}