use crate::{
StreamData,
endpoint::SftpEndpoint,
error::Error,
writer::{StreamWriter, WriteJob},
};
use async_std::channel::Receiver;
use std::io::Write;
#[async_trait::async_trait]
impl StreamWriter for SftpEndpoint {
    /// Writes the messages received on `receiver` to the remote file at `path`.
    ///
    /// `path` is resolved with `self.absolute_path` and opened for writing through
    /// the endpoint's connection. Messages are then handled until the loop ends:
    ///
    /// - `StreamData::Size` records the announced total size, used only for
    ///   progress reporting.
    /// - `StreamData::Data` is written to the remote file; when a size has been
    ///   announced, progress is reported each time the whole-number percentage
    ///   increases.
    /// - `StreamData::Eof` flushes the writer and ends the loop.
    /// - `StreamData::Stop` ends the loop without flushing.
    ///
    /// A `recv` error on the channel also ends the loop without flushing, and the
    /// function still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates any error from opening the remote file, from writing or flushing
    /// it, and from `job_and_notification.progress`.
    ///
    /// # Panics
    ///
    /// Panics if `lock()` on the connection returns an error (presumably a poisoned
    /// mutex -- TODO confirm against the connection type).
    async fn write_stream(
        &self,
        path: &str,
        receiver: Receiver<StreamData>,
        job_and_notification: &dyn WriteJob,
    ) -> Result<(), Error> {
        let connection = self.connection();
        let absolute_path = self.absolute_path(path);
        let mut sftp_writer = connection.lock().unwrap().write_over_sftp(&absolute_path)?;

        let mut file_size = None;
        let mut received_bytes = 0_usize;
        let mut prev_percent = 0_u8;

        while let Ok(stream_data) = receiver.recv().await {
            match stream_data {
                StreamData::Size(size) => file_size = Some(size),
                StreamData::Stop => break,
                StreamData::Eof => {
                    sftp_writer.flush()?;
                    break;
                }
                StreamData::Data(ref data) => {
                    log::debug!(target: &job_and_notification.get_str_job_id(), "Receive {} bytes to write...", data.len());
                    received_bytes += data.len();

                    if let Some(file_size) = file_size {
                        // f64 keeps byte counts exact well beyond the 16 MiB limit of f32.
                        let ratio = received_bytes as f64 / file_size as f64;
                        // Cap at 100: an announced size of zero (division yields +inf or
                        // NaN) or a stream longer than announced would otherwise saturate
                        // the cast and report up to 255 %. `f64::min` ignores a NaN operand.
                        let percent = (ratio * 100.0).min(100.0) as u8;
                        if percent > prev_percent {
                            prev_percent = percent;
                            job_and_notification.progress(percent)?;
                        }
                    }

                    sftp_writer.write_all(data)?;
                }
            }
        }

        Ok(())
    }
}