chat/
chat.rs

1use futures::{
2    future,
3    stream::{SplitSink, SplitStream},
4    SinkExt, StreamExt,
5};
6use par::{
7    exchange::{Recv, Send},
8    queue::{Dequeue, Enqueue, Queue},
9    runtimes::tokio::fork,
10    server::{Connection, Event, Server},
11    Dual, Session,
12};
13use std::{
14    collections::{hash_map::Entry, HashMap},
15    io,
16};
17use tokio::net::{TcpListener, TcpStream};
18use tokio_tungstenite::{
19    tungstenite::{self, Message},
20    WebSocketStream,
21};
22
23type WebSocketWrite = SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>;
24type WebSocketRead = SplitStream<WebSocketStream<TcpStream>>;
25
26#[derive(Clone, PartialEq, Eq, Hash)]
27struct Nick(String);
28struct LoginRefused;
29
30#[derive(Clone)]
31enum ChatLine {
32    Message { from: Nick, content: String },
33    Info(String),
34    Error(String),
35}
36
37type Login = Recv<Nick, Send<Result<Send<Inbox, Recv<Conn>>, LoginRefused>>>;
38type Inbox = Enqueue<ChatLine>;
39type Conn = Connection<Dual<Outbox>>;
40type Outbox = Recv<Command>;
41enum Command {
42    Message(Recv<String, Send<Conn>>),
43    Logout,
44}
45
46#[tokio::main]
47async fn main() -> io::Result<()> {
48    let addr = "127.0.0.1:3000";
49    let listener = TcpListener::bind(addr).await?;
50    eprintln!("Listening on: {}", addr);
51    serve(listener).await;
52    Ok(())
53}
54
55async fn serve(listener: TcpListener) {
56    let mut server = Server::<Login, Outbox, Nick>::start(|proxy| {
57        drop(tokio::spawn(accept_users(listener).for_each1(
58            move |user| {
59                future::ready(proxy.clone(|proxy| {
60                    drop(tokio::spawn(async {
61                        let Some(login) = user.recv1().await else {
62                            return;
63                        };
64                        proxy.connect().link(login);
65                    }))
66                }))
67            },
68        )))
69    });
70
71    type Inboxes = HashMap<Nick, Inbox>;
72    fn broadcast(inboxes: &mut Inboxes, line: ChatLine) {
73        let entries = inboxes
74            .drain()
75            .map(|(nick, inbox)| (nick, inbox.push(line.clone())))
76            .collect::<Vec<_>>();
77        inboxes.extend(entries);
78    }
79
80    let mut inboxes = Inboxes::new();
81
82    while let Some((new_server, transition)) = server.poll().await {
83        server = new_server;
84
85        match transition {
86            Event::Connect { session: login } => {
87                let (nick, resp) = login.recv().await;
88                let Entry::Vacant(entry) = inboxes.entry(nick.clone()) else {
89                    resp.send1(Err(LoginRefused));
90                    continue;
91                };
92                let (inbox, conn) = resp.choose(Ok).recv().await;
93                entry.insert(inbox);
94                broadcast(&mut inboxes, ChatLine::Info(format!("{} joined", nick.0)));
95                server.suspend(nick, |c| conn.send1(c));
96            }
97
98            Event::Resume {
99                session: outbox,
100                data: nick,
101            } => match outbox.recv1().await {
102                Command::Message(msg) => {
103                    let (content, conn) = msg.recv().await;
104                    broadcast(
105                        &mut inboxes,
106                        ChatLine::Message {
107                            from: nick.clone(),
108                            content,
109                        },
110                    );
111                    server.suspend(nick, |c| conn.send1(c));
112                }
113                Command::Logout => {
114                    if let Some(inbox) = inboxes.remove(&nick) {
115                        inbox.close1();
116                    }
117                    broadcast(&mut inboxes, ChatLine::Info(format!("{} left", nick.0)));
118                }
119            },
120        }
121    }
122}
123
124fn accept_users(listener: TcpListener) -> Dequeue<Recv<Option<Login>>> {
125    fork(|mut queue: Enqueue<Recv<Option<Login>>>| async move {
126        while let Ok((stream, _)) = listener.accept().await {
127            eprintln!("Client connecting...");
128
129            let Ok(addr) = stream.peer_addr() else {
130                eprintln!("ERROR: No peer address");
131                continue;
132            };
133            let Ok(web_socket) = tokio_tungstenite::accept_async(stream).await else {
134                eprintln!("ERROR: Handshake failed with {}", addr);
135                continue;
136            };
137
138            eprintln!("New WebSocket connection: {}", addr);
139            queue = queue.push(handle_user(web_socket));
140        }
141        queue.close1();
142    })
143}
144
145fn handle_user(socket: WebSocketStream<TcpStream>) -> Recv<Option<Login>> {
146    let (write, read) = socket.split();
147    let inbox = handle_inbox(write);
148    let messages = read_socket(read);
149
150    fork(|try_login: Send<Option<Login>>| async {
151        let inbox = inbox.push(ChatLine::Info(format!("What's your name?")));
152        let Queue::Item(name, messages) = messages.pop().await else {
153            inbox.close1();
154            return try_login.send1(None);
155        };
156
157        let Ok(accepted) = try_login.choose(Some).send(Nick(name)).recv1().await else {
158            inbox
159                .push(ChatLine::Error(format!("Login refused")))
160                .close1();
161            return messages.for_each1(|_| future::ready(())).await;
162        };
163
164        let conn = messages
165            .fold1(accepted.send(inbox).recv1().await, |conn, content| async {
166                conn.resume()
167                    .choose(Command::Message)
168                    .send(content)
169                    .recv1()
170                    .await
171            })
172            .await;
173
174        conn.resume().send1(Command::Logout);
175    })
176}
177
178fn handle_inbox(write: WebSocketWrite) -> Inbox {
179    fork(|lines: Dequeue<ChatLine>| async {
180        lines
181            .fold1(write, |mut write, line| async {
182                write
183                    .send(Message::text(match line {
184                        ChatLine::Message {
185                            from: Nick(name),
186                            content,
187                        } => format!("{}> {}", name, content),
188                        ChatLine::Info(content) => format!("> {}", content),
189                        ChatLine::Error(content) => format!("? {}", content),
190                    }))
191                    .await
192                    .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
193                write
194            })
195            .await
196            .close()
197            .await
198            .unwrap_or_else(|err| eprintln!("ERROR: {}", err));
199    })
200}
201
202fn read_socket(read: WebSocketRead) -> Dequeue<String> {
203    fork(|queue: Enqueue<String>| async {
204        read.fold(queue, |queue, msg| async {
205            match msg {
206                Ok(Message::Text(content)) => queue.push(content),
207                _ => queue,
208            }
209        })
210        .await
211        .close1()
212    })
213}