pub struct Recv<T, S: Session = ()> { /* private fields */ }
Expand description
Supplies a value of type T, then proceeds according to S. Its dual is Send<T, Dual<S>>.
Use recv to obtain the supplied value along with the continuation S.
If the continuation is () (the empty session), use recv1 to obtain T
and discard the continuation.
§Correspondence to linear logic
- Recv<A, B> is A ⊗ B
- Recv<Result<A, B>> is A ⊕ B
Implementations§
Source§ impl<T, S: Session> Recv<T, S> where
T: Send + 'static,
impl<T, S: Session> Recv<T, S> where
T: Send + 'static,
Source pub async fn recv(self) -> (T, S)
pub async fn recv(self) -> (T, S)
Waits to obtain a value of type T along with the continuation S.
Examples found in repository?
examples/sequencing.rs (line 17)
15fn start_calculator() -> Calculator {
16 fork(|user: User| async {
17 let (x, user) = user.recv().await;
18 let (op, user) = user.recv().await;
19 let (y, user) = user.recv().await;
20 let result = match op {
21 Op::Plus => x + y,
22 Op::Times => x * y,
23 };
24 user.send1(result);
25 })
26}
More examples
examples/recursion.rs (line 19)
13fn start_counting() -> Send<Counting> {
14 fork(|mut numbers: Recv<Counting>| async {
15 let mut total = 0;
16 loop {
17 match numbers.recv1().await {
18 Counting::More(number) => {
19 let (n, next) = number.recv().await;
20 total += n;
21 numbers = next;
22 }
23 Counting::Done(report) => break report.send1(total),
24 }
25 }
26 })
27}
examples/branching.rs (line 56)
54fn boot_atm(accounts: Arc<HashMap<String, Money>>) -> ATM {
55 fork(|client: Client| async move {
56 let (Account(number), client) = client.recv().await;
57 let Some(&Money(funds)) = accounts.get(&number) else {
58 return client.send1(Err(InvalidAccount));
59 };
60 match client.choose(Ok).recv1().await {
61 Operation::CheckBalance(client) => client.send1(Amount(funds)),
62 Operation::Withdraw(client) => {
63 let (Amount(requested), client) = client.recv().await;
64 if funds >= requested {
65 client.send1(Ok(Money(requested)));
66 } else {
67 client.send1(Err(InsufficientFunds));
68 }
69 }
70 }
71 })
72}
examples/multiple_participants.rs (line 36)
32fn start_playing() -> Game {
33 use {Move::*, Outcome::*, Winner::*};
34
35 fork(|game: Dual<Game>| async {
36 let ((mut player1, mut player2, mut player3), winner) = game.recv().await;
37
38 loop {
39 let (move1, outcome1) = player1.recv().await;
40 let (move2, outcome2) = player2.recv().await;
41 let (move3, outcome3) = player3.recv().await;
42
43 tokio::time::sleep(Duration::from_secs(1)).await;
44 println!("{:?} {:?} {:?}", move1, move2, move3);
45 tokio::time::sleep(Duration::from_secs(1)).await;
46
47 match (move1, move2, move3) {
48 (Up, Down, Down) | (Down, Up, Up) => {
49 outcome1.send1(Win);
50 outcome2.send1(Loss);
51 outcome3.send1(Loss);
52 break winner.send1(First);
53 }
54 (Down, Up, Down) | (Up, Down, Up) => {
55 outcome1.send1(Loss);
56 outcome2.send1(Win);
57 outcome3.send1(Loss);
58 break winner.send1(Second);
59 }
60 (Down, Down, Up) | (Up, Up, Down) => {
61 outcome1.send1(Loss);
62 outcome2.send1(Loss);
63 outcome3.send1(Win);
64 break winner.send1(Third);
65 }
66 (Up, Up, Up) | (Down, Down, Down) => {
67 player1 = outcome1.choose(Draw);
68 player2 = outcome2.choose(Draw);
69 player3 = outcome3.choose(Draw);
70 println!("Draw...");
71 }
72 }
73 }
74 })
75}
examples/chat.rs (line 87)
55async fn serve(listener: TcpListener) {
56 let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57 drop(tokio::spawn(accept_users(listener).for_each1(
58 move |user| {
59 future::ready(proxy.clone(|proxy| {
60 drop(tokio::spawn(async {
61 let Some(login) = user.recv1().await else {
62 return;
63 };
64 proxy.connect().link(login);
65 }))
66 }))
67 },
68 )))
69 });
70
71 type Inboxes = HashMap<Nick, Inbox>;
72 fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73 let entries = inboxes
74 .drain()
75 .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76 .collect::<Vec<_>>();
77 inboxes.extend(entries);
78 }
79
80 let mut inboxes = Inboxes::new();
81
82 while let Some((new_server, transition)) = server.poll().await {
83 server = new_server;
84
85 match transition {
86 Event::Connect { session: login } => {
87 let (nick, resp) = login.recv().await;
88 let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89 resp.send1(Err(LoginRefused));
90 continue;
91 };
92 let (inbox, conn) = resp.choose(Ok).recv().await;
93 entry.insert(inbox);
94 broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95 server.suspend(nick, |c| conn.send1(c));
96 }
97
98 Event::Resume {
99 session: outbox,
100 data: nick,
101 } => match outbox.recv1().await {
102 Command::Message(msg) => {
103 let (content, conn) = msg.recv().await;
104 broadcast(
105 &mut inboxes,
106 ChatLine::Message {
107 from: nick.clone(),
108 content,
109 },
110 );
111 server.suspend(nick, |c| conn.send1(c));
112 }
113 Command::Logout => {
114 if let Some(inbox) = inboxes.remove(&nick) {
115 inbox.close1();
116 }
117 broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118 }
119 },
120 }
121 }
122}
pub fn poll_recv(self, cx: &mut Context<'_>) -> Result<(T, S), Self>
Source§ impl<T> Recv<T, ()> where
T: Send + 'static,
impl<T> Recv<T, ()> where
T: Send + 'static,
Source pub async fn recv1(self) -> T
pub async fn recv1(self) -> T
Waits to obtain a value of type T, and discards the empty continuation.
Examples found in repository?
examples/multiple_participants.rs (line 79)
77fn random_player() -> Player {
78 fork(|mut round: Round| async move {
79 while let Outcome::Draw(next_round) = round.send(random_move()).recv1().await {
80 round = next_round;
81 }
82 })
83}
84
85fn random_move() -> Move {
86 if fastrand::bool() {
87 Move::Up
88 } else {
89 Move::Down
90 }
91}
92
93#[tokio::main]
94async fn main() {
95 for _ in 0..10 {
96 let winner = start_playing()
97 .send((random_player(), random_player(), random_player()))
98 .recv1()
99 .await;
100 println!("{:?}!\n", winner);
101 }
102}
More examples
examples/sequencing.rs (line 34)
29async fn main() {
30 let sum = start_calculator()
31 .send(3)
32 .send(Op::Plus)
33 .send(4)
34 .recv1()
35 .await;
36
37 let product = start_calculator()
38 .send(3)
39 .send(Op::Times)
40 .send(4)
41 .recv1()
42 .await;
43
44 assert_eq!(sum, 7);
45 assert_eq!(product, 12);
46}
examples/branching.rs (line 26)
23fn check_balance(number: String) -> Client {
24 fork(|atm: ATM| async move {
25 let atm = atm.send(Account(number.clone()));
26 let Ok(atm) = atm.recv1().await else {
27 return println!("Invalid account: {}", number);
28 };
29 let Amount(funds) = atm.choose(Operation::CheckBalance).recv1().await;
30 println!("{} has {}", number, funds);
31 })
32}
33
34fn withdraw(number: String, Amount(requested): Amount) -> Client {
35 fork(|atm: ATM| async move {
36 let Ok(atm) = atm.send(Account(number.clone())).recv1().await else {
37 return println!("Invalid account: {}", number);
38 };
39 let response = atm
40 .choose(Operation::Withdraw)
41 .send(Amount(requested))
42 .recv1()
43 .await;
44 match response {
45 Ok(Money(withdrawn)) => println!("{} withdrawn from {}", withdrawn, number),
46 Err(InsufficientFunds) => println!(
47 "{} has insufficient funds to withdraw {}",
48 number, requested
49 ),
50 }
51 })
52}
53
54fn boot_atm(accounts: Arc<HashMap<String, Money>>) -> ATM {
55 fork(|client: Client| async move {
56 let (Account(number), client) = client.recv().await;
57 let Some(&Money(funds)) = accounts.get(&number) else {
58 return client.send1(Err(InvalidAccount));
59 };
60 match client.choose(Ok).recv1().await {
61 Operation::CheckBalance(client) => client.send1(Amount(funds)),
62 Operation::Withdraw(client) => {
63 let (Amount(requested), client) = client.recv().await;
64 if funds >= requested {
65 client.send1(Ok(Money(requested)));
66 } else {
67 client.send1(Err(InsufficientFunds));
68 }
69 }
70 }
71 })
72}
examples/recursion.rs (line 17)
13fn start_counting() -> Send<Counting> {
14 fork(|mut numbers: Recv<Counting>| async {
15 let mut total = 0;
16 loop {
17 match numbers.recv1().await {
18 Counting::More(number) => {
19 let (n, next) = number.recv().await;
20 total += n;
21 numbers = next;
22 }
23 Counting::Done(report) => break report.send1(total),
24 }
25 }
26 })
27}
28
29type Numbers = Dequeue<i64, Send<i64>>;
30type Counter = Dual<Numbers>;
31
32fn start_counting_with_queue() -> Counter {
33 fork(|numbers: Numbers| async {
34 let (total, report) = numbers
35 .fold(0, |total, add| async move { total + add })
36 .await;
37 report.send1(total);
38 })
39}
40
41#[tokio::main]
42async fn main() {
43 let sum = start_counting()
44 .choose(Counting::More)
45 .send(1)
46 .choose(Counting::More)
47 .send(2)
48 .choose(Counting::More)
49 .send(3)
50 .choose(Counting::More)
51 .send(4)
52 .choose(Counting::More)
53 .send(5)
54 .choose(Counting::Done)
55 .recv1()
56 .await;
57
58 assert_eq!(sum, 15);
59
60 let sum = start_counting_with_queue()
61 .push(1)
62 .push(2)
63 .push(3)
64 .push(4)
65 .push(5)
66 .close()
67 .recv1()
68 .await;
69
70 assert_eq!(sum, 15);
71}
examples/chat.rs (line 61)
55async fn serve(listener: TcpListener) {
56 let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57 drop(tokio::spawn(accept_users(listener).for_each1(
58 move |user| {
59 future::ready(proxy.clone(|proxy| {
60 drop(tokio::spawn(async {
61 let Some(login) = user.recv1().await else {
62 return;
63 };
64 proxy.connect().link(login);
65 }))
66 }))
67 },
68 )))
69 });
70
71 type Inboxes = HashMap<Nick, Inbox>;
72 fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73 let entries = inboxes
74 .drain()
75 .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76 .collect::<Vec<_>>();
77 inboxes.extend(entries);
78 }
79
80 let mut inboxes = Inboxes::new();
81
82 while let Some((new_server, transition)) = server.poll().await {
83 server = new_server;
84
85 match transition {
86 Event::Connect { session: login } => {
87 let (nick, resp) = login.recv().await;
88 let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89 resp.send1(Err(LoginRefused));
90 continue;
91 };
92 let (inbox, conn) = resp.choose(Ok).recv().await;
93 entry.insert(inbox);
94 broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95 server.suspend(nick, |c| conn.send1(c));
96 }
97
98 Event::Resume {
99 session: outbox,
100 data: nick,
101 } => match outbox.recv1().await {
102 Command::Message(msg) => {
103 let (content, conn) = msg.recv().await;
104 broadcast(
105 &mut inboxes,
106 ChatLine::Message {
107 from: nick.clone(),
108 content,
109 },
110 );
111 server.suspend(nick, |c| conn.send1(c));
112 }
113 Command::Logout => {
114 if let Some(inbox) = inboxes.remove(&nick) {
115 inbox.close1();
116 }
117 broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118 }
119 },
120 }
121 }
122}
123
124fn accept_users(listener: TcpListener) -> Dequeue<Recv<Option<Login>>> {
125 fork(|mut queue: Enqueue<Recv<Option<Login>>>| async move {
126 while let Ok((stream, _)) = listener.accept().await {
127 eprintln!("Client connecting...");
128
129 let Ok(addr) = stream.peer_addr() else {
130 eprintln!("ERROR: No peer address");
131 continue;
132 };
133 let Ok(web_socket) = tokio_tungstenite::accept_async(stream).await else {
134 eprintln!("ERROR: Handshake failed with {}", addr);
135 continue;
136 };
137
138 eprintln!("New WebSocket connection: {}", addr);
139 queue = queue.push(handle_user(web_socket));
140 }
141 queue.close1();
142 })
143}
144
145fn handle_user(socket: WebSocketStream<TcpStream>) -> Recv<Option<Login>> {
146 let (write, read) = socket.split();
147 let inbox = handle_inbox(write);
148 let messages = read_socket(read);
149
150 fork(|try_login: Send<Option<Login>>| async {
151 let inbox = inbox.push(ChatLine::Info(format!("What's your name?")));
152 let Queue::Item(name, messages) = messages.pop().await else {
153 inbox.close1();
154 return try_login.send1(None);
155 };
156
157 let Ok(accepted) = try_login.choose(Some).send(Nick(name)).recv1().await else {
158 inbox
159 .push(ChatLine::Error(format!("Login refused")))
160 .close1();
161 return messages.for_each1(|_| future::ready(())).await;
162 };
163
164 let conn = messages
165 .fold1(accepted.send(inbox).recv1().await, |conn, content| async {
166 conn.resume()
167 .choose(Command::Message)
168 .send(content)
169 .recv1()
170 .await
171 })
172 .await;
173
174 conn.resume().send1(Command::Logout);
175 })
176}
pub fn poll_recv1(self, cx: &mut Context<'_>) -> Result<T, Self>
Trait Implementations§
Auto Trait Implementations§
impl<T, S> Freeze for Recv<T, S>
impl<T, S = ()> !RefUnwindSafe for Recv<T, S>
impl<T, S> Send for Recv<T, S> where
T: Send,
impl<T, S> Sync for Recv<T, S> where
T: Send,
impl<T, S> Unpin for Recv<T, S>
impl<T, S = ()> !UnwindSafe for Recv<T, S>
Blanket Implementations§
Source§ impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§ fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more