pub struct Enqueue<T, S: Session = ()> { /* private fields */ }
Expand description
Accepts an arbitrary number of values of type `T`, then proceeds according to `S`. Its dual is `Dequeue<T, Dual<S>>`.
Use `push` to send a value over the queue. To stop sending values and obtain the continuation `S`, use `close`, or `close1` if `S` is `()` (the empty session).
Implementations§
impl<T, S: Session> Enqueue<T, S>
where
    T: Send + 'static,
pub fn close(self) -> S
Closes the queue, signaling to the other side that no more items will be pushed. Returns the continuation `S`.
Examples found in repository?
examples/recursion.rs (line 66)
42async fn main() {
43 let sum = start_counting()
44 .choose(Counting::More)
45 .send(1)
46 .choose(Counting::More)
47 .send(2)
48 .choose(Counting::More)
49 .send(3)
50 .choose(Counting::More)
51 .send(4)
52 .choose(Counting::More)
53 .send(5)
54 .choose(Counting::Done)
55 .recv1()
56 .await;
57
58 assert_eq!(sum, 15);
59
60 let sum = start_counting_with_queue()
61 .push(1)
62 .push(2)
63 .push(3)
64 .push(4)
65 .push(5)
66 .close()
67 .recv1()
68 .await;
69
70 assert_eq!(sum, 15);
71}
pub fn push(self, item: T) -> Self
Pushes a value of type `T` into the queue. The items will be received in the same order as they were pushed.
Examples found in repository?
examples/chat.rs (line 75)
72 fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73 let entries = inboxes
74 .drain()
75 .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76 .collect::<Vec<_>>();
77 inboxes.extend(entries);
78 }
79
80 let mut inboxes = Inboxes::new();
81
82 while let Some((new_server, transition)) = server.poll().await {
83 server = new_server;
84
85 match transition {
86 Event::Connect { session: login } => {
87 let (nick, resp) = login.recv().await;
88 let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89 resp.send1(Err(LoginRefused));
90 continue;
91 };
92 let (inbox, conn) = resp.choose(Ok).recv().await;
93 entry.insert(inbox);
94 broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95 server.suspend(nick, |c| conn.send1(c));
96 }
97
98 Event::Resume {
99 session: outbox,
100 data: nick,
101 } => match outbox.recv1().await {
102 Command::Message(msg) => {
103 let (content, conn) = msg.recv().await;
104 broadcast(
105 &mut inboxes,
106 ChatLine::Message {
107 from: nick.clone(),
108 content,
109 },
110 );
111 server.suspend(nick, |c| conn.send1(c));
112 }
113 Command::Logout => {
114 if let Some(inbox) = inboxes.remove(&nick) {
115 inbox.close1();
116 }
117 broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118 }
119 },
120 }
121 }
122}
123
124fn accept_users(listener: TcpListener) -> Dequeue<Recv<Option<Login>>> {
125 fork(|mut queue: Enqueue<Recv<Option<Login>>>| async move {
126 while let Ok((stream, _)) = listener.accept().await {
127 eprintln!("Client connecting...");
128
129 let Ok(addr) = stream.peer_addr() else {
130 eprintln!("ERROR: No peer address");
131 continue;
132 };
133 let Ok(web_socket) = tokio_tungstenite::accept_async(stream).await else {
134 eprintln!("ERROR: Handshake failed with {}", addr);
135 continue;
136 };
137
138 eprintln!("New WebSocket connection: {}", addr);
139 queue = queue.push(handle_user(web_socket));
140 }
141 queue.close1();
142 })
143}
144
145fn handle_user(socket: WebSocketStream<TcpStream>) -> Recv<Option<Login>> {
146 let (write, read) = socket.split();
147 let inbox = handle_inbox(write);
148 let messages = read_socket(read);
149
150 fork(|try_login: Send<Option<Login>>| async {
151 let inbox = inbox.push(ChatLine::Info(format!("What's your name?")));
152 let Queue::Item(name, messages) = messages.pop().await else {
153 inbox.close1();
154 return try_login.send1(None);
155 };
156
157 let Ok(accepted) = try_login.choose(Some).send(Nick(name)).recv1().await else {
158 inbox
159 .push(ChatLine::Error(format!("Login refused")))
160 .close1();
161 return messages.for_each1(|_| future::ready(())).await;
162 };
163
164 let conn = messages
165 .fold1(accepted.send(inbox).recv1().await, |conn, content| async {
166 conn.resume()
167 .choose(Command::Message)
168 .send(content)
169 .recv1()
170 .await
171 })
172 .await;
173
174 conn.resume().send1(Command::Logout);
175 })
176}
177
178fn handle_inbox(write: WebSocketWrite) -> Inbox {
179 fork(|lines: Dequeue<ChatLine>| async {
180 lines
181 .fold1(write, |mut write, line| async {
182 write
183 .send(Message::text(match line {
184 ChatLine::Message {
185 from: Nick(name),
186 content,
187 } => format!("{}> {}", name, content),
188 ChatLine::Info(content) => format!("> {}", content),
189 ChatLine::Error(content) => format!("? {}", content),
190 }))
191 .await
192 .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
193 write
194 })
195 .await
196 .close()
197 .await
198 .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
199 })
200}
201
202fn read_socket(read: WebSocketRead) -> Dequeue<String> {
203 fork(|queue: Enqueue<String>| async {
204 read.fold(queue, |queue, msg| async {
205 match msg {
206 Ok(Message::Text(content)) => queue.push(content),
207 _ => queue,
208 }
209 })
210 .await
211 .close1()
212 })
213}
More examples
examples/recursion.rs (line 61)
42async fn main() {
43 let sum = start_counting()
44 .choose(Counting::More)
45 .send(1)
46 .choose(Counting::More)
47 .send(2)
48 .choose(Counting::More)
49 .send(3)
50 .choose(Counting::More)
51 .send(4)
52 .choose(Counting::More)
53 .send(5)
54 .choose(Counting::Done)
55 .recv1()
56 .await;
57
58 assert_eq!(sum, 15);
59
60 let sum = start_counting_with_queue()
61 .push(1)
62 .push(2)
63 .push(3)
64 .push(4)
65 .push(5)
66 .close()
67 .recv1()
68 .await;
69
70 assert_eq!(sum, 15);
71}
impl<T> Enqueue<T, ()>
where
    T: Send + 'static,
