1use crate::client::PORT;
2use crate::{Data, SERVER};
3use bincode::deserialize;
4use lz4_compression::prelude::compress;
5use std::io::{Read, Write};
6use std::net::{Shutdown, TcpListener, TcpStream};
7use std::sync::atomic::AtomicBool;
8use std::sync::Mutex;
9use std::thread;
10use std::time::Duration;
11
12static DATA_QUEUE: Mutex<Vec<Data>> = Mutex::new(Vec::new());
13static STOP_SERVER: AtomicBool = AtomicBool::new(false);
14
15#[macro_export]
16macro_rules! debug {
17 ($($arg:tt)*) => {
18 if cfg!(debug_assertions) {
19 println!($($arg)*);
20 }
21 };
22}
23
24fn handle_client(mut stream: TcpStream) {
25 let mut data = [0 as u8; 50]; loop {
27 stream
28 .set_read_timeout(Option::from(Duration::from_micros(10)))
29 .unwrap();
30 match stream.read(&mut data) {
31 Ok(n) => {
32 if n == 0 {
33 break;
34 }
35 debug!("loop");
36 let text = String::from_utf8_lossy(&data)
38 .to_string()
39 .trim_matches(char::from(0))
40 .to_string();
41 debug!("Received data: {}, len: {}", text, text.len());
42 if text == "hello" {
43 stream.write(b"hello").unwrap();
44 }
45
46 if text.contains("close") {
47 println!("Terminating connection");
48 stream.shutdown(Shutdown::Both).unwrap();
49 SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
50 break;
51 }
52 }
53 Err(_e) => {
54 if STOP_SERVER.load(std::sync::atomic::Ordering::SeqCst) {
55 println!("Terminating connection");
56 stream.write(&(0u32).to_le_bytes()).unwrap();
57 stream.shutdown(Shutdown::Both).unwrap();
58 SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
59 break;
60 }
61 if DATA_QUEUE.lock().unwrap().is_empty() {
62 continue;
63 }
64 let mut queue = DATA_QUEUE.lock().unwrap().clone();
66 debug!("Queue: {:?}", queue);
67 let data = bincode::serialize(&queue);
68 let data_type: Vec<Data> =
69 deserialize(&bincode::serialize(&queue).unwrap()).unwrap();
70 debug!("Data type: {:?}", data_type);
71 debug!("Sending data: {:?}", queue);
72 match data {
73 Ok(d) => {
74 debug!("length before compression: {}", d.len());
75 let d = compress(&d);
76 debug!("length after compression: {}", d.len());
77 stream.write(&(d.len() as u32).to_le_bytes()).unwrap();
79 debug!("len: {:?}", (d.len() as u32).to_le_bytes());
80 stream.write(&d).unwrap();
81 debug!("Sent data: {:?}", d);
82 queue.clear();
83 DATA_QUEUE.lock().unwrap().clear();
84 }
85 Err(e) => {
86 debug!("Failed to serialize data: {}", e);
87 }
88 }
89 }
90 };
91 }
92}
93
94pub fn create_server() {
95 if SERVER.load(std::sync::atomic::Ordering::SeqCst) {
96 println!("Server already running");
97 return;
98 }
99 SERVER.store(true, std::sync::atomic::Ordering::SeqCst);
100 STOP_SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
101 let listener = TcpListener::bind(format!("0.0.0.0:{}", PORT)).unwrap();
102 println!("Server listening on port 3333");
104 for stream in listener.incoming() {
105 println!("Incoming connection");
106 match stream {
107 Ok(stream) => {
108 println!("New connection: {}", stream.peer_addr().unwrap());
109 thread::spawn(move || {
110 handle_client(stream)
112 });
113 println!("Connection handled");
114 break;
115 }
116 Err(e) => {
117 println!("Error: {}", e);
118 }
120 }
121 }
122 println!("Can't accept any more connections");
123 drop(listener);
125}
126
127pub fn add_data(data: Data) {
128 debug!("Adding data: {:?}", data);
129 DATA_QUEUE.lock().unwrap().push(data);
130}
131
132pub fn stop_server() {
133 SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
134 STOP_SERVER.store(true, std::sync::atomic::Ordering::SeqCst);
135}