1use std::{sync::Arc, net::SocketAddr};
2
3use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::Mutex};
4
5use crate::{serializer::Codec, buffer::read::SonetReadBuf, packet::{Packet, PacketRegistry}};
6
7pub struct Client {
8 pub connection: Connection,
9 pub packet_handlers: Arc<Mutex<Vec<Box<dyn FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send>>>>
10}
11
12pub struct Connection {
13 pub socket: TcpStream,
14 pub codec: Arc<Mutex<Codec>>
15}
16
17impl Client {
18 pub async fn new(port: i32, registry: PacketRegistry) -> Client {
19 Client {
20 connection: Connection {
21 socket: TcpStream::connect(format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap()).await.unwrap(),
22 codec: Arc::new(Mutex::new(Codec::new(registry)))
23 },
24 packet_handlers: Arc::new(Mutex::new(vec![]))
25 }
26 }
27
28 pub async fn handle(connection: &mut Connection, packet_handlers: Arc<Mutex<Vec<Box<dyn FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send>>>>) -> Result<(), std::io::Error> {
29 loop {
30 let packet = connection.retrieve().await?;
31
32 for handler in packet_handlers.lock().await.iter_mut() {
33 handler(&packet, connection);
34 }
35 }
36 }
37
38 pub async fn initialize(&'static mut self) -> Result<(), std::io::Error> {
39 tokio::spawn(Self::handle(&mut self.connection, self.packet_handlers.clone()));
40
41 Ok(())
42 }
43
44 pub fn add_handler<T>(&mut self, closure: T) where T: FnMut(&Box<dyn Packet + Send + Sync>, &mut Connection) + Send + 'static {
45 futures::executor::block_on(async {
46 self.packet_handlers.lock().await.push(Box::new(closure));
47 })
48 }
49}
50
51impl Connection {
52 pub async fn retrieve(&mut self) -> Result<Box<dyn Packet + Send + Sync>, std::io::Error> {
53
54 self.socket.readable().await?;
55
56 let mut header_buffer = [0; 4];
58
59 self.socket.read(&mut header_buffer).await.unwrap();
61
62 let body_size = i32::from_be_bytes(header_buffer);
64
65 let mut full_body = Vec::new();
67
68 let mut body_buffer = Vec::new();
70
71 let mut read = 0;
73
74 loop {
75 match self.socket.read_buf(&mut body_buffer).await {
77 Ok(_) => {
78 for byte in body_buffer.clone() {
80 full_body.push(byte);
81 read += 1;
82 }
83
84 body_buffer.clear();
86
87 if read >= body_size {
89 break;
90 }
91 }
92 Err(e) => {
93 panic!("Error: {}", e);
94 }
95 };
96 }
97
98 let safe_codec = self.codec.lock().await;
99
100 let mut buffer = SonetReadBuf::new(full_body);
101
102 Ok(safe_codec.deserialize(&mut buffer))
103 }
104
105 pub fn push_packet(&mut self, packet: Box<dyn Packet>) {
106 futures::executor::block_on(async {
107 self.socket.write_all(self.codec.lock().await
108 .serialize(&packet).data
109 .as_mut()).await.unwrap();
110 });
111 }
112}