phoenix_rec/
server.rs

1use crate::client::PORT;
2use crate::{Data, SERVER};
3use bincode::deserialize;
4use lz4_compression::prelude::compress;
5use std::io::{Read, Write};
6use std::net::{Shutdown, TcpListener, TcpStream};
7use std::sync::atomic::AtomicBool;
8use std::sync::Mutex;
9use std::thread;
10use std::time::Duration;
11
12static DATA_QUEUE: Mutex<Vec<Data>> = Mutex::new(Vec::new());
13static STOP_SERVER: AtomicBool = AtomicBool::new(false);
14
15#[macro_export]
16macro_rules! debug {
17    ($($arg:tt)*) => {
18        if cfg!(debug_assertions) {
19            println!($($arg)*);
20        }
21    };
22}
23
24fn handle_client(mut stream: TcpStream) {
25    let mut data = [0 as u8; 50]; // using 50 byte buffer
26    loop {
27        stream
28            .set_read_timeout(Option::from(Duration::from_micros(10)))
29            .unwrap();
30        match stream.read(&mut data) {
31            Ok(n) => {
32                if n == 0 {
33                    break;
34                }
35                debug!("loop");
36                // print received data
37                let text = String::from_utf8_lossy(&data)
38                    .to_string()
39                    .trim_matches(char::from(0))
40                    .to_string();
41                debug!("Received data: {}, len: {}", text, text.len());
42                if text == "hello" {
43                    stream.write(b"hello").unwrap();
44                }
45
46                if text.contains("close") {
47                    println!("Terminating connection");
48                    stream.shutdown(Shutdown::Both).unwrap();
49                    SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
50                    break;
51                }
52            }
53            Err(_e) => {
54                if STOP_SERVER.load(std::sync::atomic::Ordering::SeqCst) {
55                    println!("Terminating connection");
56                    stream.write(&(0u32).to_le_bytes()).unwrap();
57                    stream.shutdown(Shutdown::Both).unwrap();
58                    SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
59                    break;
60                }
61                if DATA_QUEUE.lock().unwrap().is_empty() {
62                    continue;
63                }
64                // send all data from the queue
65                let mut queue = DATA_QUEUE.lock().unwrap().clone();
66                debug!("Queue: {:?}", queue);
67                let data = bincode::serialize(&queue);
68                let data_type: Vec<Data> =
69                    deserialize(&bincode::serialize(&queue).unwrap()).unwrap();
70                debug!("Data type: {:?}", data_type);
71                debug!("Sending data: {:?}", queue);
72                match data {
73                    Ok(d) => {
74                        debug!("length before compression: {}", d.len());
75                        let d = compress(&d);
76                        debug!("length after compression: {}", d.len());
77                        // first send the length of the data
78                        stream.write(&(d.len() as u32).to_le_bytes()).unwrap();
79                        debug!("len: {:?}", (d.len() as u32).to_le_bytes());
80                        stream.write(&d).unwrap();
81                        debug!("Sent data: {:?}", d);
82                        queue.clear();
83                        DATA_QUEUE.lock().unwrap().clear();
84                    }
85                    Err(e) => {
86                        debug!("Failed to serialize data: {}", e);
87                    }
88                }
89            }
90        };
91    }
92}
93
94pub fn create_server() {
95    if SERVER.load(std::sync::atomic::Ordering::SeqCst) {
96        println!("Server already running");
97        return;
98    }
99    SERVER.store(true, std::sync::atomic::Ordering::SeqCst);
100    STOP_SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
101    let listener = TcpListener::bind(format!("0.0.0.0:{}", PORT)).unwrap();
102    // accept connections and process them, spawning a new thread for each one
103    println!("Server listening on port 3333");
104    for stream in listener.incoming() {
105        println!("Incoming connection");
106        match stream {
107            Ok(stream) => {
108                println!("New connection: {}", stream.peer_addr().unwrap());
109                thread::spawn(move || {
110                    // connection succeeded
111                    handle_client(stream)
112                });
113                println!("Connection handled");
114                break;
115            }
116            Err(e) => {
117                println!("Error: {}", e);
118                /* connection failed */
119            }
120        }
121    }
122    println!("Can't accept any more connections");
123    // close the socket server
124    drop(listener);
125}
126
127pub fn add_data(data: Data) {
128    debug!("Adding data: {:?}", data);
129    DATA_QUEUE.lock().unwrap().push(data);
130}
131
132pub fn stop_server() {
133    SERVER.store(false, std::sync::atomic::Ordering::SeqCst);
134    STOP_SERVER.store(true, std::sync::atomic::Ordering::SeqCst);
135}