use std::fmt::{Display, Formatter, Write};
use std::process::Stdio;
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use crate::asynchronous::execute_borg;
use crate::common::{create_fmt_args, create_parse_output, CommonOptions, CreateOptions};
use crate::errors::CreateError;
use crate::output::create::Create;
use crate::output::logging::{LevelName, LoggingMessage, MessageId};
/// Creates a new borg archive and returns the statistics borg reports for it.
///
/// The borg executable is taken from `common_options.local_path`; when that is
/// unset, `borg` is used. The command line is produced by `create_fmt_args`,
/// split into individual arguments and handed to `execute_borg` together with
/// the optional passphrase.
///
/// # Errors
///
/// Returns [`CreateError::ShlexError`] when the formatted command line cannot
/// be split into arguments. Errors from running borg and from parsing its
/// output are propagated unchanged.
pub async fn create(
    options: &CreateOptions,
    common_options: &CommonOptions,
) -> Result<Create, CreateError> {
    let local_path = common_options
        .local_path
        .as_ref()
        .map_or("borg", |path| path);
    let args = create_fmt_args(options, common_options, false);
    debug!("Calling borg: {local_path} {args}");

    let argv = match shlex::split(&args) {
        Some(argv) => argv,
        None => return Err(CreateError::ShlexError),
    };

    let borg_output = execute_borg(local_path, argv, &options.passphrase).await?;
    let archive_stats = create_parse_output(borg_output)?;

    info!("Finished creating archive");
    Ok(archive_stats)
}
/// Progress update sent over the channel passed to [`create_progress`] while
/// borg is creating an archive.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CreateProgress {
/// Intermediate status, built from an archive progress message of borg that
/// is not marked as finished.
Progress {
/// `original_size` value reported by borg (presumably the amount of data
/// processed so far, before compression and deduplication — confirm
/// against borg's documentation).
original_size: u64,
/// `compressed_size` value reported by borg.
compressed_size: u64,
/// `deduplicated_size` value reported by borg.
deduplicated_size: u64,
/// `nfiles` value reported by borg (presumably the number of files
/// processed so far — confirm against borg's documentation).
nfiles: u64,
/// `path` value reported by borg (presumably the path currently being
/// processed — confirm against borg's documentation).
path: String,
},
/// Sent when borg reports an archive progress message marked as finished.
Finished,
}
impl Display for CreateProgress {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
CreateProgress::Progress {
original_size,
compressed_size,
deduplicated_size,
nfiles,
path,
} => {
write!(
f,
"O {original_size} C {compressed_size} D {deduplicated_size} N {nfiles} {path}",
)
}
CreateProgress::Finished => write!(f, "Finished"),
}
}
}
/// Creates a new borg archive while streaming progress updates to
/// `progress_channel`.
///
/// borg is spawned with piped stdout and stderr. Every line read from stderr
/// is parsed as a JSON [`LoggingMessage`]:
///
/// - archive progress messages are forwarded to `progress_channel` as
///   [`CreateProgress`] values (a failed send is logged and otherwise ignored),
/// - log messages are re-emitted through the `log` macros at the matching level,
/// - all other message kinds are ignored.
///
/// After the process has exited, its stdout is read and parsed as the
/// resulting [`Create`] statistics.
///
/// The command is configured with `kill_on_drop(true)`, so the child process
/// is killed when the child handle is dropped, e.g. on an early return.
///
/// # Errors
///
/// - [`CreateError::ShlexError`] if the formatted command line cannot be split.
/// - [`CreateError::PipeFailed`] if stdout or stderr of the child is unavailable.
/// - [`CreateError::ArchiveAlreadyExists`] if a log message carries the
///   `RepositoryAlreadyExists` message id.
/// - [`CreateError::Unknown`] with the collected stderr output if borg exits
///   with a code greater than 1 or is terminated by a signal.
/// - [`CreateError::InvalidBorgOutput`] if stdout cannot be read.
/// - Failures to spawn the process and JSON parse failures (of a stderr line
///   or of stdout) are propagated via `?`.
pub async fn create_progress(
    options: &CreateOptions,
    common_options: &CommonOptions,
    progress_channel: tokio::sync::mpsc::Sender<CreateProgress>,
) -> Result<Create, CreateError> {
    let local_path = common_options.local_path.as_ref().map_or("borg", |x| x);
    let args = create_fmt_args(options, common_options, true);
    debug!("Calling borg: {local_path} {args}");
    let args = shlex::split(&args).ok_or(CreateError::ShlexError)?;

    // Build the command once; only the passphrase environment variable is
    // optional.
    let mut command = tokio::process::Command::new(local_path);
    command
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);
    if let Some(passphrase) = &options.passphrase {
        command.env("BORG_PASSPHRASE", passphrase);
    }
    let mut child = command.spawn()?;

    let mut stdout = child.stdout.take().ok_or(CreateError::PipeFailed)?;
    let stderr = child.stderr.take().ok_or(CreateError::PipeFailed)?;
    let mut stderr_reader = BufReader::new(stderr).lines();

    // Raw copy of every stderr line read so far; handed to the caller inside
    // `CreateError::Unknown` when borg fails.
    let mut output = String::new();
    // Cleared once stderr reaches EOF. Without this, `next_line()` would
    // resolve to `Ok(None)` immediately on every iteration and the loop would
    // spin until the exit status becomes available.
    let mut stderr_open = true;

    loop {
        tokio::select! {
            result = stderr_reader.next_line(), if stderr_open => match result {
                Ok(Some(line)) => {
                    writeln!(output, "{line}").expect("writing to a String never fails");
                    let parsed: LoggingMessage = serde_json::from_str(&line)?;
                    match parsed {
                        LoggingMessage::ArchiveProgress {
                            original_size,
                            compressed_size,
                            deduplicated_size,
                            nfiles,
                            path,
                            finished,
                            ..
                        } => {
                            let update = if finished {
                                trace!("Progress: finished");
                                Some(CreateProgress::Finished)
                            } else if let (
                                Some(original_size),
                                Some(compressed_size),
                                Some(deduplicated_size),
                                Some(nfiles),
                                Some(path),
                            ) = (original_size, compressed_size, deduplicated_size, nfiles, path)
                            {
                                Some(CreateProgress::Progress {
                                    original_size,
                                    compressed_size,
                                    deduplicated_size,
                                    nfiles,
                                    path,
                                })
                            } else {
                                // All of these fields are optional in the parsed
                                // message; skip the update instead of panicking
                                // when one of them is absent.
                                warn!("Ignoring archive progress message with missing fields");
                                None
                            };

                            if let Some(update) = update {
                                if let Err(err) = progress_channel.send(update).await {
                                    error!("Could not send to progress channel: {err}");
                                }
                            }
                        }
                        LoggingMessage::LogMessage {
                            name,
                            message,
                            level_name,
                            time,
                            msg_id,
                        } => {
                            match level_name {
                                LevelName::Debug => debug!("{time} {name}: {message}"),
                                LevelName::Info => info!("{time} {name}: {message}"),
                                LevelName::Warning => warn!("{time} {name}: {message}"),
                                LevelName::Error | LevelName::Critical => {
                                    error!("{time} {name}: {message}")
                                }
                            }
                            // NOTE(review): a *repository* message id is mapped to an
                            // *archive* error here — confirm this pairing is intended.
                            if let Some(MessageId::RepositoryAlreadyExists) = msg_id {
                                return Err(CreateError::ArchiveAlreadyExists);
                            }
                        }
                        _ => {}
                    }
                }
                Ok(None) => stderr_open = false,
                Err(err) => {
                    warn!("Could not read from stderr of borg: {err}");
                    break;
                }
            },
            result = child.wait() => {
                // NOTE(review): stderr lines that have not been read when the
                // exit status becomes ready are not processed.
                match result {
                    Ok(exit_code) => {
                        debug!("Child process exited with {exit_code}");
                        // `code()` is `None` when the process was terminated by a
                        // signal; report that as a failure instead of panicking.
                        if exit_code.code().map_or(true, |code| code > 1) {
                            return Err(CreateError::Unknown(output));
                        }
                    }
                    Err(err) => error!("Could not wait for borg to exit: {err}"),
                }
                break;
            }
        }
    }

    let mut stdout_str = String::new();
    stdout
        .read_to_string(&mut stdout_str)
        .await
        .map_err(CreateError::InvalidBorgOutput)?;

    trace!("Parsing stats: {stdout_str}");
    let stats: Create = serde_json::from_str(&stdout_str)?;
    info!("Finished creating archive");
    Ok(stats)
}