// file_transfer_system/file_transfer.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use std::path::{Path, PathBuf};
use std::io::Error as IoError;
use serde::{Serialize, Deserialize};
use futures::future::BoxFuture;
use crate::compression::{start_compressing, unzip_file};
/// Failure modes reported by the transfer routines in this module.
#[derive(Debug, Serialize, Deserialize)]
pub enum TransferError {
    /// An I/O or (de)serialization failure, kept as its message text.
    IoError(String),
    /// The peer stopped sending before the expected data arrived.
    ConnectionClosed,
    /// The file to send could not be opened.
    FileNotFound,
    /// Not constructed in the code visible here; presumably signals an
    /// integrity failure — confirm against callers.
    FileCorrupted,
    /// Not constructed in the code visible here; presumably signals a
    /// chunk-level failure — confirm against callers.
    ChunkError,
}
impl From<IoError> for TransferError {
fn from(err: IoError) -> TransferError {
TransferError::IoError(err.to_string())
}
}
/// Kind of filesystem entry. Not referenced by the code visible in this
/// section of the file.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum PathType {
    /// A single file.
    File,
    /// A directory.
    Directory,
}
/// Header describing the payload that follows it on the wire; it is
/// serialized as JSON by `send_metadata`.
#[derive(Serialize, Deserialize, Debug)]
pub struct FileMetadata {
    /// File name (final path component) the receiver should create.
    pub name: String,
    /// Number of payload bytes that follow the header.
    pub size: u64,
    /// Optional checksum; `FileMetadata::new` leaves it unset.
    pub checksum: Option<String>,
}
impl FileMetadata {
    /// Builds the metadata header for the file at `path` with the given byte
    /// `size`. The checksum is left unset.
    ///
    /// The name is the final component of `path`. Non-UTF-8 names are
    /// converted lossily (invalid sequences become U+FFFD) and a path without
    /// a final component (for example `..` or `/`) yields an empty name, so
    /// this constructor never panics.
    pub fn new(path: &Path, size: u64) -> FileMetadata {
        let name = path
            .file_name()
            .map(|component| component.to_string_lossy().into_owned())
            .unwrap_or_default();
        FileMetadata {
            name,
            size,
            checksum: None,
        }
    }
}
/// Thin wrapper around a mutably borrowed [`TcpStream`] that reports failures
/// as [`TransferError`].
pub struct Connection<'a> {
    /// The underlying stream; borrowed, so the caller keeps ownership.
    pub stream: &'a mut TcpStream,
}
impl<'a> Connection<'a> {
    /// Writes all of `data` to the stream.
    ///
    /// # Errors
    /// Returns `TransferError::IoError` carrying the I/O error's text.
    pub async fn write(&mut self, data: &[u8]) -> Result<(), TransferError> {
        match self.stream.write_all(data).await {
            Ok(()) => Ok(()),
            Err(err) => Err(TransferError::IoError(err.to_string())),
        }
    }