pub fn close1(self)
Closes the queue, signaling to the other side that no more items will be pushed.
Examples found in repository?
examples/chat.rs (line 115)
55async fn serve(listener: TcpListener) {
56 let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57 drop(tokio::spawn(accept_users(listener).for_each1(
58 move |user| {
59 future::ready(proxy.clone(|proxy| {
60 drop(tokio::spawn(async {
61 let Some(login) = user.recv1().await else {
62 return;
63 };
64 proxy.connect().link(login);
65 }))
66 }))
67 },
68 )))
69 });
70
71 type Inboxes = HashMap<Nick, Inbox>;
72 fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73 let entries = inboxes
74 .drain()
75 .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76 .collect::<Vec<_>>();
77 inboxes.extend(entries);
78 }
79
80 let mut inboxes = Inboxes::new();
81
82 while let Some((new_server, transition)) = server.poll().await {
83 server = new_server;
84
85 match transition {
86 Event::Connect { session: login } => {
87 let (nick, resp) = login.recv().await;
88 let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89 resp.send1(Err(LoginRefused));
90 continue;
91 };
92 let (inbox, conn) = resp.choose(Ok).recv().await;
93 entry.insert(inbox);
94 broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95 server.suspend(nick, |c| conn.send1(c));
96 }
97
98 Event::Resume {
99 session: outbox,
100 data: nick,
101 } => match outbox.recv1().await {
102 Command::Message(msg) => {
103 let (content, conn) = msg.recv().await;
104 broadcast(
105 &mut inboxes,
106 ChatLine::Message {
107 from: nick.clone(),
108 content,
109 },
110 );
111 server.suspend(nick, |c| conn.send1(c));
112 }
113 Command::Logout => {
114 if let Some(inbox) = inboxes.remove(&nick) {
115 inbox.close1();
116 }
117 broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118 }
119 },
120 }
121 }
122}
123
124fn accept_users(listener: TcpListener) -> Dequeue<Recv<Option<Login>>> {
125 fork(|mut queue: Enqueue<Recv<Option<Login>>>| async move {
126 while let Ok((stream, _)) = listener.accept().await {
127 eprintln!("Client connecting...");
128
129 let Ok(addr) = stream.peer_addr() else {
130 eprintln!("ERROR: No peer address");
131 continue;
132 };
133 let Ok(web_socket) = tokio_tungstenite::accept_async(stream).await else {
134 eprintln!("ERROR: Handshake failed with {}", addr);
135 continue;
136 };
137
138 eprintln!("New WebSocket connection: {}", addr);
139 queue = queue.push(handle_user(web_socket));
140 }
141 queue.close1();
142 })
143}
144
145fn handle_user(socket: WebSocketStream<TcpStream>) -> Recv<Option<Login>> {
146 let (write, read) = socket.split();
147 let inbox = handle_inbox(write);
148 let messages = read_socket(read);
149
150 fork(|try_login: Send<Option<Login>>| async {
151 let inbox = inbox.push(ChatLine::Info(format!("What's your name?")));
152 let Queue::Item(name, messages) = messages.pop().await else {
153 inbox.close1();
154 return try_login.send1(None);
155 };
156
157 let Ok(accepted) = try_login.choose(Some).send(Nick(name)).recv1().await else {
158 inbox
159 .push(ChatLine::Error(format!("Login refused")))
160 .close1();
161 return messages.for_each1(|_| future::ready(())).await;
162 };
163
164 let conn = messages
165 .fold1(accepted.send(inbox).recv1().await, |conn, content| async {
166 conn.resume()
167 .choose(Command::Message)
168 .send(content)
169 .recv1()
170 .await
171 })
172 .await;
173
174 conn.resume().send1(Command::Logout);
175 })
176}
177
178fn handle_inbox(write: WebSocketWrite) -> Inbox {
179 fork(|lines: Dequeue<ChatLine>| async {
180 lines
181 .fold1(write, |mut write, line| async {
182 write
183 .send(Message::text(match line {
184 ChatLine::Message {
185 from: Nick(name),
186 content,
187 } => format!("{}> {}", name, content),
188 ChatLine::Info(content) => format!("> {}", content),
189 ChatLine::Error(content) => format!("? {}", content),
190 }))
191 .await
192 .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
193 write
194 })
195 .await
196 .close()
197 .await
198 .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
199 })
200}
201
202fn read_socket(read: WebSocketRead) -> Dequeue<String> {
203 fork(|queue: Enqueue<String>| async {
204 read.fold(queue, |queue, msg| async {
205 match msg {
206 Ok(Message::Text(content)) => queue.push(content),
207 _ => queue,
208 }
209 })
210 .await
211 .close1()
212 })
213}
Trait Implementations§
Auto Trait Implementations§
impl<T, S> Freeze for Enqueue<T, S>
impl<T, S = ()> !RefUnwindSafe for Enqueue<T, S>
impl<T, S> Send for Enqueue<T, S>
where
    T: Send,
impl<T, S> Sync for Enqueue<T, S>
where
    T: Send,
impl<T, S> Unpin for Enqueue<T, S>
impl<T, S = ()> !UnwindSafe for Enqueue<T, S>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more