Recv

Struct Recv 

Source
pub struct Recv<T, S: Session = ()> { /* private fields */ }
Expand description

Supplies a value of type `T`, then proceeds according to `S`. Its dual is `Send<T, Dual<S>>`.

Use `recv` to obtain the supplied value along with the continuation `S`. If the continuation is `()` (the empty session), use `recv1` to obtain `T` and discard the continuation.

§Correspondence to linear logic

  • `Recv<A, B>` is A ⊗ B
  • `Recv<Result<A, B>>` is A ⊕ B

Implementations§

Source§

impl<T, S: Session> Recv<T, S>
where T: Send + 'static,

Source

pub async fn recv(self) -> (T, S)

Waits to obtain a value of type `T` along with the continuation `S`.

Examples found in repository?
examples/sequencing.rs (line 17)
15fn start_calculator() -> Calculator {
16    fork(|user: User| async {
17        let (x, user) = user.recv().await;
18        let (op, user) = user.recv().await;
19        let (y, user) = user.recv().await;
20        let result = match op {
21            Op::Plus => x + y,
22            Op::Times => x * y,
23        };
24        user.send1(result);
25    })
26}
More examples
Hide additional examples
examples/recursion.rs (line 19)
13fn start_counting() -> Send<Counting> {
14    fork(|mut numbers: Recv<Counting>| async {
15        let mut total = 0;
16        loop {
17            match numbers.recv1().await {
18                Counting::More(number) => {
19                    let (n, next) = number.recv().await;
20                    total += n;
21                    numbers = next;
22                }
23                Counting::Done(report) => break report.send1(total),
24            }
25        }
26    })
27}
examples/branching.rs (line 56)
54fn boot_atm(accounts: Arc<HashMap<String, Money>>) -> ATM {
55    fork(|client: Client| async move {
56        let (Account(number), client) = client.recv().await;
57        let Some(&Money(funds)) = accounts.get(&number) else {
58            return client.send1(Err(InvalidAccount));
59        };
60        match client.choose(Ok).recv1().await {
61            Operation::CheckBalance(client) => client.send1(Amount(funds)),
62            Operation::Withdraw(client) => {
63                let (Amount(requested), client) = client.recv().await;
64                if funds >= requested {
65                    client.send1(Ok(Money(requested)));
66                } else {
67                    client.send1(Err(InsufficientFunds));
68                }
69            }
70        }
71    })
72}
examples/multiple_participants.rs (line 36)
32fn start_playing() -> Game {
33    use {Move::*, Outcome::*, Winner::*};
34
35    fork(|game: Dual<Game>| async {
36        let ((mut player1, mut player2, mut player3), winner) = game.recv().await;
37
38        loop {
39            let (move1, outcome1) = player1.recv().await;
40            let (move2, outcome2) = player2.recv().await;
41            let (move3, outcome3) = player3.recv().await;
42
43            tokio::time::sleep(Duration::from_secs(1)).await;
44            println!("{:?} {:?} {:?}", move1, move2, move3);
45            tokio::time::sleep(Duration::from_secs(1)).await;
46
47            match (move1, move2, move3) {
48                (Up, Down, Down) | (Down, Up, Up) => {
49                    outcome1.send1(Win);
50                    outcome2.send1(Loss);
51                    outcome3.send1(Loss);
52                    break winner.send1(First);
53                }
54                (Down, Up, Down) | (Up, Down, Up) => {
55                    outcome1.send1(Loss);
56                    outcome2.send1(Win);
57                    outcome3.send1(Loss);
58                    break winner.send1(Second);
59                }
60                (Down, Down, Up) | (Up, Up, Down) => {
61                    outcome1.send1(Loss);
62                    outcome2.send1(Loss);
63                    outcome3.send1(Win);
64                    break winner.send1(Third);
65                }
66                (Up, Up, Up) | (Down, Down, Down) => {
67                    player1 = outcome1.choose(Draw);
68                    player2 = outcome2.choose(Draw);
69                    player3 = outcome3.choose(Draw);
70                    println!("Draw...");
71                }
72            }
73        }
74    })
75}
examples/chat.rs (line 87)
55async fn serve(listener: TcpListener) {
56    let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57        drop(tokio::spawn(accept_users(listener).for_each1(
58            move |user| {
59                future::ready(proxy.clone(|proxy| {
60                    drop(tokio::spawn(async {
61                        let Some(login) = user.recv1().await else {
62                            return;
63                        };
64                        proxy.connect().link(login);
65                    }))
66                }))
67            },
68        )))
69    });
70
71    type Inboxes = HashMap<Nick, Inbox>;
72    fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73        let entries = inboxes
74            .drain()
75            .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76            .collect::<Vec<_>>();
77        inboxes.extend(entries);
78    }
79
80    let mut inboxes = Inboxes::new();
81
82    while let Some((new_server, transition)) = server.poll().await {
83        server = new_server;
84
85        match transition {
86            Event::Connect { session: login } => {
87                let (nick, resp) = login.recv().await;
88                let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89                    resp.send1(Err(LoginRefused));
90                    continue;
91                };
92                let (inbox, conn) = resp.choose(Ok).recv().await;
93                entry.insert(inbox);
94                broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95                server.suspend(nick, |c| conn.send1(c));
96            }
97
98            Event::Resume {
99                session: outbox,
100                data: nick,
101            } => match outbox.recv1().await {
102                Command::Message(msg) => {
103                    let (content, conn) = msg.recv().await;
104                    broadcast(
105                        &mut inboxes,
106                        ChatLine::Message {
107                            from: nick.clone(),
108                            content,
109                        },
110                    );
111                    server.suspend(nick, |c| conn.send1(c));
112                }
113                Command::Logout => {
114                    if let Some(inbox) = inboxes.remove(&nick) {
115                        inbox.close1();
116                    }
117                    broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118                }
119            },
120        }
121    }
122}
Source