    /// Performs a single read into `buffer` and returns the number of bytes
    /// read, which may be fewer than `buffer.len()`.
    ///
    /// # Errors
    /// Returns `TransferError::IoError` carrying the I/O error's text.
    pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize, TransferError> {
        let outcome = self.stream.read(buffer).await;
        outcome.map_err(TransferError::from)
    }
}
/// Sender/receiver for one file or directory over a [`Connection`].
pub struct FileTransferProtocol {
    /// Source path when sending; destination directory when receiving.
    pub path: PathBuf,
    /// Requested chunk size in bytes.
    /// NOTE(review): the transfer loops visible in this file use a fixed
    /// 65536-byte buffer and do not read this field — confirm intent.
    pub chunk_size: u64,
}
impl FileTransferProtocol {
pub fn new(path: &str, chunk_size: u64) -> Self {
FileTransferProtocol {
path: PathBuf::from(path),
chunk_size,
}
}
/// Sends `self.path` over `connection`: as a single file when the path is a
/// regular file, otherwise through `send_dir`.
///
/// # Errors
/// Returns whatever error the underlying send reports. Failures are
/// propagated to the caller instead of panicking, so one failed transfer
/// cannot take down the task that drives it.
pub async fn init_send(&self, connection: &mut Connection<'_>) -> Result<(), TransferError> {
    if self.path.is_file() {
        self.send_file(&self.path, connection).await
    } else {
        self.send_dir(connection).await
    }
}
/// Sends `metadata` as an 8-byte little-endian length prefix followed by the
/// JSON-encoded header.
///
/// # Errors
/// Returns `TransferError::IoError` if serialization or either write fails.
pub async fn send_metadata(&self, connection: &mut Connection<'_>, metadata: &FileMetadata) -> Result<(), TransferError> {
    let payload = match serde_json::to_vec(metadata) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(TransferError::IoError(format!("Failed to serialize metadata: {}", e)));
        }
    };
    let length_prefix = (payload.len() as u64).to_le_bytes();
    connection.write(&length_prefix).await?;
    connection.write(&payload).await
}
pub async fn receive_metadata(&self, connection: &mut Connection<'_>) -> Result<FileMetadata, TransferError> {
let mut size_buffer = [0u8; 8];
connection.read(&mut size_buffer).await?; let metadata_size = u64::from_le_bytes(size_buffer);
let mut metadata_buffer = vec![0u8; metadata_size as usize];
connection.read(&mut metadata_buffer).await?; let metadata: FileMetadata = serde_json::from_slice(&metadata_buffer)
.map_err(|e| TransferError::IoError(format!("Failed to deserialize metadata: {}", e)))?;
Ok(metadata)
}
/// Sends the file at `file_path`: first its metadata header, then exactly as
/// many payload bytes as the header announces.
///
/// # Errors
/// * `TransferError::FileNotFound` if the file cannot be opened.
/// * `TransferError::IoError` if reading or writing fails, or if the file
///   ends before the announced number of bytes has been sent.
pub async fn send_file(&self, file_path: &Path, connection: &mut Connection<'_>) -> Result<(), TransferError> {
    let mut file = tokio::fs::File::open(file_path)
        .await
        .map_err(|_| TransferError::FileNotFound)?;
    let declared_size = file.metadata().await?.len();
    self.send_metadata(connection, &FileMetadata::new(file_path, declared_size)).await?;

    let mut buffer = vec![0u8; 65536];
    let mut total_bytes_sent: u64 = 0;
    // The receiver reads exactly `declared_size` bytes, so never send more
    // (file grew meanwhile) and report an error when fewer are available
    // (file shrank); either would desynchronise the stream.
    while total_bytes_sent < declared_size {
        let remaining = declared_size - total_bytes_sent;
        // Bounded by `buffer.len()`, so the cast back to usize is lossless.
        let want = remaining.min(buffer.len() as u64) as usize;
        let n = file.read(&mut buffer[..want]).await?;
        if n == 0 {
            return Err(TransferError::IoError(format!(
                "File ended after {} of {} announced bytes",
                total_bytes_sent, declared_size
            )));
        }
        connection.write(&buffer[..n]).await?;
        total_bytes_sent += n as u64;
        println!("Sent {} bytes so far. ", total_bytes_sent);
    }
    println!("End of file reached, file transfer complete.");
    println!("Total bytes sent: {}", total_bytes_sent);
    Ok(())
}
pub fn send_dir<'a>(
&'a self,
connection: &'a mut Connection<'_>,
) -> BoxFuture<'a, Result<(), TransferError>> {
Box::pin(async move {
let path = self.path.clone(); let zip_path = path.with_extension("zip");
let zip_clone = zip_path.clone();
let handle = tokio::task::spawn_blocking( move || {
start_compressing(&path, &zip_path, zip::CompressionMethod::Stored).expect("Could not compress directory");
});
handle.await.unwrap();
self.send_file(&zip_clone, connection).await?;
tokio::fs::remove_file(&zip_clone).await?;
Ok(())
})
}
/// Creates `file_path` and fills it with exactly `expected_size` bytes read
/// from `connection`.
///
/// # Errors
/// * `TransferError::ConnectionClosed` if the peer closes the stream before
///   `expected_size` bytes have arrived.
/// * `TransferError::IoError` if creating, writing or flushing the file, or
///   reading from the connection, fails.
pub async fn receive_file(&self, file_path: &Path, connection: &mut Connection<'_>, expected_size: u64) -> Result<(), TransferError> {
    let mut file = tokio::fs::File::create(file_path).await?;
    let mut buffer = vec![0u8; 65536];
    let mut total_bytes_received: u64 = 0;
    while total_bytes_received < expected_size {
        let remaining = expected_size - total_bytes_received;
        // Never request more than what is left of this file; otherwise bytes
        // belonging to the next message on the stream would be consumed and
        // appended to the file. Bounded by `buffer.len()`, so the cast is
        // lossless.
        let want = remaining.min(buffer.len() as u64) as usize;
        let n = connection.read(&mut buffer[..want]).await?;
        if n == 0 {
            return Err(TransferError::ConnectionClosed);
        }
        file.write_all(&buffer[..n]).await?;
        total_bytes_received += n as u64;
        print!("\rReceived {} bytes so far", total_bytes_received);
    }
    // tokio's `File` may still have a write in flight when `write_all`
    // returns; flush so the data is complete before callers (e.g. the unzip
    // step in `receive`) open the file.
    file.flush().await?;
    println!("Total bytes received: {}", total_bytes_received);
    Ok(())
}
pub async fn receive(&self, connection: &mut Connection<'_>) -> Result<(), TransferError> {
println!("Recieving directory to path: {:?}", self.path);
let metadata = self.receive_metadata(connection).await?;
println!("Metadata: {:?}", metadata);
let file_path = self.path.join(metadata.name);
println!("file path: {:?}", file_path);
self.receive_file(&file_path, connection, metadata.size).await?;
println!("file received");
if file_path.extension() == Some("zip".as_ref()) {
println!("uzipping...");
unzip_file(
file_path.to_str().unwrap(), file_path.with_extension("").to_str().unwrap())
.unwrap(); println!("file unzipped");
}
Ok(())
}
}