1use std::sync::Arc;
2
3use bevy::prelude::*;
4use dashmap::DashMap;
5use tokio::{
6 io::{AsyncReadExt, AsyncWriteExt},
7 net::{TcpListener, ToSocketAddrs},
8 runtime::{Builder, Runtime},
9 task::JoinHandle,
10};
11use uuid::Uuid;
12
13use crate::{
14 channel::Channel,
15 errors::NetworkError,
16 events::{Inbox, IncomingConnection, Message, NetworkEvent, Outbox},
17 telnet::*,
18};
19
20#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]
22pub struct ClientId(Uuid);
23
24impl ClientId {
25 pub fn new() -> Self {
26 Self(Uuid::new_v4())
27 }
28}
29
30impl Default for ClientId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36struct Client {
37 outbox: Channel<Outbox>,
38 #[allow(dead_code)]
39 read_task: JoinHandle<()>,
40 #[allow(dead_code)]
41 write_task: JoinHandle<()>,
42}
43
44#[derive(Resource)]
45pub struct Server {
46 runtime: Runtime,
47 clients: Arc<DashMap<ClientId, Client>>,
48 pub(crate) incoming: Channel<IncomingConnection>,
50 pub(crate) lost: Channel<ClientId>,
52 pub(crate) events: Channel<NetworkEvent>,
54 pub(crate) inbox: Channel<Inbox>,
56}
57
58impl Server {
59 pub(crate) fn new() -> Self {
60 Self {
61 runtime: Builder::new_multi_thread()
62 .enable_io()
63 .build()
64 .expect("Could not build runtime"),
65 incoming: Channel::new(),
66 clients: Arc::new(DashMap::new()),
67 lost: Channel::new(),
68 events: Channel::new(),
69 inbox: Channel::new(),
70 }
71 }
72
73 pub fn listen(&self, address: impl ToSocketAddrs + Send + 'static) {
76 let events = self.events.sender.clone();
77 let incoming = self.incoming.sender.clone();
78
79 self.runtime.spawn(async move {
81 let listener = match TcpListener::bind(address).await {
83 Ok(listener) => listener,
84 Err(err) => {
85 if let Err(error) = events.send(NetworkEvent::Error(NetworkError::Listen(err)))
86 {
87 error!("Could not send error: {error}");
88 };
89
90 return;
91 }
92 };
93
94 info!("Listening");
95
96 loop {
97 match listener.accept().await {
99 Ok((socket, address)) => {
102 info!("Accepted connection from {address}");
103
104 if let Err(err) = incoming.send(IncomingConnection { socket }) {
105 error!("Failed to send incoming connection: {err}");
106 }
107 }
108 Err(err) => {
109 if let Err(err) =
110 events.send(NetworkEvent::Error(NetworkError::Accept(err)))
111 {
112 error!("Could not send error: {err}");
113 };
114 }
115 }
116 }
117 });
118 }
119
120 pub fn disconnect(&self, client_id: &ClientId) {
122 self.remove_client(client_id);
123 }
124
125 pub(crate) fn setup_client(&self, connection: IncomingConnection) {
126 let (mut read_socket, mut write_socket) = connection.socket.into_split();
127
128 let id = ClientId::new();
129 let outbox: Channel<Outbox> = Channel::new();
130
131 let read_events_sender = self.events.sender.clone();
132 let write_events_sender = self.events.sender.clone();
133 let inbox_sender = self.inbox.sender.clone();
134 let outbox_receiver = outbox.receiver.clone();
135 let lost_sender = self.lost.sender.clone();
136
137 self.clients.insert(
138 id,
139 Client {
140 outbox,
141 read_task: self.runtime.spawn(async move {
144 let max_packet_size = 1024;
146 let mut buffer = vec![0; max_packet_size];
147
148 info!("Starting read task for {id:?}");
149
150 loop {
151 let length = match read_socket.read(&mut buffer).await {
153 Ok(n) => n,
154 Err(err) => {
155 if let Err(err) = read_events_sender
156 .send(NetworkEvent::Error(NetworkError::SocketRead(err, id)))
157 {
158 error!("Could not send error: {err}");
159 };
160
161 break;
162 }
163 };
164
165 if length == 0 {
167 if let Err(err) = lost_sender.send(id) {
168 error!("Could not send lost connection: {err}");
169 }
170
171 break;
172 }
173
174 if buffer[0] == 255 {
175 if let Err(error) = inbox_sender.send(Inbox {
178 from: id,
179 content: Message::Command(buffer[..length].to_vec()),
180 }) {
181 error!("Could not send to inbox: {error}");
182 }
183 } else {
184 let clean = std::str::from_utf8(&buffer[..length]).unwrap_or("").trim();
186
187 if !clean.is_empty() {
189 if let Err(error) = inbox_sender.send(Inbox {
190 from: id,
191 content: Message::Text(clean.into()),
192 }) {
193 error!("Could not send to inbox: {error}");
194 }
195 }
196 }
197 }
198 }),
199 write_task: self.runtime.spawn(async move {
200 while let Ok(out) = outbox_receiver.recv() {
203 match out.content {
204 Message::Text(text) => {
205 if let Err(err) =
206 write_socket.write_all((text + "\r\n").as_bytes()).await
207 {
208 if let Err(err) = write_events_sender.send(NetworkEvent::Error(
209 NetworkError::SocketWrite(err, out.to),
210 )) {
211 error!("Could not send error: {err}");
212 };
213
214 break;
215 }
216 }
217 Message::Command(command) => {
218 if let Err(err) = write_socket.write_all(command.as_slice()).await {
219 if let Err(err) = write_events_sender.send(NetworkEvent::Error(
220 NetworkError::SocketWrite(err, out.to),
221 )) {
222 error!("Could not send error: {err}");
223 };
224
225 break;
226 }
227 }
228 Message::GMCP(payload) => {
229 let mut seq = vec![IAC, SB, GMCP];
230
231 seq.extend(payload.package.as_bytes());
232
233 if let Some(subpackage) = payload.subpackage {
234 seq.push(b'.');
235 seq.extend(subpackage.as_bytes());
236 }
237
238 if let Some(data) = payload.data {
239 seq.push(b' ');
240 seq.extend(data.as_bytes());
241 }
242
243 seq.extend(vec![IAC, SE]);
244
245 if let Err(err) = write_socket.write_all(seq.as_slice()).await {
246 if let Err(err) = write_events_sender.send(NetworkEvent::Error(
247 NetworkError::SocketWrite(err, out.to),
248 )) {
249 error!("Could not send error: {err}");
250 };
251
252 break;
253 }
254 }
255 }
256 }
257 }),
258 },
259 );
260
261 if let Err(err) = self.events.sender.send(NetworkEvent::Connected(id)) {
262 error!("Could not send connected event: {err}");
263 }
264 }
265
266 pub(crate) fn remove_client(&self, id: &ClientId) {
268 self.clients.remove(id);
269
270 info!("Client disconnected: {id:?}");
271
272 if let Err(err) = self.events.sender.send(NetworkEvent::Disconnected(*id)) {
273 error!("Could not send event: {err}");
274 }
275 }
276
277 pub(crate) fn send(&self, out: &Outbox) {
279 match &out.content {
280 Message::Text(text) => {
281 if let Some(client) = self.clients.get(&out.to) {
282 if let Err(err) = client.outbox.sender.send(Outbox {
283 to: out.to,
284 content: Message::Text(text.clone()),
285 }) {
286 error!("Could not send message: {err}");
287 }
288 }
289 }
290 Message::Command(command) => {
291 if let Some(client) = self.clients.get(&out.to) {
292 if let Err(err) = client.outbox.sender.send(Outbox {
293 to: out.to,
294 content: Message::Command(command.clone()),
295 }) {
296 error!("Could not send message: {err}");
297 }
298 }
299 }
300 Message::GMCP(payload) => {
301 if let Some(client) = self.clients.get(&out.to) {
302 if let Err(err) = client.outbox.sender.send(Outbox {
303 to: out.to,
304 content: Message::GMCP(payload.clone()),
305 }) {
306 error!("Could not send message: {err}");
307 }
308 }
309 }
310 }
311 }
312}