use crate::{
StreamData,
endpoint::FtpEndpoint,
error::Error,
writer::{StreamWriter, WriteJob},
};
use async_std::channel::Receiver;
use async_trait::async_trait;
use ftp::FtpStream;
use std::{
io::Write,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
fn get_directory(path: &str) -> Vec<String> {
let destination_path = Path::new(path);
destination_path
.parent()
.unwrap_or_else(|| Path::new("/"))
.iter()
.map(|item| item.to_os_string().to_str().unwrap().to_string())
.collect()
}
fn get_filename(path: &str) -> Result<String, Error> {
let destination_path = Path::new(path);
Ok(
destination_path
.file_name()
.ok_or_else(|| Error::Other("Cannot get destination filename.".to_string()))?
.to_str()
.ok_or_else(|| Error::Other("Cannot get destination filename as string.".to_string()))?
.to_string(),
)
}
impl FtpEndpoint {
async fn upload_file(
&self,
ftp_stream: Arc<Mutex<FtpStream>>,
path: &str,
receiver: Receiver<StreamData>,
job_and_notification: &dyn WriteJob,
) -> Result<(), Error> {
let absolute_path = self.absolute_path(path);
let destination_directory = get_directory(&absolute_path);
let filename = get_filename(&absolute_path)?;
let mut root_dir = PathBuf::from("/");
for folder in destination_directory.iter() {
if folder == "/" {
continue;
}
root_dir = root_dir.join(folder);
let pathname = root_dir.to_str().unwrap();
let mut ftp_stream = ftp_stream.lock().unwrap();
if ftp_stream.cwd(pathname).is_err() {
ftp_stream.mkdir(pathname)?;
}
}
ftp_stream.lock().unwrap().cwd(root_dir.to_str().unwrap())?;
let mut file_size = None;
let mut received_bytes = 0;
let mut prev_percent = 0;
log::debug!(target: &job_and_notification.get_str_job_id(), "Start FTP upload to file: {filename}, directory: {root_dir:?}.");
let mut stream = ftp_stream.lock().unwrap().start_put_file(&filename)?;
while let Ok(stream_data) = receiver.recv().await {
match stream_data {
StreamData::Size(size) => file_size = Some(size),
StreamData::Stop => break,
StreamData::Eof => {
stream.flush()?;
break;
}
StreamData::Data(ref data) => {
received_bytes += data.len();
if let Some(file_size) = file_size {
let percent = (received_bytes as f32 / file_size as f32 * 100.0) as u8;
if percent > prev_percent {
prev_percent = percent;
job_and_notification.progress(percent)?;
}
}
stream.write_all(data)?;
}
}
}
Ok(())
}
}
#[async_trait]
impl StreamWriter for FtpEndpoint {
async fn write_stream(
&self,
path: &str,
receiver: Receiver<StreamData>,
job_and_notification: &dyn WriteJob,
) -> Result<(), Error> {
let ftp_stream = self.connection();
self
.upload_file(ftp_stream.clone(), path, receiver, job_and_notification)
.await?;
log::info!(target: &job_and_notification.get_str_job_id(), "ending FTP data connection");
ftp_stream.lock().unwrap().finish_put_file()?;
log::info!(target: &job_and_notification.get_str_job_id(), "closing FTP connection");
ftp_stream.lock().unwrap().quit()?;
Ok(())
}
}
#[test]
pub fn test_get_directory() {
let path = "/path/to/directory/file.ext";
let directory = get_directory(path);
assert_eq!(
directory,
vec![
"/".to_string(),
"path".to_string(),
"to".to_string(),
"directory".to_string()
]
);
}
#[test]
pub fn test_get_filename() {
let path = "/path/to/directory/file.ext";
let filename = get_filename(path).unwrap();
assert_eq!(filename, "file.ext");
}