// file_transfer_system/server.rs

use std::{
    net::{IpAddr, SocketAddr},
    sync::Arc,
};

use bincode;
use tokio::{
    io::AsyncReadExt,
    net::{TcpListener, TcpStream},
    sync::{Mutex, Notify},
};

use crate::{
    file_transfer::{Connection, FileTransferProtocol, TransferError},
    network::Request,
};

16/// Represents a file server that listens for incoming connections and handles file transfer requests.
17#[derive(Clone)]
18pub struct Server {
19    /// Indicates if the server is currently running.
20    pub is_server_running: Arc<Mutex<bool>>,
21    /// The IP address on which the server listens.
22    pub ip: IpAddr,
23    /// The port on which the server listens.
24    pub port: u16,
25    /// The path to the directory where files are stored.
26    pub path: String,
27    /// Buffer size for file transfer operations.
28    pub buffer_size: u64,
29    /// Notification signal for stopping the server.
30    pub stop_signal: Arc<Notify>,
31}
32
33impl Server {
34    /// Creates a new instance of the `Server`.
35    ///
36    /// # Parameters
37    ///
38    /// - `ip`: IP address on which the server will listen.
39    /// - `port`: Port on which the server will listen.
40    /// - `path`: Directory path for file storage and retrieval.
41    /// - `buffer_size`: Size of the buffer used for file transfers.
42    pub fn new(ip: IpAddr, port: u16, path: &str, buffer_size: u64, stop_signal: Arc<Notify>) -> Self {
43        let is_server_running = Arc::new(Mutex::new(false));
44        Self {
45            is_server_running,
46            ip,
47            port,
48            path: path.to_owned(),
49            buffer_size,
50            stop_signal,
51        }
52    }
53
54    /// Starts the server, accepting and handling incoming connections.
55    pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
56        let listener = TcpListener::bind(SocketAddr::new(self.ip.to_owned(), self.port)).await?;
57        println!("Server running on {}", self.ip);
58
59        loop {
60            tokio::select! {
61                // Wait for an incoming connection
62                result = listener.accept() => {
63                    match result {
64                        Ok((socket, addr)) => {
65                            println!("New connection from: {}", addr);
66                            let stop_signal_clone = Arc::clone(&self.stop_signal);
67                            let self_clone = self.clone();
68                            tokio::spawn(async {
69                                if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
70                                    eprintln!("Error handling connection: {:?}", e);
71                                }
72                            });
73                        }
74                        Err(e) => {
75                            eprintln!("Failed to accept connection: {:?}", e);
76                        }
77                    }
78                },
79                _ = self.stop_signal.notified() => {
80                    println!("Stopping server...");
81                    break;
82                },
83            }
84        }
85        Ok(())
86    }
87
88    pub async fn stop_server(&self) {
89        self.stop_signal.notify_waiters();
90    }
91
92    /// Handles an incoming connection by reading and processing client requests.
93    async fn handle_request(
94        self,
95        mut socket: TcpStream,
96        shutdown_signal: Arc<Notify>,
97    ) -> Result<(), Box<dyn std::error::Error>> {
98        let mut buffer = [0; 1024];
99        loop {
100            tokio::select! {
101                _ = shutdown_signal.notified() => {
102                    println!("Shutdown signal received. Closing connection.");
103                    break;
104                }
105                bytes_read = socket.read(&mut buffer) => {
106                    match bytes_read {
107                        Ok(0) => {
108                            println!("Connection closed by client.");
109                            break;
110                        }
111                        Ok(bytes_read) => {
112                            let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
113                                Ok(req) => req,
114                                Err(e) => {
115                                    eprintln!("Failed to deserialize request in server: {:?}", e);
116                                    continue;
117                                }
118                            };
119                            self.match_request(&request, &mut socket).await.expect("failed to match request");
120                        }
121                        Err(e) => {
122                            eprintln!("Failed to read data from socket: {:?}", e);
123                            break;
124                        }
125                    }
126                }
127            }
128        }
129
130        Ok(())
131    }
132
133    /// Matches the incoming request to the appropriate action and executes it.
134    async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
135        match request {
136            Request::Get(path) => {
137                FileTransferProtocol::new(path, 64 * 1024)
138                    .init_send(&mut Connection { stream })
139                    .await?;
140            }
141            Request::Upload => {
142                let ftp = FileTransferProtocol::new(&self.path, self.buffer_size);
143                ftp.receive(&mut Connection{ stream }).await?;
144            }
145        }
146        Ok(())
147    }
148}