file_transfer_system/
server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//! Contains the logic for listening for incoming connections.
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, sync::{Mutex, Notify}};
use std::{net::{IpAddr, SocketAddr}, path::{Path, PathBuf}};
use bincode;
use crate::{file_transfer::{Connection, FileTransferProtocol, TransferError}, network::Request};
use std::sync::Arc;

/// TCP file-transfer server: configuration plus the handles used to stop it.
///
/// Cloning is cheap for the shared parts: `is_server_running` and
/// `stop_signal` are `Arc`s, so every clone observes the same flag and the
/// same stop notification.
#[derive(Clone)]
pub struct Server {
    /// Shared boolean flag, initialised to `false` by `Server::new`.
    /// NOTE(review): nothing in this file's visible code updates it —
    /// presumably the caller toggles it; confirm.
    pub is_server_running: Arc<Mutex<bool>>,
    /// IP address the listener binds to.
    pub ip: IpAddr,
    /// TCP port the listener binds to.
    pub port: u16,
    /// Path handed to the transfer protocol when a client uploads data.
    pub path: PathBuf,
    /// Buffer size handed to the transfer protocol when a client uploads data.
    pub buffer_size: u64,
    /// Notification awaited by the accept loop and by every connection
    /// handler; when it fires they stop.
    stop_signal: Arc<Notify>,
}
impl Server {
    /// Creates a new server instance that is not yet listening.
    ///
    /// * `ip`, `port` - address that [`Server::start_server`] binds to.
    /// * `path` - path handed to the transfer protocol for uploads; it is
    ///   copied into the server.
    /// * `buffer_size` - buffer size handed to the transfer protocol for
    ///   uploads.
    ///
    /// The running flag starts out as `false` and a fresh stop signal is
    /// created.
    pub fn new(ip: IpAddr, port: u16, path: &Path, buffer_size: u64) -> Self {
        Self {
            is_server_running: Arc::new(Mutex::new(false)),
            ip,
            port,
            path: path.to_owned(),
            buffer_size,
            stop_signal: Arc::new(Notify::new()),
        }
    }

    /// Starts the server: binds the listener and accepts incoming connections
    /// until the stop signal fires.
    ///
    /// Every accepted connection is served on its own Tokio task, which gets a
    /// clone of this server and of the stop signal.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener fails. Failures to accept an
    /// individual connection are only logged; the loop keeps running.
    pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // `IpAddr` is `Copy`, so it can be passed by value directly.
        let listener = TcpListener::bind(SocketAddr::new(self.ip, self.port)).await?;
        println!("Server running on {}", self.ip);

        // Register for the stop signal exactly once. Creating a new
        // `notified()` future on every loop iteration would leave a window
        // (while an accepted connection is being dispatched) in which a
        // `notify_waiters()` call is lost and the loop never stops.
        let stop_requested = self.stop_signal.notified();
        tokio::pin!(stop_requested);

        loop {
            tokio::select! {
                // Wait for an incoming connection
                result = listener.accept() => {
                    match result {
                        Ok((socket, addr)) => {
                            println!("New connection from: {}", addr);
                            let stop_signal_clone = Arc::clone(&self.stop_signal);
                            let self_clone = self.clone();
                            // Serve the connection on its own task so the
                            // accept loop is never blocked by a transfer.
                            tokio::spawn(async move {
                                if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
                                    eprintln!("Error handling connection: {:?}", e);
                                }
                            });
                        }
                        Err(e) => {
                            eprintln!("Failed to accept connection: {:?}", e);
                        }
                    }
                },

                // Wait for the stop signal
                _ = &mut stop_requested => {
                    println!("Stopping server...");
                    break;
                },
            }
        }
        println!("loop broken");
        Ok(())
    }

    /// Handles one connection: reads requests from the socket, executes them
    /// and writes the serialized outcome back, until the client disconnects,
    /// a read fails, or the shutdown signal fires.
    ///
    /// The shutdown signal is only observed between requests; a transfer that
    /// is already in progress runs to completion first.
    ///
    /// # Errors
    ///
    /// Returns an error if the response cannot be serialized or written to
    /// the socket. Requests that fail to deserialize are logged and skipped.
    async fn handle_request(
        self,
        mut socket: TcpStream,
        shutdown_signal: Arc<Notify>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // NOTE(review): each successful read is treated as one complete
        // request. There is no message framing, so a request that is split
        // across reads or is longer than this buffer will not decode.
        let mut buffer = [0u8; 1024];

        // Register for the shutdown signal once, before the loop. The future
        // stays alive while a request is being processed, so a
        // `notify_waiters()` issued during a transfer is still seen on the
        // next loop iteration instead of being lost.
        let shutdown = shutdown_signal.notified();
        tokio::pin!(shutdown);

        loop {
            tokio::select! {
                // Check if we have received a shutdown signal
                _ = &mut shutdown => {
                    println!("Shutdown signal received. Closing connection.");
                    break;
                }

                // Read data from the socket
                bytes_read = socket.read(&mut buffer) => {
                    match bytes_read {
                        Ok(0) => {
                            // Connection was closed
                            println!("Connection closed by client.");
                            break;
                        }
                        Ok(bytes_read) => {
                            // Decode the received bytes as a bincode-encoded `Request`.
                            let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
                                Ok(req) => req,
                                Err(e) => {
                                    eprintln!("Failed to deserialize request: {:?}", e);
                                    continue;
                                }
                            };
                            // Execute the request; the outcome (success or
                            // transfer error) is what gets reported back.
                            let outcome = self.match_request(&request, &mut socket).await;

                            // Serialize the outcome
                            let encoded = bincode::serialize(&outcome)?;

                            // Send the response back to the client
                            socket.write_all(&encoded).await?;
                        }
                        Err(e) => {
                            eprintln!("Failed to read data from socket: {:?}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Executes a single request over `stream`, depending on what the request
    /// asks for.
    ///
    /// # Errors
    ///
    /// Propagates any [`TransferError`] reported by the transfer protocol.
    async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
        match request {
            // The client asks to GET a path and the server sends it.
            // NOTE(review): `path` comes straight from the client's request
            // and is not checked against `self.path` here — confirm that the
            // protocol layer restricts which files may be sent.
            // NOTE(review): the send buffer is a fixed 64 KiB rather than
            // `self.buffer_size` — confirm this is intentional.
            Request::Get(path) => {
                FileTransferProtocol::new(path, 64 * 1024)
                    .init_send(&mut Connection { stream })
                    .await?;
            }
            // The client uploads a file/directory; receive it under `self.path`.
            Request::Upload(path_type) => {
                FileTransferProtocol::new(&self.path, self.buffer_size)
                    .init_receive(&mut Connection { stream }, path_type)
                    .await?;
            }
        }
        Ok(())
    }
}