sonet_rs/
client.rs

1use std::{sync::Arc, net::SocketAddr};
2
3use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::Mutex};
4
5use crate::{serializer::Codec, buffer::read::SonetReadBuf, packet::{Packet, PacketRegistry}};
6
7pub struct Client {
8    pub connection: Connection,
9    pub packet_handlers: Arc<Mutex<Vec<Box<dyn FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send>>>>
10}
11
12pub struct Connection {
13    pub socket: TcpStream,
14    pub codec: Arc<Mutex<Codec>>
15}
16
17impl Client {
18    pub async fn new(port: i32, registry: PacketRegistry) -> Client {
19        Client {
20            connection: Connection {
21                socket: TcpStream::connect(format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap()).await.unwrap(),
22                codec: Arc::new(Mutex::new(Codec::new(registry)))
23            },
24            packet_handlers: Arc::new(Mutex::new(vec![]))
25        }
26    }
27
28    pub async fn handle(connection: &mut Connection, packet_handlers: Arc<Mutex<Vec<Box<dyn FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send>>>>) -> Result<(), std::io::Error> {
29        loop {
30            let packet = connection.retrieve().await?;
31
32            for handler in packet_handlers.lock().await.iter_mut() {
33                handler(&packet, connection);
34            }
35        }
36    }
37
38    pub async fn initialize(&'static mut self) -> Result<(), std::io::Error> {
39        tokio::spawn(Self::handle(&mut self.connection, self.packet_handlers.clone()));
40
41        Ok(())
42    }
43
44    pub fn add_handler<T>(&mut self, closure: T) where T: FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send + 'static {
45        futures::executor::block_on(async {
46            self.packet_handlers.lock().await.push(Box::new(closure));
47        })
48    }
49}
50
51impl Connection {
52    pub async fn retrieve(&mut self) -> Result<Box<dyn Packet + Send + Sync>, std::io::Error> {
53
54        self.socket.readable().await?;
55
56        // Header Buffer
57        let mut header_buffer = [0; 4];
58
59        // Read Header
60        self.socket.read(&mut header_buffer).await.unwrap();
61
62        // Body Size
63        let body_size = i32::from_be_bytes(header_buffer);
64
65        // The full body buffer
66        let mut full_body = Vec::new();
67
68        // Temporary Read Buffer
69        let mut body_buffer = Vec::new();
70
71        // Size Read
72        let mut read = 0;
73
74        loop {
75            // Read Body
76            match self.socket.read_buf(&mut body_buffer).await {
77                Ok(_) => {
78                    // Add all read data to the full body buffer
79                    for byte in body_buffer.clone() {
80                        full_body.push(byte);
81                        read += 1;
82                    }
83
84                    // Clear Temp Buffer
85                    body_buffer.clear();
86
87                    // End if fully read
88                    if read >= body_size {
89                        break;
90                    }
91                }
92                Err(e) => {
93                    panic!("Error: {}", e);
94                }
95            };
96        }
97
98        let safe_codec = self.codec.lock().await;
99
100        let mut buffer = SonetReadBuf::new(full_body);
101
102        Ok(safe_codec.deserialize(&mut buffer))
103    }
104
105    pub fn push_packet(&mut self, packet: Box<dyn Packet>) {
106        futures::executor::block_on(async {
107            self.socket.write_all(self.codec.lock().await
108                .serialize(&packet).data
109                .as_mut()).await.unwrap();
110        });
111    }
112}