pub mod excludes;
pub mod modify;
pub mod rewrite;
use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeMap, BTreeSet, BinaryHeap},
ffi::OsStr,
mem,
path::{Component, Path, PathBuf, Prefix},
str::{self, Utf8Error},
};
use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use derive_setters::Setters;
use ignore::Match;
use ignore::overrides::Override;
use serde::{Deserialize, Deserializer};
use serde_derive::Serialize;
use crate::{
backend::{
decrypt::DecryptReadBackend,
node::{Metadata, Node, NodeType},
},
blob::{BlobType, tree::excludes::Excludes},
crypto::hasher::hash,
error::{ErrorKind, RusticError, RusticResult},
impl_blobid,
index::ReadGlobalIndex,
progress::Progress,
repofile::snapshotfile::SnapshotSummary,
};
#[derive(thiserror::Error, Debug, displaydoc::Display)]
#[non_exhaustive]
pub enum TreeErrorKind {
ContainsCurrentOrParentDirectory,
SerializingTreeFailed(serde_json::Error),
PathIsNotUtf8Conform(Utf8Error),
Channel {
kind: &'static str,
source: Box<dyn std::error::Error + Send + Sync>,
},
}
pub(crate) type TreeResult<T> = Result<T, TreeErrorKind>;
pub(super) mod constants {
pub(super) const MAX_TREE_LOADER: usize = 4;
}
pub(crate) type TreeStreamItem = RusticResult<(PathBuf, Tree)>;
type NodeStreamItem = RusticResult<(PathBuf, Node)>;
impl_blobid!(TreeId, BlobType::Tree);
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct Tree {
#[serde(deserialize_with = "deserialize_null_default")]
pub nodes: Vec<Node>,
}
pub(crate) fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
T: Default + Deserialize<'de>,
D: Deserializer<'de>,
{
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
impl Tree {
#[must_use]
pub(crate) const fn new() -> Self {
Self { nodes: Vec::new() }
}
pub(crate) fn add(&mut self, node: Node) {
self.nodes.push(node);
}
pub fn serialize(&self) -> TreeResult<(Vec<u8>, TreeId)> {
let mut chunk = serde_json::to_vec(&self).map_err(TreeErrorKind::SerializingTreeFailed)?;
chunk.push(b'\n');
let id = hash(&chunk).into();
Ok((chunk, id))
}
pub(crate) fn from_backend(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
id: TreeId,
) -> RusticResult<Self> {
let data = index
.get_tree(&id)
.ok_or_else(|| {
RusticError::new(
ErrorKind::Internal,
"Tree ID `{tree_id}` not found in index",
)
.attach_context("tree_id", id.to_string())
})?
.read_data(be)?;
let tree = serde_json::from_slice(&data).map_err(|err| {
RusticError::with_source(
ErrorKind::Internal,
"Failed to deserialize tree from JSON.",
err,
)
.ask_report()
})?;
Ok(tree)
}
pub(crate) fn node_from_path(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
id: TreeId,
path: &Path,
) -> RusticResult<Node> {
let mut node = Node::new_node(OsStr::new(""), NodeType::Dir, Metadata::default());
node.subtree = Some(id);
for p in path.components() {
if let Some(p) = comp_to_osstr(p).map_err(|err| {
RusticError::with_source(
ErrorKind::Internal,
"Failed to convert Path component `{path}` to OsString.",
err,
)
.attach_context("path", path.display().to_string())
.ask_report()
})? {
let id = node.subtree.ok_or_else(|| {
RusticError::new(ErrorKind::Internal, "Node `{node}` is not a directory.")
.attach_context("node", p.to_string_lossy())
.ask_report()
})?;
let tree = Self::from_backend(be, index, id)?;
node = tree
.nodes
.into_iter()
.find(|node| node.name() == p)
.ok_or_else(|| {
RusticError::new(ErrorKind::Internal, "Node `{node}` not found in tree.")
.attach_context("node", p.to_string_lossy())
.ask_report()
})?;
}
}
Ok(node)
}
pub(crate) fn find_nodes_from_path(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
ids: impl IntoIterator<Item = TreeId>,
path: &Path,
) -> RusticResult<FindNode> {
fn find_node_from_component(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
tree_id: TreeId,
path_comp: &[Cow<'_, OsStr>],
results_cache: &mut [BTreeMap<TreeId, Option<usize>>],
nodes: &mut BTreeMap<Node, usize>,
idx: usize,
) -> RusticResult<Option<usize>> {
if let Some(result) = results_cache[idx].get(&tree_id) {
return Ok(*result);
}
let tree = Tree::from_backend(be, index, tree_id)?;
let result = if let Some(node) = tree
.nodes
.into_iter()
.find(|node| node.name() == path_comp[idx])
{
if idx == path_comp.len() - 1 {
let new_idx = nodes.len();
let node_idx = nodes.entry(node).or_insert(new_idx);
Some(*node_idx)
} else {
let id = node.subtree.ok_or_else(|| {
RusticError::new(
ErrorKind::Internal,
"Subtree ID not found for node `{node}`",
)
.attach_context("node", path_comp[idx].to_string_lossy())
.ask_report()
})?;
find_node_from_component(
be,
index,
id,
path_comp,
results_cache,
nodes,
idx + 1,
)?
}
} else {
None
};
_ = results_cache[idx].insert(tree_id, result);
Ok(result)
}
let path_comp: Vec<_> = path
.components()
.filter_map(|p| comp_to_osstr(p).transpose())
.collect::<TreeResult<_>>()
.map_err(|err| {
RusticError::with_source(
ErrorKind::Internal,
"Failed to convert Path component `{path}` to OsString.",
err,
)
.attach_context("path", path.display().to_string())
.ask_report()
})?;
let mut results_cache = vec![BTreeMap::new(); path_comp.len()];
let mut nodes = BTreeMap::new();
let matches: Vec<_> = ids
.into_iter()
.map(|id| {
find_node_from_component(
be,
index,
id,
&path_comp,
&mut results_cache,
&mut nodes,
0,
)
})
.collect::<RusticResult<_>>()?;
let mut nodes: Vec<_> = nodes.into_iter().collect();
nodes.sort_unstable_by_key(|n| n.1);
let nodes = nodes.into_iter().map(|n| n.0).collect();
Ok(FindNode { nodes, matches })
}
pub(crate) fn find_matching_nodes(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
ids: impl IntoIterator<Item = TreeId>,
matches: &impl Fn(&Path, &Node) -> bool,
) -> RusticResult<FindMatches> {
#[derive(Default)]
struct MatchInternalState {
cache: BTreeMap<(TreeId, PathBuf), Vec<(usize, usize)>>,
nodes: BTreeMap<Node, usize>,
paths: BTreeMap<PathBuf, usize>,
}
impl MatchInternalState {
fn insert_result(&mut self, path: PathBuf, node: Node) -> (usize, usize) {
let new_idx = self.nodes.len();
let node_idx = self.nodes.entry(node).or_insert(new_idx);
let new_idx = self.paths.len();
let node_path_idx = self.paths.entry(path).or_insert(new_idx);
(*node_path_idx, *node_idx)
}
}
fn find_matching_nodes_recursive(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
tree_id: TreeId,
path: &Path,
state: &mut MatchInternalState,
matches: &impl Fn(&Path, &Node) -> bool,
) -> RusticResult<Vec<(usize, usize)>> {
let mut result = Vec::new();
if let Some(result) = state.cache.get(&(tree_id, path.to_path_buf())) {
return Ok(result.clone());
}
let tree = Tree::from_backend(be, index, tree_id)?;
for node in tree.nodes {
let node_path = path.join(node.name());
if node.is_dir() {
let id = node.subtree.ok_or_else(|| {
RusticError::new(
ErrorKind::Internal,
"Subtree ID not found for node `{node}`",
)
.attach_context("node", node.name().to_string_lossy())
.ask_report()
})?;
result.append(&mut find_matching_nodes_recursive(
be, index, id, &node_path, state, matches,
)?);
}
if matches(&node_path, &node) {
result.push(state.insert_result(node_path, node));
}
}
_ = state
.cache
.insert((tree_id, path.to_path_buf()), result.clone());
Ok(result)
}
let mut state = MatchInternalState::default();
let initial_path = PathBuf::new();
let matches: Vec<_> = ids
.into_iter()
.map(|id| {
find_matching_nodes_recursive(be, index, id, &initial_path, &mut state, matches)
})
.collect::<RusticResult<_>>()?;
let mut paths: Vec<_> = state.paths.into_iter().collect();
paths.sort_unstable_by_key(|n| n.1);
let paths = paths.into_iter().map(|n| n.0).collect();
let mut nodes: Vec<_> = state.nodes.into_iter().collect();
nodes.sort_unstable_by_key(|n| n.1);
let nodes = nodes.into_iter().map(|n| n.0).collect();
Ok(FindMatches {
paths,
nodes,
matches,
})
}
}
#[derive(Debug, Serialize)]
pub struct FindNode {
pub nodes: Vec<Node>,
pub matches: Vec<Option<usize>>,
}
#[derive(Debug, Serialize)]
pub struct FindMatches {
pub paths: Vec<PathBuf>,
pub nodes: Vec<Node>,
pub matches: Vec<Vec<(usize, usize)>>,
}
pub(crate) fn comp_to_osstr(p: Component<'_>) -> TreeResult<Option<Cow<'_, OsStr>>> {
let s = match p {
Component::RootDir => None,
Component::Prefix(p) => match p.kind() {
Prefix::Verbatim(p) | Prefix::DeviceNS(p) => Some(Cow::Borrowed(p)),
Prefix::VerbatimUNC(_, q) | Prefix::UNC(_, q) => Some(Cow::Borrowed(q)),
Prefix::VerbatimDisk(p) | Prefix::Disk(p) => Some(Cow::Owned(
OsStr::new(str::from_utf8(&[p]).map_err(TreeErrorKind::PathIsNotUtf8Conform)?)
.to_os_string(),
)),
},
Component::Normal(p) => Some(Cow::Borrowed(p)),
_ => return Err(TreeErrorKind::ContainsCurrentOrParentDirectory),
};
Ok(s)
}
impl IntoIterator for Tree {
type Item = Node;
type IntoIter = std::vec::IntoIter<Node>;
fn into_iter(self) -> Self::IntoIter {
self.nodes.into_iter()
}
}
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[derive(Clone, Debug, Setters)]
#[setters(into)]
#[non_exhaustive]
pub struct TreeStreamerOptions {
#[cfg_attr(feature = "clap", clap(flatten, next_help_heading = "Exclude options"))]
pub excludes: Excludes,
#[cfg_attr(feature = "clap", clap(long))]
pub recursive: bool,
}
impl Default for TreeStreamerOptions {
fn default() -> Self {
Self {
excludes: Excludes::default(),
recursive: true,
}
}
}
#[derive(Debug, Clone)]
pub struct NodeStreamer<'a, BE, I>
where
BE: DecryptReadBackend,
I: ReadGlobalIndex,
{
open_iterators: Vec<std::vec::IntoIter<Node>>,
inner: std::vec::IntoIter<Node>,
path: PathBuf,
be: BE,
index: &'a I,
overrides: Option<Override>,
recursive: bool,
}
impl<'a, BE, I> NodeStreamer<'a, BE, I>
where
BE: DecryptReadBackend,
I: ReadGlobalIndex,
{
pub fn new(be: BE, index: &'a I, node: &Node) -> RusticResult<Self> {
Self::new_streamer(be, index, node, None, true)
}
fn new_streamer(
be: BE,
index: &'a I,
node: &Node,
overrides: Option<Override>,
recursive: bool,
) -> RusticResult<Self> {
let inner = if node.is_dir() {
Tree::from_backend(&be, index, node.subtree.unwrap())?
.nodes
.into_iter()
} else {
vec![node.clone()].into_iter()
};
Ok(Self {
inner,
open_iterators: Vec::new(),
path: PathBuf::new(),
be,
index,
overrides,
recursive,
})
}
pub fn new_with_glob(
be: BE,
index: &'a I,
node: &Node,
opts: &TreeStreamerOptions,
) -> RusticResult<Self> {
let overrides = opts.excludes.as_override()?;
Self::new_streamer(be, index, node, Some(overrides), opts.recursive)
}
}
impl<BE, I> Iterator for NodeStreamer<'_, BE, I>
where
BE: DecryptReadBackend,
I: ReadGlobalIndex,
{
type Item = NodeStreamItem;
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.inner.next() {
Some(node) => {
let path = self.path.join(node.name());
if self.recursive
&& let Some(id) = node.subtree
{
self.path.push(node.name());
let be = self.be.clone();
let tree = match Tree::from_backend(&be, self.index, id) {
Ok(tree) => tree,
Err(err) => return Some(Err(err)),
};
let old_inner = mem::replace(&mut self.inner, tree.nodes.into_iter());
self.open_iterators.push(old_inner);
}
if let Some(overrides) = &self.overrides
&& let Match::Ignore(_) = overrides.matched(&path, false)
{
continue;
}
return Some(Ok((path, node)));
}
None => match self.open_iterators.pop() {
Some(it) => {
self.inner = it;
_ = self.path.pop();
}
None => return None,
},
}
}
}
}
#[derive(Debug)]
pub struct TreeStreamerOnce {
visited: BTreeSet<TreeId>,
queue_in: Option<Sender<(PathBuf, TreeId, usize)>>,
queue_out: Receiver<RusticResult<(PathBuf, Tree, usize)>>,
p: Progress,
counter: Vec<usize>,
finished_ids: usize,
}
impl TreeStreamerOnce {
pub fn new<BE: DecryptReadBackend, I: ReadGlobalIndex>(
be: &BE,
index: &I,
ids: Vec<TreeId>,
p: Progress,
) -> RusticResult<Self> {
p.set_length(ids.len() as u64);
let (out_tx, out_rx) = bounded(constants::MAX_TREE_LOADER);
let (in_tx, in_rx) = unbounded();
for _ in 0..constants::MAX_TREE_LOADER {
let be = be.clone();
let index = index.clone();
let in_rx = in_rx.clone();
let out_tx = out_tx.clone();
let _join_handle = std::thread::spawn(move || {
for (path, id, count) in in_rx {
out_tx
.send(Tree::from_backend(&be, &index, id).map(|tree| (path, tree, count)))
.unwrap();
}
});
}
let counter = vec![0; ids.len()];
let mut streamer = Self {
visited: BTreeSet::new(),
queue_in: Some(in_tx),
queue_out: out_rx,
p,
counter,
finished_ids: 0,
};
for (count, id) in ids.into_iter().enumerate() {
if !streamer
.add_pending(PathBuf::new(), id, count)
.map_err(|err| {
RusticError::with_source(
ErrorKind::Internal,
"Failed to add tree ID `{tree_id}` to unbounded pending queue (`{count}`).",
err,
)
.attach_context("tree_id", id.to_string())
.attach_context("count", count.to_string())
.ask_report()
})?
{
streamer.p.inc(1);
streamer.finished_ids += 1;
}
}
Ok(streamer)
}
fn add_pending(&mut self, path: PathBuf, id: TreeId, count: usize) -> TreeResult<bool> {
if self.visited.insert(id) {
self.queue_in
.as_ref()
.unwrap()
.send((path, id, count))
.map_err(|err| TreeErrorKind::Channel {
kind: "sending crossbeam message",
source: err.into(),
})?;
self.counter[count] += 1;
Ok(true)
} else {
Ok(false)
}
}
}
impl Iterator for TreeStreamerOnce {
type Item = TreeStreamItem;
fn next(&mut self) -> Option<Self::Item> {
if self.counter.len() == self.finished_ids {
drop(self.queue_in.take());
self.p.finish();
return None;
}
let (path, tree, count) = match self.queue_out.recv() {
Ok(Ok(res)) => res,
Err(err) => {
return Some(Err(RusticError::with_source(
ErrorKind::Internal,
"Failed to receive tree from crossbeam channel.",
err,
)
.attach_context("finished_ids", self.finished_ids.to_string())
.ask_report()));
}
Ok(Err(err)) => return Some(Err(err)),
};
for node in &tree.nodes {
if let Some(id) = node.subtree {
let mut path = path.clone();
path.push(node.name());
match self.add_pending(path.clone(), id, count) {
Ok(_) => {}
Err(err) => {
return Some(Err(err).map_err(|err| {
RusticError::with_source(
ErrorKind::Internal,
"Failed to add tree ID `{tree_id}` to pending queue (`{count}`).",
err,
)
.attach_context("path", path.display().to_string())
.attach_context("tree_id", id.to_string())
.attach_context("count", count.to_string())
.ask_report()
}));
}
}
}
}
self.counter[count] -= 1;
if self.counter[count] == 0 {
self.p.inc(1);
self.finished_ids += 1;
}
Some(Ok((path, tree)))
}
}
pub(crate) fn merge_trees(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
trees: &[TreeId],
cmp: &impl Fn(&Node, &Node) -> Ordering,
save: &impl Fn(Tree) -> RusticResult<(TreeId, u64)>,
summary: &mut SnapshotSummary,
) -> RusticResult<TreeId> {
struct SortedNode(Node, usize);
impl PartialEq for SortedNode {
fn eq(&self, other: &Self) -> bool {
self.0.name == other.0.name
}
}
impl PartialOrd for SortedNode {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for SortedNode {}
impl Ord for SortedNode {
fn cmp(&self, other: &Self) -> Ordering {
self.0.name.cmp(&other.0.name).reverse()
}
}
let mut tree_iters: Vec<_> = trees
.iter()
.map(|id| Tree::from_backend(be, index, *id).map(IntoIterator::into_iter))
.collect::<RusticResult<_>>()?;
let mut elems = BinaryHeap::new();
for (num, iter) in tree_iters.iter_mut().enumerate() {
if let Some(node) = iter.next() {
elems.push(SortedNode(node, num));
}
}
let mut tree = Tree::new();
let (mut node, mut num) = match elems.pop() {
None => {
let (id, size) = save(tree)?;
summary.dirs_unmodified += 1;
summary.total_dirs_processed += 1;
summary.total_dirsize_processed += size;
return Ok(id);
}
Some(SortedNode(node, num)) => (node, num),
};
let mut nodes = Vec::new();
loop {
if let Some(next_node) = tree_iters[num].next() {
elems.push(SortedNode(next_node, num));
}
match elems.pop() {
None => {
nodes.push(node);
tree.add(merge_nodes(be, index, nodes, cmp, save, summary)?);
break;
}
Some(SortedNode(new_node, new_num)) if node.name != new_node.name => {
nodes.push(node);
tree.add(merge_nodes(be, index, nodes, cmp, save, summary)?);
nodes = Vec::new();
(node, num) = (new_node, new_num);
}
Some(SortedNode(new_node, new_num)) => {
nodes.push(node);
(node, num) = (new_node, new_num);
}
}
}
let (id, size) = save(tree)?;
if trees.contains(&id) {
summary.dirs_unmodified += 1;
} else {
summary.dirs_changed += 1;
}
summary.total_dirs_processed += 1;
summary.total_dirsize_processed += size;
Ok(id)
}
pub(crate) fn merge_nodes(
be: &impl DecryptReadBackend,
index: &impl ReadGlobalIndex,
nodes: Vec<Node>,
cmp: &impl Fn(&Node, &Node) -> Ordering,
save: &impl Fn(Tree) -> RusticResult<(TreeId, u64)>,
summary: &mut SnapshotSummary,
) -> RusticResult<Node> {
let trees: Vec<_> = nodes
.iter()
.filter(|node| node.is_dir())
.map(|node| node.subtree.unwrap())
.collect();
let mut node = nodes.into_iter().max_by(|n1, n2| cmp(n1, n2)).unwrap();
if node.is_dir() {
node.subtree = Some(merge_trees(be, index, &trees, cmp, save, summary)?);
} else {
summary.files_unmodified += 1;
summary.total_files_processed += 1;
summary.total_bytes_processed += node.meta.size;
}
Ok(node)
}