file_transfer_system/server.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
use tokio::{
io::AsyncReadExt,
net::{TcpListener, TcpStream},
sync::{Mutex, Notify},
};
use std::{
net::{IpAddr, SocketAddr},
sync::Arc,
};
use bincode;
use crate::{
file_transfer::{Connection, FileTransferProtocol, TransferError},
network::Request,
};
/// Represents a file server that listens for incoming connections and handles file transfer requests.
#[derive(Clone)]
pub struct Server {
/// Indicates if the server is currently running.
pub is_server_running: Arc<Mutex<bool>>,
/// The IP address on which the server listens.
pub ip: IpAddr,
/// The port on which the server listens.
pub port: u16,
/// The path to the directory where files are stored.
pub path: String,
/// Buffer size for file transfer operations.
pub buffer_size: u64,
/// Notification signal for stopping the server.
pub stop_signal: Arc<Notify>,
}
impl Server {
/// Creates a new instance of the `Server`.
///
/// # Parameters
///
/// - `ip`: IP address on which the server will listen.
/// - `port`: Port on which the server will listen.
/// - `path`: Directory path for file storage and retrieval.
/// - `buffer_size`: Size of the buffer used for file transfers.
pub fn new(ip: IpAddr, port: u16, path: &str, buffer_size: u64, stop_signal: Arc<Notify>) -> Self {
let is_server_running = Arc::new(Mutex::new(false));
Self {
is_server_running,
ip,
port,
path: path.to_owned(),
buffer_size,
stop_signal,
}
}
/// Starts the server, accepting and handling incoming connections.
pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(SocketAddr::new(self.ip.to_owned(), self.port)).await?;
println!("Server running on {}", self.ip);
loop {
tokio::select! {
// Wait for an incoming connection
result = listener.accept() => {
match result {
Ok((socket, addr)) => {
println!("New connection from: {}", addr);
let stop_signal_clone = Arc::clone(&self.stop_signal);
let self_clone = self.clone();
tokio::spawn(async {
if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
eprintln!("Error handling connection: {:?}", e);
}
});
}
Err(e) => {
eprintln!("Failed to accept connection: {:?}", e);
}
}
},
_ = self.stop_signal.notified() => {
println!("Stopping server...");
break;
},
}
}
Ok(())
}
pub async fn stop_server(&self) {
self.stop_signal.notify_waiters();
}
/// Handles an incoming connection by reading and processing client requests.
async fn handle_request(
self,
mut socket: TcpStream,
shutdown_signal: Arc<Notify>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut buffer = [0; 1024];
loop {
tokio::select! {
_ = shutdown_signal.notified() => {
println!("Shutdown signal received. Closing connection.");
break;
}
bytes_read = socket.read(&mut buffer) => {
match bytes_read {
Ok(0) => {
println!("Connection closed by client.");
break;
}
Ok(bytes_read) => {
let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
Ok(req) => req,
Err(e) => {
eprintln!("Failed to deserialize request in server: {:?}", e);
continue;
}
};
self.match_request(&request, &mut socket).await.expect("failed to match request");
}
Err(e) => {
eprintln!("Failed to read data from socket: {:?}", e);
break;
}
}
}
}
}
Ok(())
}
/// Matches the incoming request to the appropriate action and executes it.
async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
match request {
Request::Get(path) => {
FileTransferProtocol::new(path, 64 * 1024)
.init_send(&mut Connection { stream })
.await?;
}
Request::Upload => {
let ftp = FileTransferProtocol::new(&self.path, self.buffer_size);
ftp.receive(&mut Connection{ stream }).await?;
}
}
Ok(())
}
}