file_transfer_system/
file_transfer.rs

1use tokio::io::{AsyncReadExt, AsyncWriteExt};
2use tokio::net::TcpStream;
3use std::path::{Path, PathBuf};
4use std::io::Error as IoError;
5use serde::{Serialize, Deserialize};
6use futures::future::BoxFuture;
7use crate::compression::{start_compressing, unzip_file};
8
9/// Represents various errors that can occur during file transfer operations.
10#[derive(Debug, Serialize, Deserialize)]
11pub enum TransferError {
12    /// An I/O error, storing the error message as a `String`.
13    IoError(String),
14    /// Error indicating that the connection has closed unexpectedly.
15    ConnectionClosed,
16    /// Error when the specified file is not found.
17    FileNotFound,
18    /// Error indicating file corruption.
19    FileCorrupted,
20    /// Error encountered when handling chunks.
21    ChunkError,
22}
23
24impl From<IoError> for TransferError {
25    fn from(err: IoError) -> TransferError {
26        TransferError::IoError(err.to_string())
27    }
28}
29
30/// Specifies whether a path is a file or a directory.
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub enum PathType {
33    /// Represents a file.
34    File,
35    /// Represents a directory.
36    Directory,
37}
38
39#[derive(Serialize, Deserialize, Debug)]
40pub struct FileMetadata {
41    pub name: String,      // Name of the file or directory (relative path)
42    pub size: u64,         // Size of the file (in bytes, 0 for directories)
43    pub checksum: Option<String>, // Optional checksum to verify integrity
44}
45
46impl FileMetadata {
47    pub fn new(path: &Path, size: u64) -> FileMetadata {
48        let name = path.file_name().unwrap().to_str().unwrap().to_owned();
49        FileMetadata {
50            name,
51            size,
52            checksum: None, // Can be calculated if needed
53        }
54    }
55}
56
57/// Represents a connection over a TCP stream.
58pub struct Connection<'a> {
59    pub stream: &'a mut TcpStream,
60}
61
62impl<'a> Connection<'a> {
63    pub async fn write(&mut self, data: &[u8]) -> Result<(), TransferError> {
64        self.stream
65            .write_all(data)
66            .await
67            .map_err(|e| TransferError::IoError(e.to_string()))
68    }
69
70    pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, TransferError> {
71        self.stream
72            .read(buffer)
73            .await
74            .map_err(|e| TransferError::IoError(e.to_string()))
75    }
76}
77
78
79/// Defines the file transfer protocol, allowing files and directories to be transferred.
80pub struct FileTransferProtocol {
81    /// The path of the file or directory to transfer.
82    pub path: PathBuf,
83    /// The chunk size in bytes for data transfer.
84    pub chunk_size: u64,
85}
86
87impl FileTransferProtocol {
88    /// Creates a new instance of `FileTransferProtocol`.
89    pub fn new(path: &str, chunk_size: u64) -> Self {
90        FileTransferProtocol {
91            path: PathBuf::from(path),
92            chunk_size,
93        }
94    }
95
96    /// Initiates sending a file or directory based on the `path` provided.
97    pub async fn init_send(&self, connection : &mut Connection<'_>) -> Result<(), TransferError> {
98        let is_file = self.path.is_file();
99        if is_file{
100            self.send_file(&self.path, connection).await.expect("Could not send file");
101        }
102        else{
103            self.send_dir(connection).await.unwrap();
104        }
105
106        Ok(())
107    }
108
109    pub async fn send_metadata(&self, connection: &mut Connection<'_>, metadata: &FileMetadata) -> Result<(), TransferError> {
110        let serialized_metadata = serde_json::to_vec(metadata)
111            .map_err(|e| TransferError::IoError(format!("Failed to serialize metadata: {}", e)))?;
112        
113        // Send the metadata (size first, then the actual metadata)
114        connection.write(&(serialized_metadata.len() as u64).to_le_bytes()).await?; // Send size of metadata
115        connection.write(&serialized_metadata).await?; // Send the metadata
116        
117        Ok(())
118    }
119
120    pub async fn receive_metadata(&self, connection: &mut Connection<'_>) -> Result<FileMetadata, TransferError> {
121        let mut size_buffer = [0u8; 8];
122        connection.read(&mut size_buffer).await?;  // Read the size of the metadata
123        let metadata_size = u64::from_le_bytes(size_buffer);
124
125        let mut metadata_buffer = vec![0u8; metadata_size as usize];
126        connection.read(&mut metadata_buffer).await?;  // Read the metadata itself
127
128        let metadata: FileMetadata = serde_json::from_slice(&metadata_buffer)
129            .map_err(|e| TransferError::IoError(format!("Failed to deserialize metadata: {}", e)))?;
130
131        Ok(metadata)
132    }
133
134
135
136    /// Sends a single file in chunks over the TCP connection using a fixed-size buffer.
137    pub async fn send_file(&self, file_path: &Path, connection: &mut Connection<'_>) -> Result<(), TransferError> {
138        let mut file = tokio::fs::File::open(&file_path).await.map_err(|_| TransferError::FileNotFound)?;
139        let metadata = file.metadata().await?;
140        
141        // send file metadata to receiving side
142        self.send_metadata(connection, &FileMetadata::new(&file_path, metadata.len())).await?;
143        
144        let mut buffer = vec![0u8; 65536]; // Allocates on the heap
145        let mut total_bytes_sent = 0;
146
147        loop {
148            // Read a chunk from the file into the fixed-size buffer
149            let n = file.read(&mut buffer).await.map_err(TransferError::from)?;
150            if n == 0 {
151                // End-of-file; file read is complete
152                println!("End of file reached, file transfer complete.");
153                break;
154            }
155
156            // Write the buffer contents to the connection
157            connection.write(&buffer[..n]).await?;
158
159            // Track total bytes sent and print progress
160            total_bytes_sent += n as u64;
161            println!("Sent {} bytes so far. ", total_bytes_sent);
162        }
163        drop(file);
164        println!("Total bytes sent: {}", total_bytes_sent);
165        Ok(())
166    }
167
168    /// Sends a directory and its contents recursively over the TCP connection.
169    pub fn send_dir<'a>(
170        &'a self,
171        connection: &'a mut Connection<'_>,
172    ) -> BoxFuture<'a, Result<(), TransferError>> {
173        Box::pin(async move {
174            let path = self.path.clone();  // Clone the path here
175            let zip_path = path.with_extension("zip");
176            let zip_clone = zip_path.clone();
177            let handle = tokio::task::spawn_blocking( move || {
178                start_compressing(&path, &zip_path, zip::CompressionMethod::Stored).expect("Could not compress directory");
179            });
180            handle.await.unwrap();
181            self.send_file(&zip_clone, connection).await?;
182            tokio::fs::remove_file(&zip_clone).await?;
183            Ok(())
184        })
185    }
186
187    /// Receives a file in chunks and writes it to disk.
188    pub async fn receive_file(&self, file_path: &Path, connection: &mut Connection<'_>, expected_size: u64) -> Result<(), TransferError> {
189        let mut file = tokio::fs::File::create(file_path).await?;
190        let mut buffer = vec![0u8; 65536];
191        let mut total_bytes_received = 0;
192
193        while total_bytes_received < expected_size {
194            let n = connection.read(&mut buffer).await?;
195            if n == 0 {
196                return Err(TransferError::ConnectionClosed); // Handle unexpected disconnection
197            }
198            file.write_all(&buffer[..n]).await?;
199            total_bytes_received += n as u64;
200            print!("\rReceived {} bytes so far", total_bytes_received);
201        }
202        drop(file);
203        println!("Total bytes received: {}", total_bytes_received);
204        Ok(())
205    }
206
207    /// Receives a directory and its contents recursively from the TCP connection.
208    pub async fn receive(&self, connection: &mut Connection<'_>) -> Result<(), TransferError> {
209        println!("Recieving directory to path: {:?}", self.path);
210        let metadata = self.receive_metadata(connection).await?;
211        println!("Metadata: {:?}", metadata);
212        let file_path = self.path.join(metadata.name);
213        println!("file path: {:?}", file_path);
214        self.receive_file(&file_path, connection, metadata.size).await?;
215        println!("file received");
216
217        if file_path.extension() == Some("zip".as_ref()) {
218            println!("uzipping...");
219            unzip_file(
220            file_path.to_str().unwrap(), // foo.zip
221            file_path.with_extension("").to_str().unwrap())
222            .unwrap(); // foo
223            println!("file unzipped");
224        }
225        Ok(())
226    }
227}