file_transfer_system/
server.rs1use tokio::{
2 io::AsyncReadExt,
3 net::{TcpListener, TcpStream},
4 sync::{Mutex, Notify},
5};
6use std::{
7 net::{IpAddr, SocketAddr},
8 sync::Arc,
9};
10use bincode;
11use crate::{
12 file_transfer::{Connection, FileTransferProtocol, TransferError},
13 network::Request,
14};
15
16#[derive(Clone)]
18pub struct Server {
19 pub is_server_running: Arc<Mutex<bool>>,
21 pub ip: IpAddr,
23 pub port: u16,
25 pub path: String,
27 pub buffer_size: u64,
29 pub stop_signal: Arc<Notify>,
31}
32
33impl Server {
34 pub fn new(ip: IpAddr, port: u16, path: &str, buffer_size: u64, stop_signal: Arc<Notify>) -> Self {
43 let is_server_running = Arc::new(Mutex::new(false));
44 Self {
45 is_server_running,
46 ip,
47 port,
48 path: path.to_owned(),
49 buffer_size,
50 stop_signal,
51 }
52 }
53
54 pub async fn start_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
56 let listener = TcpListener::bind(SocketAddr::new(self.ip.to_owned(), self.port)).await?;
57 println!("Server running on {}", self.ip);
58
59 loop {
60 tokio::select! {
61 result = listener.accept() => {
63 match result {
64 Ok((socket, addr)) => {
65 println!("New connection from: {}", addr);
66 let stop_signal_clone = Arc::clone(&self.stop_signal);
67 let self_clone = self.clone();
68 tokio::spawn(async {
69 if let Err(e) = self_clone.handle_request(socket, stop_signal_clone).await {
70 eprintln!("Error handling connection: {:?}", e);
71 }
72 });
73 }
74 Err(e) => {
75 eprintln!("Failed to accept connection: {:?}", e);
76 }
77 }
78 },
79 _ = self.stop_signal.notified() => {
80 println!("Stopping server...");
81 break;
82 },
83 }
84 }
85 Ok(())
86 }
87
88 pub async fn stop_server(&self) {
89 self.stop_signal.notify_waiters();
90 }
91
92 async fn handle_request(
94 self,
95 mut socket: TcpStream,
96 shutdown_signal: Arc<Notify>,
97 ) -> Result<(), Box<dyn std::error::Error>> {
98 let mut buffer = [0; 1024];
99 loop {
100 tokio::select! {
101 _ = shutdown_signal.notified() => {
102 println!("Shutdown signal received. Closing connection.");
103 break;
104 }
105 bytes_read = socket.read(&mut buffer) => {
106 match bytes_read {
107 Ok(0) => {
108 println!("Connection closed by client.");
109 break;
110 }
111 Ok(bytes_read) => {
112 let request: Request = match bincode::deserialize(&buffer[..bytes_read]) {
113 Ok(req) => req,
114 Err(e) => {
115 eprintln!("Failed to deserialize request in server: {:?}", e);
116 continue;
117 }
118 };
119 self.match_request(&request, &mut socket).await.expect("failed to match request");
120 }
121 Err(e) => {
122 eprintln!("Failed to read data from socket: {:?}", e);
123 break;
124 }
125 }
126 }
127 }
128 }
129
130 Ok(())
131 }
132
133 async fn match_request(&self, request: &Request, stream: &mut TcpStream) -> Result<(), TransferError> {
135 match request {
136 Request::Get(path) => {
137 FileTransferProtocol::new(path, 64 * 1024)
138 .init_send(&mut Connection { stream })
139 .await?;
140 }
141 Request::Upload => {
142 let ftp = FileTransferProtocol::new(&self.path, self.buffer_size);
143 ftp.receive(&mut Connection{ stream }).await?;
144 }
145 }
146 Ok(())
147 }
148}