pub fn poll_recv(self, cx: &mut Context<'_>) -> Result<(T, S), Self>

Source§

impl<T> Recv<T, ()>
where T: Send + 'static,

Source

pub async fn recv1(self) -> T

Waits to obtain a value of type `T`, and discards the empty continuation.

Examples found in repository?
examples/multiple_participants.rs (line 79)
77fn random_player() -> Player {
78    fork(|mut round: Round| async move {
79        while let Outcome::Draw(next_round) = round.send(random_move()).recv1().await {
80            round = next_round;
81        }
82    })
83}
84
85fn random_move() -> Move {
86    if fastrand::bool() {
87        Move::Up
88    } else {
89        Move::Down
90    }
91}
92
93#[tokio::main]
94async fn main() {
95    for _ in 0..10 {
96        let winner = start_playing()
97            .send((random_player(), random_player(), random_player()))
98            .recv1()
99            .await;
100        println!("{:?}!\n", winner);
101    }
102}
More examples
Hide additional examples
examples/sequencing.rs (line 34)
29async fn main() {
30    let sum = start_calculator()
31        .send(3)
32        .send(Op::Plus)
33        .send(4)
34        .recv1()
35        .await;
36
37    let product = start_calculator()
38        .send(3)
39        .send(Op::Times)
40        .send(4)
41        .recv1()
42        .await;
43
44    assert_eq!(sum, 7);
45    assert_eq!(product, 12);
46}
examples/branching.rs (line 26)
23fn check_balance(number: String) -> Client {
24    fork(|atm: ATM| async move {
25        let atm = atm.send(Account(number.clone()));
26        let Ok(atm) = atm.recv1().await else {
27            return println!("Invalid account: {}", number);
28        };
29        let Amount(funds) = atm.choose(Operation::CheckBalance).recv1().await;
30        println!("{} has {}", number, funds);
31    })
32}
33
34fn withdraw(number: String, Amount(requested): Amount) -> Client {
35    fork(|atm: ATM| async move {
36        let Ok(atm) = atm.send(Account(number.clone())).recv1().await else {
37            return println!("Invalid account: {}", number);
38        };
39        let response = atm
40            .choose(Operation::Withdraw)
41            .send(Amount(requested))
42            .recv1()
43            .await;
44        match response {
45            Ok(Money(withdrawn)) => println!("{} withdrawn from {}", withdrawn, number),
46            Err(InsufficientFunds) => println!(
47                "{} has insufficient funds to withdraw {}",
48                number, requested
49            ),
50        }
51    })
52}
53
54fn boot_atm(accounts: Arc<HashMap<String, Money>>) -> ATM {
55    fork(|client: Client| async move {
56        let (Account(number), client) = client.recv().await;
57        let Some(&Money(funds)) = accounts.get(&number) else {
58            return client.send1(Err(InvalidAccount));
59        };
60        match client.choose(Ok).recv1().await {
61            Operation::CheckBalance(client) => client.send1(Amount(funds)),
62            Operation::Withdraw(client) => {
63                let (Amount(requested), client) = client.recv().await;
64                if funds >= requested {
65                    client.send1(Ok(Money(requested)));
66                } else {
67                    client.send1(Err(InsufficientFunds));
68                }
69            }
70        }
71    })
72}
examples/recursion.rs (line 17)
13fn start_counting() -> Send<Counting> {
14    fork(|mut numbers: Recv<Counting>| async {
15        let mut total = 0;
16        loop {
17            match numbers.recv1().await {
18                Counting::More(number) => {
19                    let (n, next) = number.recv().await;
20                    total += n;
21                    numbers = next;
22                }
23                Counting::Done(report) => break report.send1(total),
24            }
25        }
26    })
27}
28
29type Numbers = Dequeue<i64, Send<i64>>;
30type Counter = Dual<Numbers>;
31
32fn start_counting_with_queue() -> Counter {
33    fork(|numbers: Numbers| async {
34        let (total, report) = numbers
35            .fold(0, |total, add| async move { total + add })
36            .await;
37        report.send1(total);
38    })
39}
40
41#[tokio::main]
42async fn main() {
43    let sum = start_counting()
44        .choose(Counting::More)
45        .send(1)
46        .choose(Counting::More)
47        .send(2)
48        .choose(Counting::More)
49        .send(3)
50        .choose(Counting::More)
51        .send(4)
52        .choose(Counting::More)
53        .send(5)
54        .choose(Counting::Done)
55        .recv1()
56        .await;
57
58    assert_eq!(sum, 15);
59
60    let sum = start_counting_with_queue()
61        .push(1)
62        .push(2)
63        .push(3)
64        .push(4)
65        .push(5)
66        .close()
67        .recv1()
68        .await;
69
70    assert_eq!(sum, 15);
71}
examples/chat.rs (line 61)
55async fn serve(listener: TcpListener) {
56    let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57        drop(tokio::spawn(accept_users(listener).for_each1(
58            move |user| {
59                future::ready(proxy.clone(|proxy| {
60                    drop(tokio::spawn(async {
61                        let Some(login) = user.recv1().await else {
62                            return;
63                        };
64                        proxy.connect().link(login);
65                    }))
66                }))
67            },
68        )))
69    });
70
71    type Inboxes = HashMap<Nick, Inbox>;
72    fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73        let entries = inboxes
74            .drain()
75            .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76            .collect::<Vec<_>>();
77        inboxes.extend(entries);
78    }
79
80    let mut inboxes = Inboxes::new();
81
82    while let Some((new_server, transition)) = server.poll().await {
83        server = new_server;
84
85        match transition {
86            Event::Connect { session: login } => {
87                let (nick, resp) = login.recv().await;
88                let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89                    resp.send1(Err(LoginRefused));
90                    continue;
91                };
92                let (inbox, conn) = resp.choose(Ok).recv().await;
93                entry.insert(inbox);
94                broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95                server.suspend(nick, |c| conn.send1(c));
96            }
97
98            Event::Resume {
99                session: outbox,
100                data: nick,
101            } => match outbox.recv1().await {
102                Command::Message(msg) => {
103                    let (content, conn) = msg.recv().await;
104                    broadcast(
105                        &mut inboxes,
106                        ChatLine::Message {
107                            from: nick.clone(),
108                            content,
109                        },
110                    );
111                    server.suspend(nick, |c| conn.send1(c));
112                }
113                Command::Logout => {
114                    if let Some(inbox) = inboxes.remove(&nick) {
115                        inbox.close1();
116                    }
117                    broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118                }
119            },
120        }
121    }
122}
123
124fn accept_users(listener: TcpListener) -> Dequeue<Recv<Option<Login>>> {
125    fork(|mut queue: Enqueue<Recv<Option<Login>>>| async move {
126        while let Ok((stream, _)) = listener.accept().await {
127            eprintln!("Client connecting...");
128
129            let Ok(addr) = stream.peer_addr() else {
130                eprintln!("ERROR: No peer address");
131                continue;
132            };
133            let Ok(web_socket) = tokio_tungstenite::accept_async(stream).await else {
134                eprintln!("ERROR: Handshake failed with {}", addr);
135                continue;
136            };
137
138            eprintln!("New WebSocket connection: {}", addr);
139            queue = queue.push(handle_user(web_socket));
140        }
141        queue.close1();
142    })
143}
144
145fn handle_user(socket: WebSocketStream<TcpStream>) -> Recv<Option<Login>> {
146    let (write, read) = socket.split();
147    let inbox = handle_inbox(write);
148    let messages = read_socket(read);
149
150    fork(|try_login: Send<Option<Login>>| async {
151        let inbox = inbox.push(ChatLine::Info(format!("What's your name?")));
152        let Queue::Item(name, messages) = messages.pop().await else {
153            inbox.close1();
154            return try_login.send1(None);
155        };
156
157        let Ok(accepted) = try_login.choose(Some).send(Nick(name)).recv1().await else {
158            inbox
159                .push(ChatLine::Error(format!("Login refused")))
160                .close1();
161            return messages.for_each1(|_| future::ready(())).await;
162        };
163
164        let conn = messages
165            .fold1(accepted.send(inbox).recv1().await, |conn, content| async {
166                conn.resume()
167                    .choose(Command::Message)
168                    .send(content)
169                    .recv1()
170                    .await
171            })
172            .await;
173
174        conn.resume().send1(Command::Logout);
175    })
176}
Source

pub fn poll_recv1(self, cx: &mut Context<'_>) -> Result<T, Self>

Trait Implementations§

Source§

impl<T, S: Session> Session for Recv<T, S>
where T: Send + 'static,

Source§

type Dual = Send<T, <S as Session>::Dual>

Source§

fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self

Auto Trait Implementations§

§

impl<T, S> Freeze for Recv<T, S>

§

impl<T, S> !RefUnwindSafe for Recv<T, S>

§

impl<T, S> Send for Recv<T, S>
where T: Send,

§

impl<T, S> Sync for Recv<T, S>
where T: Send,

§

impl<T, S> Unpin for Recv<T, S>

§

impl<T, S> !UnwindSafe for Recv<T, S>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the `TypeId` of `self`.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls `U::from(self)`.

That is, this conversion is whatever the implementation of `From<T> for U` chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.