file_transfer_system/
file_transfer.rs1use tokio::io::{AsyncReadExt, AsyncWriteExt};
2use tokio::net::TcpStream;
3use std::path::{Path, PathBuf};
4use std::io::Error as IoError;
5use serde::{Serialize, Deserialize};
6use futures::future::BoxFuture;
7use crate::compression::{start_compressing, unzip_file};
8
9#[derive(Debug, Serialize, Deserialize)]
11pub enum TransferError {
12 IoError(String),
14 ConnectionClosed,
16 FileNotFound,
18 FileCorrupted,
20 ChunkError,
22}
23
24impl From<IoError> for TransferError {
25 fn from(err: IoError) -> TransferError {
26 TransferError::IoError(err.to_string())
27 }
28}
29
30#[derive(Serialize, Deserialize, Debug, Clone)]
32pub enum PathType {
33 File,
35 Directory,
37}
38
39#[derive(Serialize, Deserialize, Debug)]
40pub struct FileMetadata {
41 pub name: String, pub size: u64, pub checksum: Option<String>, }
45
46impl FileMetadata {
47 pub fn new(path: &Path, size: u64) -> FileMetadata {
48 let name = path.file_name().unwrap().to_str().unwrap().to_owned();
49 FileMetadata {
50 name,
51 size,
52 checksum: None, }
54 }
55}
56
57pub struct Connection<'a> {
59 pub stream: &'a mut TcpStream,
60}
61
62impl<'a> Connection<'a> {
63 pub async fn write(&mut self, data: &[u8]) -> Result<(), TransferError> {
64 self.stream
65 .write_all(data)
66 .await
67 .map_err(|e| TransferError::IoError(e.to_string()))
68 }
69
70 pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, TransferError> {
71 self.stream
72 .read(buffer)
73 .await
74 .map_err(|e| TransferError::IoError(e.to_string()))
75 }
76}
77
78
/// Configuration for sending or receiving a file or directory over a
/// [`Connection`].
pub struct FileTransferProtocol {
    /// When sending: the file or directory to send.
    /// When receiving: the directory the incoming file is written into.
    pub path: PathBuf,

    /// Requested chunk size.
    // NOTE(review): stored by `new`, but the transfer methods visible in this
    // file use a fixed 65536-byte buffer and never read this field.
    pub chunk_size: u64,
}
86
87impl FileTransferProtocol {
88 pub fn new(path: &str, chunk_size: u64) -> Self {
90 FileTransferProtocol {
91 path: PathBuf::from(path),
92 chunk_size,
93 }
94 }
95
96 pub async fn init_send(&self, connection : &mut Connection<'_>) -> Result<(), TransferError> {
98 let is_file = self.path.is_file();
99 if is_file{
100 self.send_file(&self.path, connection).await.expect("Could not send file");
101 }
102 else{
103 self.send_dir(connection).await.unwrap();
104 }
105
106 Ok(())
107 }
108
109 pub async fn send_metadata(&self, connection: &mut Connection<'_>, metadata: &FileMetadata) -> Result<(), TransferError> {
110 let serialized_metadata = serde_json::to_vec(metadata)
111 .map_err(|e| TransferError::IoError(format!("Failed to serialize metadata: {}", e)))?;
112
113 connection.write(&(serialized_metadata.len() as u64).to_le_bytes()).await?; connection.write(&serialized_metadata).await?; Ok(())
118 }
119
120 pub async fn receive_metadata(&self, connection: &mut Connection<'_>) -> Result<FileMetadata, TransferError> {
121 let mut size_buffer = [0u8; 8];
122 connection.read(&mut size_buffer).await?; let metadata_size = u64::from_le_bytes(size_buffer);
124
125 let mut metadata_buffer = vec![0u8; metadata_size as usize];
126 connection.read(&mut metadata_buffer).await?; let metadata: FileMetadata = serde_json::from_slice(&metadata_buffer)
129 .map_err(|e| TransferError::IoError(format!("Failed to deserialize metadata: {}", e)))?;
130
131 Ok(metadata)
132 }
133
134
135
136 pub async fn send_file(&self, file_path: &Path, connection: &mut Connection<'_>) -> Result<(), TransferError> {
138 let mut file = tokio::fs::File::open(&file_path).await.map_err(|_| TransferError::FileNotFound)?;
139 let metadata = file.metadata().await?;
140
141 self.send_metadata(connection, &FileMetadata::new(&file_path, metadata.len())).await?;
143
144 let mut buffer = vec![0u8; 65536]; let mut total_bytes_sent = 0;
146
147 loop {
148 let n = file.read(&mut buffer).await.map_err(TransferError::from)?;
150 if n == 0 {
151 println!("End of file reached, file transfer complete.");
153 break;
154 }
155
156 connection.write(&buffer[..n]).await?;
158
159 total_bytes_sent += n as u64;
161 println!("Sent {} bytes so far. ", total_bytes_sent);
162 }
163 drop(file);
164 println!("Total bytes sent: {}", total_bytes_sent);
165 Ok(())
166 }
167
168 pub fn send_dir<'a>(
170 &'a self,
171 connection: &'a mut Connection<'_>,
172 ) -> BoxFuture<'a, Result<(), TransferError>> {
173 Box::pin(async move {
174 let path = self.path.clone(); let zip_path = path.with_extension("zip");
176 let zip_clone = zip_path.clone();
177 let handle = tokio::task::spawn_blocking( move || {
178 start_compressing(&path, &zip_path, zip::CompressionMethod::Stored).expect("Could not compress directory");
179 });
180 handle.await.unwrap();
181 self.send_file(&zip_clone, connection).await?;
182 tokio::fs::remove_file(&zip_clone).await?;
183 Ok(())
184 })
185 }
186
187 pub async fn receive_file(&self, file_path: &Path, connection: &mut Connection<'_>, expected_size: u64) -> Result<(), TransferError> {
189 let mut file = tokio::fs::File::create(file_path).await?;
190 let mut buffer = vec![0u8; 65536];
191 let mut total_bytes_received = 0;
192
193 while total_bytes_received < expected_size {
194 let n = connection.read(&mut buffer).await?;
195 if n == 0 {
196 return Err(TransferError::ConnectionClosed); }
198 file.write_all(&buffer[..n]).await?;
199 total_bytes_received += n as u64;
200 print!("\rReceived {} bytes so far", total_bytes_received);
201 }
202 drop(file);
203 println!("Total bytes received: {}", total_bytes_received);
204 Ok(())
205 }
206
207 pub async fn receive(&self, connection: &mut Connection<'_>) -> Result<(), TransferError> {
209 println!("Recieving directory to path: {:?}", self.path);
210 let metadata = self.receive_metadata(connection).await?;
211 println!("Metadata: {:?}", metadata);
212 let file_path = self.path.join(metadata.name);
213 println!("file path: {:?}", file_path);
214 self.receive_file(&file_path, connection, metadata.size).await?;
215 println!("file received");
216
217 if file_path.extension() == Some("zip".as_ref()) {
218 println!("uzipping...");
219 unzip_file(
220 file_path.to_str().unwrap(), file_path.with_extension("").to_str().unwrap())
222 .unwrap(); println!("file unzipped");
224 }
225 Ok(())
226 }
227}