use crate::descriptor::{Descriptor, DestPileDescriptor, SourceDescriptor, VirtualSourceKind};
use crate::labelling::{label_node, load_labelling_rules, str_to_label, Label, State};
use crate::tree::{scan, FileTree};
use anyhow::anyhow;
use arc_interner::ArcIntern;
use chrono::{DateTime, Utc};
use log::{error, info, warn};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::io::Write;
use std::path::Path;
use std::process::{Child, Command, Stdio};
use yama::chunking::SENSIBLE_THRESHOLD;
use yama::commands::{load_pile_descriptor, open_pile, store_tree_node};
use yama::definitions::{
FilesystemOwnership, FilesystemPermissions, PointerData, RecursiveChunkRef, RootTreeNode,
TreeNode,
};
/// chrono `strftime` pattern used for the timestamp part of a pointer name.
///
/// `%F` is the ISO date (`YYYY-MM-DD`) and `%T` is the time (`HH:MM:SS`), so
/// names produced with this pattern sort lexicographically in chronological
/// order. The parent search in this file relies on that ordering.
pub const POINTER_DATETIME_FORMAT: &str = "%F_%T";

/// Separator placed between the source name and the timestamp in a pointer name.
pub const POINTER_FIELD_SEPARATOR: char = '+';
/// Builds the pointer name for a backup of `source_name` taken at `datetime`.
///
/// The result is the source name, [`POINTER_FIELD_SEPARATOR`], and the
/// timestamp rendered with [`POINTER_DATETIME_FORMAT`], concatenated in that
/// order.
pub fn get_pointer_name_at(source_name: &str, datetime: DateTime<Utc>) -> String {
    // The value returned by `format` is interpolated directly by `format!`;
    // no intermediate `String` is needed.
    format!(
        "{}{}{}",
        source_name,
        POINTER_FIELD_SEPARATOR,
        datetime.format(POINTER_DATETIME_FORMAT)
    )
}
/// Spawns the helper `program_name` and hands it `extra_args` as JSON on stdin.
///
/// The child is started with piped stdin and stdout and inherits this
/// process's stderr. `extra_args` is serialised as JSON onto the child's
/// stdin, which is then flushed and closed. The returned [`Child`] still owns
/// its stdout pipe, which the caller is expected to read.
///
/// # Errors
///
/// Returns an error if the process cannot be spawned, or if serialising,
/// writing or flushing the arguments fails.
pub fn open_stdout_backup_process(
    extra_args: &HashMap<String, toml::Value>,
    program_name: &str,
) -> anyhow::Result<Child> {
    let mut child = Command::new(program_name)
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit())
        .stdin(Stdio::piped())
        .spawn()?;

    // Take ownership of the stdin handle so that dropping it closes the pipe.
    let mut child_stdin = child
        .stdin
        .take()
        .expect("child stdin was configured as piped, so a handle must be present");
    serde_json::to_writer(&mut child_stdin, extra_args)?;
    child_stdin.flush()?;
    // Close stdin explicitly so the helper sees end-of-input.
    drop(child_stdin);

    Ok(child)
}
pub fn backup_source_to_destination(
source: &SourceDescriptor,
dest: &DestPileDescriptor,
descriptor: &Descriptor,
desc_path: &Path,
source_name: &str,
dest_name: &str,
num_workers: u8,
) -> anyhow::Result<()> {
match source {
SourceDescriptor::DirectorySource {
hostname: _,
directory,
} => {
info!("Looking to backup {} to {}", source_name, dest_name);
info!("Scanning.");
let tree = scan(directory)?.ok_or_else(|| anyhow!("Source does not exist."))?;
info!("Labelling.");
let mut tree = tree.replace_meta(&None);
let labels = descriptor
.labels
.iter()
.map(|l| Label(ArcIntern::new(l.clone())))
.collect();
let rules = load_labelling_rules(desc_path, source_name)?;
label_node("".to_owned(), None, &mut tree, &labels, &rules)?;
let included_labels: HashSet<Label> =
dest.included_labels.iter().map(str_to_label).collect();
info!("Filtering.");
let mut unlabelled_included = false;
if !tree.filter_inclusive(&mut |node| {
match node.get_metadata().unwrap() {
None => {
unlabelled_included = true;
true
}
Some(State::Excluded) => {
false
}
Some(State::Labelled(label)) => {
included_labels.contains(&label)
}
Some(State::Split) => {
assert!(
node.is_dir(),
"Non-directories should not be labelled for Split!"
);
false
}
}
}) {
info!("Empty filter. Stopping.");
return Ok(());
}
if unlabelled_included {
warn!("Unlabelled nodes. They have been included for safety, but you should consider running\n\t'datman ilabel {}'\nat some point to assign labels.", source_name);
}
let absolute_source_path = desc_path.join(directory);
let absolute_dest_path = desc_path.join(&dest.path);
let pile_descriptor = load_pile_descriptor(&absolute_dest_path)?;
let pile = open_pile(&absolute_dest_path, &pile_descriptor)?;
let root = convert_filetree_to_yamatree(&tree);
let pointer_name = get_pointer_name_at(&source_name, Utc::now());
info!("Will write as pointer {:?}.", pointer_name);
info!("Searching for suitable parents.");
let mut parent: Option<String> = None;
let prefix = format!("{}+", source_name);
for pointer in pile.list_pointers()?.iter() {
if pointer.starts_with(&prefix) {
match parent.as_ref() {
None => {
parent = Some(pointer.to_owned());
}
Some(cur_parent) => {
if cur_parent < pointer {
parent = Some(pointer.to_owned());
}
}
}
}
}
match parent.as_ref() {
Some(parent) => {
info!("Using parent: {}", parent);
}
None => {
info!("No suitable parent found.");
}
}
info!("Storing using yama.");
yama::operations::storing::store_fully(
&pile,
&absolute_source_path,
&pointer_name,
root,
parent,
num_workers,
)?;
info!("Stored!");
}
SourceDescriptor::VirtualSource {
helper,
label,
kind: VirtualSourceKind::Stdout { filename },
extra_args,
} => {
if !dest.included_labels.contains(label) {
info!("Skipping because the source's label is not included in this destination!");
return Ok(());
}
info!("Starting up process and writing to yama store.");
let absolute_dest_path = desc_path.join(&dest.path);
let pile_descriptor = load_pile_descriptor(&absolute_dest_path)?;
let pile = open_pile(&absolute_dest_path, &pile_descriptor)?;
let pointer_name = get_pointer_name_at(&source_name, Utc::now());
info!("Will write as pointer {:?}.", pointer_name);
let mut chunker = yama::chunking::RecursiveChunker::new(SENSIBLE_THRESHOLD, &pile);
let mut process = open_stdout_backup_process(extra_args, helper)?;
info!("Storing. No progress bar is available for this style of backup yet.");
std::io::copy(process.stdout.as_mut().unwrap(), &mut chunker)?;
let exit_status = process.wait()?;
if !exit_status.success() {
error!(
"The process was not successful (exit code {}). Exiting.",
exit_status.code().unwrap()
);
}
let data_chunk_ref = chunker.finish()?;
eprintln!("Stored data! Now writing a pointer...");
let root = TreeNode::NormalFile {
mtime: Utc::now().timestamp_millis() as u64,
ownership: FilesystemOwnership {
uid: u16::MAX,
gid: u16::MAX,
},
permissions: FilesystemPermissions { mode: 0o600 },
content: data_chunk_ref,
};
let pointer_chunk_ref = store_tree_node(
&pile,
&RootTreeNode {
name: filename.to_owned(),
node: root,
},
)?;
let pointer_data = PointerData {
chunk_ref: pointer_chunk_ref,
parent_pointer: None,
uid_lookup: Default::default(),
gid_lookup: Default::default(),
};
pile.write_pointer(&pointer_name, &pointer_data)?;
eprintln!("Pointer saved!");
}
}
Ok(())
}
/// Recursively translates a `FileTree` into the equivalent yama `TreeNode`.
///
/// Ownership, permissions, modification time and symlink targets are copied
/// across; the per-node `meta` values are discarded. Regular files receive a
/// default chunk reference (default chunk id, depth 0) as their content.
///
/// # Panics
///
/// Panics if the tree contains a `FileTree::Other` node.
pub fn convert_filetree_to_yamatree<A, B, C, D>(
    filetree: &FileTree<A, B, C, D>,
) -> yama::definitions::TreeNode
where
    A: Debug + Clone + Eq + PartialEq,
    B: Debug + Clone + Eq + PartialEq,
    C: Debug + Clone + Eq + PartialEq,
    D: Debug + Clone + Eq + PartialEq,
{
    match filetree {
        FileTree::Other(_) => {
            panic!("Shouldn't be any Others in the tree.");
        }
        FileTree::SymbolicLink {
            ownership: owner,
            target: link_target,
            meta: _,
        } => TreeNode::SymbolicLink {
            ownership: *owner,
            target: link_target.clone(),
        },
        FileTree::Directory {
            ownership: owner,
            permissions: perms,
            children: entries,
            meta: _,
        } => {
            // Convert every child first, keeping each entry's name.
            let children = entries
                .iter()
                .map(|(name, subtree)| (name.clone(), convert_filetree_to_yamatree(subtree)))
                .collect();
            TreeNode::Directory {
                ownership: *owner,
                permissions: *perms,
                children,
            }
        }
        FileTree::NormalFile {
            mtime: modified,
            ownership: owner,
            permissions: perms,
            meta: _,
        } => {
            // No file content is known at this point, so use a default reference.
            let content = RecursiveChunkRef {
                chunk_id: Default::default(),
                depth: 0,
            };
            TreeNode::NormalFile {
                mtime: *modified,
                ownership: *owner,
                permissions: *perms,
                content,
            }
        }
    }
}
/// Backs up every source listed in `descriptor` to the destination `dest`.
///
/// Each entry of `descriptor.source` is passed in turn to
/// [`backup_source_to_destination`] together with the shared arguments.
///
/// # Errors
///
/// Stops at, and returns, the first error reported for any source; sources
/// after the failing one are not attempted.
pub fn backup_all_sources_to_destination(
    dest: &DestPileDescriptor,
    descriptor: &Descriptor,
    desc_path: &Path,
    dest_name: &str,
    num_workers: u8,
) -> anyhow::Result<()> {
    descriptor.source.iter().try_for_each(|(name, source)| {
        backup_source_to_destination(
            source,
            dest,
            descriptor,
            desc_path,
            name.as_str(),
            dest_name,
            num_workers,
        )
    })
}