bevy_nest/
server.rs

1use std::sync::Arc;
2
3use bevy::prelude::*;
4use dashmap::DashMap;
5use tokio::{
6    io::{AsyncReadExt, AsyncWriteExt},
7    net::{TcpListener, ToSocketAddrs},
8    runtime::{Builder, Runtime},
9    task::JoinHandle,
10};
11use uuid::Uuid;
12
13use crate::{
14    channel::Channel,
15    errors::NetworkError,
16    events::{Inbox, IncomingConnection, Message, NetworkEvent, Outbox},
17    telnet::*,
18};
19
20/// A unique identifier for a client.
21#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
22pub struct ClientId(Uuid);
23
24impl ClientId {
25    pub fn new() -> Self {
26        Self(Uuid::new_v4())
27    }
28}
29
30impl Default for ClientId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36struct Client {
37    outbox: Channel<Outbox>,
38    #[allow(dead_code)]
39    read_task: JoinHandle<()>,
40    #[allow(dead_code)]
41    write_task: JoinHandle<()>,
42}
43
44#[derive(Resource)]
45pub struct Server {
46    runtime: Runtime,
47    clients: Arc<DashMap<ClientId, Client>>,
48    // Incoming connections.
49    pub(crate) incoming: Channel<IncomingConnection>,
50    // Recently disconnected clients.
51    pub(crate) lost: Channel<ClientId>,
52    // Network events.
53    pub(crate) events: Channel<NetworkEvent>,
54    // Messages received from clients.
55    pub(crate) inbox: Channel<Inbox>,
56}
57
58impl Server {
59    pub(crate) fn new() -> Self {
60        Self {
61            runtime: Builder::new_multi_thread()
62                .enable_io()
63                .build()
64                .expect("Could not build runtime"),
65            incoming: Channel::new(),
66            clients: Arc::new(DashMap::new()),
67            lost: Channel::new(),
68            events: Channel::new(),
69            inbox: Channel::new(),
70        }
71    }
72
73    /// Start listening for incoming connections on the given address.
74    /// This should be called from [`add_startup_system`](bevy::app::App.add_startup_system).
75    pub fn listen(&self, address: impl ToSocketAddrs + Send + 'static) {
76        let events = self.events.sender.clone();
77        let incoming = self.incoming.sender.clone();
78
79        // Spawn a new task to listen for incoming connections.
80        self.runtime.spawn(async move {
81            // Create a TCP listener.
82            let listener = match TcpListener::bind(address).await {
83                Ok(listener) => listener,
84                Err(err) => {
85                    if let Err(error) = events.send(NetworkEvent::Error(NetworkError::Listen(err)))
86                    {
87                        error!("Could not send error: {error}");
88                    };
89
90                    return;
91                }
92            };
93
94            info!("Listening");
95
96            loop {
97                // Wait for a new connection.
98                match listener.accept().await {
99                    // If we get a new connection, send it to the incoming channel
100                    // to be proccessed later.
101                    Ok((socket, address)) => {
102                        info!("Accepted connection from {address}");
103
104                        if let Err(err) = incoming.send(IncomingConnection { socket }) {
105                            error!("Failed to send incoming connection: {err}");
106                        }
107                    }
108                    Err(err) => {
109                        if let Err(err) =
110                            events.send(NetworkEvent::Error(NetworkError::Accept(err)))
111                        {
112                            error!("Could not send error: {err}");
113                        };
114                    }
115                }
116            }
117        });
118    }
119
120    /// Disconnect a client. This will send a [`NetworkEvent::Disconnected`] event.
121    pub fn disconnect(&self, client_id: &ClientId) {
122        self.remove_client(client_id);
123    }
124
125    pub(crate) fn setup_client(&self, connection: IncomingConnection) {
126        let (mut read_socket, mut write_socket) = connection.socket.into_split();
127
128        let id = ClientId::new();
129        let outbox: Channel<Outbox> = Channel::new();
130
131        let read_events_sender = self.events.sender.clone();
132        let write_events_sender = self.events.sender.clone();
133        let inbox_sender = self.inbox.sender.clone();
134        let outbox_receiver = outbox.receiver.clone();
135        let lost_sender = self.lost.sender.clone();
136
137        self.clients.insert(
138            id,
139            Client {
140                outbox,
141                // Spawn a new task to read from the socket.
142                // Messages received are sent to the server's inbox.
143                read_task: self.runtime.spawn(async move {
144                    // Create a buffer to read data into.
145                    let max_packet_size = 1024;
146                    let mut buffer = vec![0; max_packet_size];
147
148                    info!("Starting read task for {id:?}");
149
150                    loop {
151                        // Read data from the socket.
152                        let length = match read_socket.read(&mut buffer).await {
153                            Ok(n) => n,
154                            Err(err) => {
155                                if let Err(err) = read_events_sender
156                                    .send(NetworkEvent::Error(NetworkError::SocketRead(err, id)))
157                                {
158                                    error!("Could not send error: {err}");
159                                };
160
161                                break;
162                            }
163                        };
164
165                        // If the length is 0, the socket has been closed.
166                        if length == 0 {
167                            if let Err(err) = lost_sender.send(id) {
168                                error!("Could not send lost connection: {err}");
169                            }
170
171                            break;
172                        }
173
174                        if buffer[0] == 255 {
175                            // This is a command because the first byte is 255.
176                            // See: https://users.cs.cf.ac.uk/Dave.Marshall/Internet/node141.html
177                            if let Err(error) = inbox_sender.send(Inbox {
178                                from: id,
179                                content: Message::Command(buffer[..length].to_vec()),
180                            }) {
181                                error!("Could not send to inbox: {error}");
182                            }
183                        } else {
184                            // Convert the buffer into a string.
185                            let clean = std::str::from_utf8(&buffer[..length]).unwrap_or("").trim();
186
187                            // Send the message to the inbox.
188                            if !clean.is_empty() {
189                                if let Err(error) = inbox_sender.send(Inbox {
190                                    from: id,
191                                    content: Message::Text(clean.into()),
192                                }) {
193                                    error!("Could not send to inbox: {error}");
194                                }
195                            }
196                        }
197                    }
198                }),
199                write_task: self.runtime.spawn(async move {
200                    // Iterate over messages received from the outbox
201                    // and write them to the socket.
202                    while let Ok(out) = outbox_receiver.recv() {
203                        match out.content {
204                            Message::Text(text) => {
205                                if let Err(err) =
206                                    write_socket.write_all((text + "\r\n").as_bytes()).await
207                                {
208                                    if let Err(err) = write_events_sender.send(NetworkEvent::Error(
209                                        NetworkError::SocketWrite(err, out.to),
210                                    )) {
211                                        error!("Could not send error: {err}");
212                                    };
213
214                                    break;
215                                }
216                            }
217                            Message::Command(command) => {
218                                if let Err(err) = write_socket.write_all(command.as_slice()).await {
219                                    if let Err(err) = write_events_sender.send(NetworkEvent::Error(
220                                        NetworkError::SocketWrite(err, out.to),
221                                    )) {
222                                        error!("Could not send error: {err}");
223                                    };
224
225                                    break;
226                                }
227                            }
228                            Message::GMCP(payload) => {
229                                let mut seq = vec![IAC, SB, GMCP];
230
231                                seq.extend(payload.package.as_bytes());
232
233                                if let Some(subpackage) = payload.subpackage {
234                                    seq.push(b'.');
235                                    seq.extend(subpackage.as_bytes());
236                                }
237
238                                if let Some(data) = payload.data {
239                                    seq.push(b' ');
240                                    seq.extend(data.as_bytes());
241                                }
242
243                                seq.extend(vec![IAC, SE]);
244
245                                if let Err(err) = write_socket.write_all(seq.as_slice()).await {
246                                    if let Err(err) = write_events_sender.send(NetworkEvent::Error(
247                                        NetworkError::SocketWrite(err, out.to),
248                                    )) {
249                                        error!("Could not send error: {err}");
250                                    };
251
252                                    break;
253                                }
254                            }
255                        }
256                    }
257                }),
258            },
259        );
260
261        if let Err(err) = self.events.sender.send(NetworkEvent::Connected(id)) {
262            error!("Could not send connected event: {err}");
263        }
264    }
265
266    // Remove a client from the server.
267    pub(crate) fn remove_client(&self, id: &ClientId) {
268        self.clients.remove(id);
269
270        info!("Client disconnected: {id:?}");
271
272        if let Err(err) = self.events.sender.send(NetworkEvent::Disconnected(*id)) {
273            error!("Could not send event: {err}");
274        }
275    }
276
277    /// Send a message to a client's outbox.
278    pub(crate) fn send(&self, out: &Outbox) {
279        match &out.content {
280            Message::Text(text) => {
281                if let Some(client) = self.clients.get(&out.to) {
282                    if let Err(err) = client.outbox.sender.send(Outbox {
283                        to: out.to,
284                        content: Message::Text(text.clone()),
285                    }) {
286                        error!("Could not send message: {err}");
287                    }
288                }
289            }
290            Message::Command(command) => {
291                if let Some(client) = self.clients.get(&out.to) {
292                    if let Err(err) = client.outbox.sender.send(Outbox {
293                        to: out.to,
294                        content: Message::Command(command.clone()),
295                    }) {
296                        error!("Could not send message: {err}");
297                    }
298                }
299            }
300            Message::GMCP(payload) => {
301                if let Some(client) = self.clients.get(&out.to) {
302                    if let Err(err) = client.outbox.sender.send(Outbox {
303                        to: out.to,
304                        content: Message::GMCP(payload.clone()),
305                    }) {
306                        error!("Could not send message: {err}");
307                    }
308                }
309            }
310        }
311    }
312}