file_transfer_system/server.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
//! Contains logic for listening for incomming connections
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, sync::{Mutex, Notify}};
use std::{net::{IpAddr, SocketAddr}, path::{Path, PathBuf}};
use bincode;
use crate::{file_transfer::{Connection, FileTransferProtocol, TransferError}, network::Request};
use std::sync::Arc;
#[derive(Clone)]
pub struct Server{
pub is_server_running: Arc<Mutex<bool>>,
pub ip: IpAddr,
pub port: u16,
pub path: PathBuf,
pub buffer_size: u64,
stop_signal: Arc<Notify>,
}
impl Server{
/// Creates new instance of server
pub fn new(ip: IpAddr, port: u16, path: &Path, buffer_size: u64) -> Self{
let stop_signal = Arc::new(Notify::new());
let is_server_running = Arc::new(Mutex::new(false));
Self {
is_server_running,
ip,
port,
path: path.to_owned(),
buffer_size,
stop_signal,
}
}
/// Starts server by listening for incomming connections
pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(SocketAddr::new(self.ip.to_owned(), self.port)).await?;
println!("Server running on {}", self.ip);
loop {
tokio::select! {
// Wait for an incoming connection
result = listener.accept() => {
match result {
Ok((socket, addr)) => {
println!("New connection from: {}", addr);
let stop_signal_clone = Arc::clone(&self.stop_signal);
let self_clone = self.clone();
// handle connection in another thread
tokio::spawn(async {
if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
eprintln!("Error handling connection: {:?}", e);
}
});
}
Err(e) => {
eprintln!("Failed to accept connection: {:?}", e);
}
}
},
// Wait for the stop signal
_ = self.stop_signal.notified() => {
println!("Stopping server...");
break;
},
}
}
println!("loop broken");
Ok(())
}
/// handles connections and reads the data transmited through the socket
async fn handle_request(
self,
mut socket: TcpStream,
shutdown_signal: Arc<Notify>
) -> Result<(), Box<dyn std::error::Error>> {
let mut buffer = [0; 1024];
loop {
tokio::select! {
// Check if we have received a shutdown signal
_ = shutdown_signal.notified() => {
println!("Shutdown signal received. Closing connection.");
break;
}
// Read data from the socket
bytes_read = socket.read(&mut buffer) => {
match bytes_read {
Ok(0) => {
// Connection was closed
println!("Connection closed by client.");
break;
}
Ok(bytes_read) => {
// Convert the buffer to a string (assuming UTF-8 encoded data)
let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
Ok(req) => req,
Err(e) => {
eprintln!("Failed to deserialize request: {:?}", e);
continue;
}
};
// Handle the request and generate a response
let response = self.match_request(&request, &mut socket).await;
// Serialize response
let response = bincode::serialize(&response)?;
// Send the response back to the client
socket.write_all(&response).await?;
}
Err(e) => {
eprintln!("Failed to read data from socket: {:?}", e);
break;
}
}
}
}
}
Ok(())
}
/// handle the request depeding on what the request is asking for
async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
match request {
// client requests to GET certain files and server sends them
Request::Get(path) => { FileTransferProtocol::new(path, 64 * 1024).init_send(&mut Connection{stream}).await?; },
// handles files/dir sent by client
Request::Upload(path_type) => { FileTransferProtocol::new(&self.path, self.buffer_size).init_receive(&mut Connection{stream}, &path_type).await?; }
}
Ok(())
}
}