// file_transfer_system/server.rs

//! Contains logic for listening for incoming connections and handling file transfer requests.

use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    sync::{Mutex, Notify},
};
use std::{
    net::{IpAddr, SocketAddr},
    path::{Path, PathBuf},
    sync::Arc,
};
use bincode;
use crate::{
    file_transfer::{Connection, FileTransferProtocol, TransferError},
    network::Request,
};

/// A file server that listens for incoming TCP connections and handles file transfer requests.
///
/// The type is `Clone`. The running flag and the stop signal are held behind `Arc`s, so every
/// clone shares them with the value it was cloned from.
#[derive(Clone)]
pub struct Server {
    /// Flag intended to indicate whether the server is currently running.
    /// Created as `false` by [`Server::new`].
    pub is_server_running: Arc<Mutex<bool>>,
    /// The IP address on which the server listens.
    pub ip: IpAddr,
    /// The port on which the server listens.
    pub port: u16,
    /// Directory handed to the transfer protocol when an upload is received.
    pub path: PathBuf,
    /// Buffer size handed to the transfer protocol (presumably in bytes; confirm against
    /// `FileTransferProtocol::new`).
    pub buffer_size: u64,
    /// Notification awaited by the accept loop and by every connection handler; when it fires,
    /// those loops exit.
    stop_signal: Arc<Notify>,
}

impl Server {
    /// Creates a new instance of the `Server`.
    ///
    /// # Parameters
    ///
    /// - `ip`: IP address on which the server will listen.
    /// - `port`: Port on which the server will listen.
    /// - `path`: Directory path for file storage and retrieval.
    /// - `buffer_size`: Size of the buffer used for file transfers.
    pub fn new(ip: IpAddr, port: u16, path: &Path, buffer_size: u64) -> Self {
        let stop_signal = Arc::new(Notify::new());
        let is_server_running = Arc::new(Mutex::new(false));
        Self {
            is_server_running,
            ip,
            port,
            path: path.to_owned(),
            buffer_size,
            stop_signal,
        }
    }

    /// Starts the server, accepting and handling incoming connections.
    pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let listener = TcpListener::bind(SocketAddr::new(self.ip.to_owned(), self.port)).await?;
        println!("Server running on {}", self.ip);

        loop {
            tokio::select! {
                // Wait for an incoming connection
                result = listener.accept() => {
                    match result {
                        Ok((socket, addr)) => {
                            println!("New connection from: {}", addr);
                            let stop_signal_clone = Arc::clone(&self.stop_signal);
                            let self_clone = self.clone();
                            tokio::spawn(async {
                                if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
                                    eprintln!("Error handling connection: {:?}", e);
                                }
                            });
                        }
                        Err(e) => {
                            eprintln!("Failed to accept connection: {:?}", e);
                        }
                    }
                },
                _ = self.stop_signal.notified() => {
                    println!("Stopping server...");
                    break;
                },
            }
        }
        Ok(())
    }

    /// Handles an incoming connection by reading and processing client requests.
    async fn handle_request(
        self,
        mut socket: TcpStream,
        shutdown_signal: Arc<Notify>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut buffer = [0; 1024];
        loop {
            tokio::select! {
                _ = shutdown_signal.notified() => {
                    println!("Shutdown signal received. Closing connection.");
                    break;
                }
                bytes_read = socket.read(&mut buffer) => {
                    match bytes_read {
                        Ok(0) => {
                            println!("Connection closed by client.");
                            break;
                        }
                        Ok(bytes_read) => {
                            let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
                                Ok(req) => req,
                                Err(e) => {
                                    eprintln!("Failed to deserialize request: {:?}", e);
                                    continue;
                                }
                            };
                            let response = self.match_request(&request, &mut socket).await;

                            let response = bincode::serialize(&response)?;
                            socket.write_all(&response).await?;
                        }
                        Err(e) => {
                            eprintln!("Failed to read data from socket: {:?}", e);
                            break;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Matches the incoming request to the appropriate action and executes it.
    async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
        match request {
            Request::Get(path) => {
                FileTransferProtocol::new(path, 64 * 1024)
                    .init_send(&mut Connection { stream })
                    .await?;
            }
            Request::Upload(path_type) => {
                FileTransferProtocol::new(&self.path, self.buffer_size)
                    .init_receive(&mut Connection { stream }, path_type)
                    .await?;
            }
        }
        Ok(())
    }
}