use crate::stream::{EncStreamWriter, LastStreamElement, StreamChunk};
use crate::CryptrError;
use async_trait::async_trait;
use flume::Receiver;
use std::fmt::Formatter;
use tracing::debug;
#[derive(Debug)]
pub struct FileWriter<'a> {
pub path: &'a str,
pub overwrite_target: bool,
}
#[async_trait]
impl EncStreamWriter for FileWriter<'_> {
fn debug_writer(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"FileWriter({}, overwrite_target: {})",
self.path, self.overwrite_target
)
}
async fn write(
&mut self,
rx: Receiver<Result<(LastStreamElement, StreamChunk), CryptrError>>,
) -> Result<(), CryptrError> {
use tokio::fs;
use tokio::fs::{File, OpenOptions};
use tokio::io::AsyncWriteExt;
let mut should_remove = false;
if let Ok(f) = File::open(&self.path).await {
let meta = f.metadata().await?;
if meta.is_dir() {
return Err(CryptrError::File("Target file is a directory"));
}
if self.overwrite_target {
should_remove = true;
} else {
return Err(CryptrError::File("Target file exists already"));
}
}
if should_remove {
fs::remove_file(&self.path).await?;
}
let mut opts = OpenOptions::new();
opts.append(true);
opts.create(true);
let mut file = opts.open(&self.path).await?;
let mut total = 0;
loop {
match rx.recv_async().await {
Ok(Ok((is_last, data))) => {
let payload = data.as_ref();
let length = file.write(payload).await?;
total += length;
if is_last == LastStreamElement::Yes {
debug!("Last payload received. Total bytes written: {}", total);
break;
}
}
Ok(Err(err)) => {
return Err(err);
}
Err(_) => {
return Err(CryptrError::Generic(
"Decryption task closed the channel".to_string(),
));
}
}
}
debug!("Writer exiting: {} bytes written", total);
Ok(())
}
}