use crate::{
hashes::{hash_file, noatime_open, Estimate},
node::{into_tracker, NodeWriter, SureNode},
progress::Progress,
store::{Store, TempCleaner},
Error, Result,
};
use crossbeam::channel::{bounded, Sender};
use data_encoding::HEXLOWER;
use log::{debug, error};
use rusqlite::{types::ToSql, Connection};
use std::{
cmp::Ordering,
io::Write,
mem,
path::PathBuf,
sync::{mpsc::sync_channel, Arc, Mutex},
thread,
};
/// A repeatable source of sure-tree nodes.
///
/// Callers obtain a fresh traversal per call, which allows one pass to
/// compute hashes (`HashUpdater::compute*`) and a second, independent pass
/// to merge them back in (`HashMerger::merge`).
pub trait Source {
    /// Construct a fresh iterator over the nodes of this source.
    fn iter(&self) -> Result<Box<dyn Iterator<Item = Result<SureNode>> + Send>>;
}
/// Drives hash computation over the nodes of a `Source`, staging the
/// computed hashes in a temporary SQLite database obtained from `store`.
pub struct HashUpdater<'n, S> {
    source: S,
    store: &'n dyn Store,
}
/// Result of a hash computation: the original node source plus an open
/// connection to the temp database of freshly computed hashes, ready to be
/// merged back into the node stream.
pub struct HashMerger<S> {
    source: S,
    conn: Connection,
    // Held only for its Drop side effect: keeps the temporary database
    // file alive until the merger is dropped, then cleans it up.
    _temp: Box<dyn TempCleaner>,
}
impl<'a, S: Source> HashUpdater<'a, S> {
    /// Construct an updater that will hash the files described by `source`,
    /// staging results in a temporary database from `store`.
    pub fn new(source: S, store: &dyn Store) -> HashUpdater<S> {
        HashUpdater { source, store }
    }

    /// Compute hashes using a single hashing thread.
    ///
    /// A spawned thread walks the nodes (paths resolved against `base`),
    /// hashes each file that still needs one, and sends results back over a
    /// bounded channel; this thread inserts them into the temp database
    /// inside one transaction.  `estimate` sizes the progress meter.
    /// Returns a `HashMerger` holding the populated database.
    pub fn compute(mut self, base: &str, estimate: &Estimate) -> Result<HashMerger<S>> {
        let meter = Arc::new(Mutex::new(Progress::new(estimate.files, estimate.bytes)));
        let (mut conn, temp) = self.setup_db()?;
        // Bounded channel: lets hashing run ahead of the database writer
        // without unbounded memory growth.
        let (tx, rx) = sync_channel(num_cpus::get());
        let iter = into_tracker(self.source.iter()?, base);
        let mut count = 0;
        let meter2 = meter.clone();
        thread::spawn(move || {
            for entry in iter {
                // NOTE(review): an iterator error panics the hashing
                // thread; the receiver below then fails with a channel
                // error rather than a clean Error value.
                let entry = entry.unwrap();
                if entry.node.needs_hash() {
                    let path = entry.path.unwrap();
                    match noatime_open(&path) {
                        Ok(mut fd) => match hash_file(&mut fd) {
                            Ok(ref h) => {
                                tx.send(Some(HashInfo {
                                    id: count,
                                    hash: h.as_ref().to_owned(),
                                }))
                                .unwrap();
                            }
                            Err(e) => {
                                error!("Unable to hash file: '{:?}' ({})", path, e);
                            }
                        },
                        Err(e) => {
                            error!("Unable to open '{:?}' for hashing ({})", path, e);
                        }
                    }
                    // `count` advances for every node needing a hash, even
                    // when hashing fails, so ids stay aligned with the
                    // node numbering used later by `HashMerger::merge`.
                    count += 1;
                    meter2.lock().unwrap().update(1, entry.node.size());
                }
            }
            // `None` is the end-of-stream marker for the receive loop.
            tx.send(None).unwrap();
        });
        let trans = conn.transaction()?;
        while let Some(info) = rx.recv()? {
            trans.execute(
                "INSERT INTO hashes (id, hash) VALUES (?1, ?2)",
                &[&info.id as &dyn ToSql, &info.hash as &dyn ToSql],
            )?;
        }
        trans.commit()?;
        meter.lock().unwrap().flush();
        Ok(HashMerger {
            source: self.source,
            conn,
            _temp: temp,
        })
    }

    /// Compute hashes with one hashing worker per CPU.
    ///
    /// Topology: a feeder thread numbers the nodes needing hashes and
    /// pushes `HashWork` items into a bounded channel; `ncpu` workers
    /// drain it via `hash_one_file`; this thread drains the result channel
    /// into the temp database inside a single transaction.  Results may
    /// arrive out of id order; ordering is restored at merge time by the
    /// `ORDER BY id` query.
    pub fn compute_parallel(mut self, base: &str, estimate: &Estimate) -> Result<HashMerger<S>> {
        let meter = Arc::new(Mutex::new(Progress::new(estimate.files, estimate.bytes)));
        let iter = into_tracker(self.source.iter()?, base);
        let (mut conn, temp) = self.setup_db()?;
        let trans = conn.transaction()?;
        let meter2 = meter.clone();
        crossbeam::scope(move |s| {
            let ncpu = num_cpus::get();
            let (work_send, work_recv) = bounded(ncpu);
            let (result_send, result_recv) = bounded(ncpu);
            // Feeder: turn nodes needing hashes into numbered work items.
            s.spawn(move |_| {
                let mut count = 0;
                for entry in iter {
                    let entry = entry.unwrap();
                    if entry.node.needs_hash() {
                        let path = entry.path.unwrap();
                        work_send
                            .send(HashWork {
                                id: count,
                                path,
                                size: entry.node.size(),
                            })
                            .unwrap();
                        count += 1;
                    }
                }
                // `work_send` is dropped here, closing the work channel so
                // the workers' receive loops terminate.
            });
            // Workers: one per CPU, all sharing the work channel.
            for _ in 0..ncpu {
                let work_recv = work_recv.clone();
                let result_send = result_send.clone();
                let meter2 = meter2.clone();
                s.spawn(move |_| {
                    for work in work_recv {
                        hash_one_file(&work, &result_send, &meter2);
                    }
                });
            }
            // Drop our clone so the result channel closes once every
            // worker has finished; otherwise the loop below never ends.
            drop(result_send);
            for info in result_recv {
                trans
                    .execute(
                        "INSERT INTO hashes (id, hash) VALUES (?1, ?2)",
                        &[&info.id as &dyn ToSql, &info.hash as &dyn ToSql],
                    )
                    .unwrap();
            }
            trans.commit()?;
            // Pins the closure's `Result` type for inference.
            ok_result()
        })
        .map_err(|e| Error::Hash(format!("{:?}", e)))??;
        meter.lock().unwrap().flush();
        Ok(HashMerger {
            source: self.source,
            conn,
            _temp: temp,
        })
    }

    /// Create the temporary SQLite database that stages computed hashes,
    /// returning the open connection together with a cleaner that removes
    /// the backing file when dropped.
    fn setup_db(&mut self) -> Result<(Connection, Box<dyn TempCleaner>)> {
        let tmp = self.store.make_temp()?.into_loader()?;
        let conn = Connection::open(tmp.path_ref())?;
        conn.execute(
            "CREATE TABLE hashes (
                id INTEGER PRIMARY KEY,
                hash BLOB)",
            [],
        )?;
        Ok((conn, tmp.into_cleaner()?))
    }
}
fn hash_one_file(work: &HashWork, sender: &Sender<HashInfo>, meter: &Arc<Mutex<Progress>>) {
match noatime_open(&work.path) {
Ok(mut fd) => match hash_file(&mut fd) {
Ok(ref h) => {
sender
.send(HashInfo {
id: work.id,
hash: h.as_ref().to_owned(),
})
.unwrap();
}
Err(e) => {
error!("Unable to hash file: '{:?}' ({})", work.path, e);
}
},
Err(e) => {
error!("Unable to open '{:?}' for hashing ({})", work.path, e);
}
}
meter.lock().unwrap().update(1, work.size);
}
/// Return `Ok(())` as the crate-local `Result` type.
///
/// Exists solely so the closure handed to `crossbeam::scope` in
/// `compute_parallel` has an inferable return type; a bare `Ok(())` there
/// leaves the error type ambiguous.
fn ok_result() -> Result<()> {
    Ok(())
}
impl<S: Source> HashMerger<S> {
    /// Merge the staged hashes back into the node stream, writing every
    /// node through `writer`.
    ///
    /// Rows are read back ordered by id.  `count` numbers the nodes that
    /// need hashing in the same order the compute pass numbered them, so a
    /// row belongs to the current node exactly when the ids are equal.  A
    /// row id ahead of `count` means that file's hash failed to compute,
    /// and the node is written without a "sha1" attribute.
    pub fn merge<W: Write>(self, writer: &mut NodeWriter<W>) -> Result<()> {
        let mut stmt = self
            .conn
            .prepare("SELECT id, hash FROM hashes ORDER BY id")?;
        let mut hash_iter = stmt
            .query_map([], |row| {
                Ok(HashInfo {
                    id: row.get(0)?,
                    hash: row.get(1)?,
                })
            })?
            .peekable();
        let mut count = 0;
        for entry in self.source.iter()? {
            let mut entry = entry?;
            if entry.needs_hash() {
                // Find the hash row (if any) for this node; peeking keeps
                // an unconsumed row available for a later node.
                let hnode = loop {
                    match hash_iter.peek() {
                        Some(Ok(hnode)) => {
                            match count.cmp(&hnode.id) {
                                Ordering::Equal => {
                                    let node = hash_iter.next().unwrap()?;
                                    break Some(node);
                                }
                                Ordering::Less => {
                                    // Next row is for a later node: this
                                    // one has no computed hash.
                                    break None;
                                }
                                // Ids only move forward; a row behind
                                // `count` indicates corrupted staging.
                                _ => panic!("Out of sequence hash"),
                            }
                        }
                        Some(Err(e)) => {
                            return Err(Error::WrappedSql(format!("{:?}", e)));
                        }
                        None => break None,
                    }
                };
                if let Some(HashInfo { hash, .. }) = &hnode {
                    let hex = HEXLOWER.encode(hash);
                    entry.atts_mut().unwrap().insert("sha1".to_string(), hex);
                }
                count += 1;
            }
            writer.write_node(&entry)?;
        }
        Ok(())
    }
}
/// A computed hash, keyed by the sequence number of the node it belongs to.
#[derive(Debug)]
struct HashInfo {
    // Sequence number among the nodes that needed hashing.
    id: i64,
    // Raw hash bytes; hex-encoded into the "sha1" attribute at merge time.
    hash: Vec<u8>,
}
/// One unit of hashing work handed to the parallel worker pool.
#[derive(Debug)]
struct HashWork {
    // Sequence number among the nodes that needed hashing.
    id: i64,
    // File size, used only to advance the progress meter.
    size: u64,
    // Full path of the file to hash.
    path: PathBuf,
}
/// Iterator adapter that walks an old tree (`Iold`, carrying hashes) and a
/// new tree (`Inew`) in step, yielding the new tree's nodes with "sha1"
/// attributes carried over from the old tree for files that appear
/// unchanged.
pub struct HashCombiner<Iold: Iterator, Inew: Iterator> {
    // Current look-ahead node on each side.
    left: SureNode,
    right: SureNode,
    left_iter: Iold,
    right_iter: Inew,
    // Explicit stack driving the traversal state machine.
    state: Vec<CombineState>,
    // Set once the shared "__root__" directory has been entered.
    seen_root: bool,
}
/// States on the combiner's traversal stack.
#[derive(Debug)]
enum CombineState {
    // Skipping a directory subtree present only in the old (left) tree.
    LeftDirs,
    // Emitting a directory subtree present only in the new (right) tree.
    RightDirs,
    // Walking the child directories of a directory present in both trees.
    SameDirs,
    // Walking the files of a directory present in both trees.
    SameFiles,
}
impl<Iold, Inew> HashCombiner<Iold, Inew>
where
Iold: Iterator<Item = Result<SureNode>>,
Inew: Iterator<Item = Result<SureNode>>,
{
pub fn new(mut left_iter: Iold, mut right_iter: Inew) -> Result<HashCombiner<Iold, Inew>> {
let left = match left_iter.next() {
None => return Err(Error::EmptyLeftIterator),
Some(Err(e)) => return Err(e),
Some(Ok(node)) => node,
};
let right = match right_iter.next() {
None => return Err(Error::EmptyRightIterator),
Some(Err(e)) => return Err(e),
Some(Ok(node)) => node,
};
Ok(HashCombiner {
left,
right,
left_iter,
right_iter,
state: vec![],
seen_root: false,
})
}
fn next_left(&mut self) -> Result<SureNode> {
let next = match self.left_iter.next() {
None => SureNode::Leave,
Some(Ok(node)) => node,
Some(Err(e)) => return Err(e),
};
Ok(mem::replace(&mut self.left, next))
}
fn next_right(&mut self) -> Result<SureNode> {
let next = match self.right_iter.next() {
None => SureNode::Leave,
Some(Ok(node)) => node,
Some(Err(e)) => return Err(e),
};
Ok(mem::replace(&mut self.right, next))
}
}
/// Outcome of one step of the combiner state machine.
enum VisitResult {
    // Internal progress was made; no node to emit yet.
    Continue,
    // Emit this node from the iterator.
    Node(SureNode),
}
/// Shorthand: a visit-method result carrying an error.
macro_rules! vre {
    ($err:expr) => {
        Err($err)
    };
}
/// Shorthand: a visit-method result carrying a node to emit.
macro_rules! vro {
    ($result:expr) => {
        Ok(VisitResult::Node($result))
    };
}
impl<Iold, Inew> Iterator for HashCombiner<Iold, Inew>
where
    Iold: Iterator<Item = Result<SureNode>>,
    Inew: Iterator<Item = Result<SureNode>>,
{
    type Item = Result<SureNode>;

    /// Drive the state machine until it either emits a node, fails, or
    /// (with the root seen and the stack empty) finishes.
    fn next(&mut self) -> Option<Result<SureNode>> {
        loop {
            // Empty stack after the root has been processed means the
            // traversal is complete.
            if self.seen_root && self.state.is_empty() {
                return None;
            }
            let step = match self.state.pop() {
                Some(CombineState::SameDirs) => self.visit_samedir(),
                Some(CombineState::SameFiles) => self.visit_samefiles(),
                Some(CombineState::LeftDirs) => self.visit_leftdirs(),
                Some(CombineState::RightDirs) => self.visit_rightdirs(),
                None => self.visit_root(),
            };
            match step {
                Err(e) => return Some(Err(e)),
                Ok(VisitResult::Node(node)) => return Some(Ok(node)),
                Ok(VisitResult::Continue) => continue,
            }
        }
    }
}
impl<Iold, Inew> HashCombiner<Iold, Inew>
where
    Iold: Iterator<Item = Result<SureNode>>,
    Inew: Iterator<Item = Result<SureNode>>,
{
    /// Initial state: both trees must open by entering a directory named
    /// "__root__".  Emits the right-hand root node and pushes `SameDirs`
    /// to start walking both trees in step.
    fn visit_root(&mut self) -> Result<VisitResult> {
        if !self.left.is_enter() {
            vre!(Error::UnexpectedLeftNode)
        } else if !self.right.is_enter() {
            vre!(Error::UnexpectedRightNode)
        } else if self.left.name() != "__root__" || self.right.name() != "__root__" {
            vre!(Error::IncorrectName)
        } else {
            let _ = self.next_left()?;
            let rnode = self.next_right()?;
            self.state.push(CombineState::SameDirs);
            self.seen_root = true;
            vro!(rnode)
        }
    }

    /// Walk the subdirectory lists of a directory present in both trees.
    /// Names compare in order, deciding whether a child directory is
    /// shared, left-only (skipped), or right-only (emitted verbatim).  A
    /// separator on both sides moves on to the file list.
    fn visit_samedir(&mut self) -> Result<VisitResult> {
        debug!("visit samedir: {:?}, {:?}", self.left, self.right);
        match (self.left.is_sep(), self.right.is_sep()) {
            // Both sides exhausted their subdirectories; switch to files.
            (true, true) => {
                let _ = self.next_left()?;
                let rnode = self.next_right()?;
                self.state.push(CombineState::SameFiles);
                vro!(rnode)
            }
            (false, false) => {
                match self.left.name().cmp(&self.right.name()) {
                    // Same subdirectory on both sides: recurse in step
                    // (inner SameDirs), then resume here (outer SameDirs).
                    Ordering::Equal => {
                        self.state.push(CombineState::SameDirs);
                        self.state.push(CombineState::SameDirs);
                        let _ = self.next_left()?;
                        vro!(self.next_right()?)
                    }
                    // Left-only subdirectory (deleted): skip it silently.
                    Ordering::Less => {
                        let _ = self.next_left()?;
                        self.state.push(CombineState::SameDirs);
                        self.state.push(CombineState::LeftDirs);
                        Ok(VisitResult::Continue)
                    }
                    // Right-only subdirectory (new): emit it in full.
                    Ordering::Greater => {
                        self.state.push(CombineState::SameDirs);
                        self.state.push(CombineState::RightDirs);
                        vro!(self.next_right()?)
                    }
                }
            }
            // Right is at its separator but left is not: the remaining
            // left subdirectories were deleted; skip them.
            (false, true) => {
                let _ = self.next_left()?;
                self.state.push(CombineState::SameDirs);
                self.state.push(CombineState::LeftDirs);
                Ok(VisitResult::Continue)
            }
            // Left is at its separator but right is not: the remaining
            // right subdirectories are new; emit them.
            (true, false) => {
                self.state.push(CombineState::SameDirs);
                self.state.push(CombineState::RightDirs);
                vro!(self.next_right()?)
            }
        }
    }

    /// Walk the file lists of a directory present in both trees.  When
    /// the same file exists on both sides, the old hash may be carried
    /// over to the emitted node (see `maybe_copy_sha`).
    fn visit_samefiles(&mut self) -> Result<VisitResult> {
        debug!("visit samefiles: {:?}, {:?}", self.left, self.right);
        match (self.left.is_leave(), self.right.is_leave()) {
            // Both sides are done with this directory.
            (true, true) => {
                let _ = self.next_left()?;
                vro!(self.next_right()?)
            }
            // Remaining right files are new; emit them unchanged.
            (true, false) => {
                self.state.push(CombineState::SameFiles);
                vro!(self.next_right()?)
            }
            // Remaining left files were deleted; skip them.
            (false, true) => {
                self.state.push(CombineState::SameFiles);
                let _ = self.next_left()?;
                Ok(VisitResult::Continue)
            }
            (false, false) => {
                self.state.push(CombineState::SameFiles);
                match self.left.name().cmp(&self.right.name()) {
                    // Same file on both sides: emit the right node,
                    // possibly with the old hash copied in.
                    Ordering::Equal => {
                        let left = self.next_left()?;
                        let mut right = self.next_right()?;
                        maybe_copy_sha(&left, &mut right);
                        vro!(right)
                    }
                    // Left-only file (deleted): skip.
                    Ordering::Less => {
                        let _ = self.next_left()?;
                        Ok(VisitResult::Continue)
                    }
                    // Right-only file (new): emit.
                    Ordering::Greater => {
                        vro!(self.next_right()?)
                    }
                }
            }
        }
    }

    /// Emit a right-only subtree verbatim.  Enter pushes two states (the
    /// children plus the eventual matching Leave), Leave pushes none, and
    /// every other node keeps this state on the stack.
    fn visit_rightdirs(&mut self) -> Result<VisitResult> {
        debug!("visit rightdirs: {:?}, {:?}", self.left, self.right);
        if self.right.is_sep() {
            self.state.push(CombineState::RightDirs);
        } else if self.right.is_enter() {
            self.state.push(CombineState::RightDirs);
            self.state.push(CombineState::RightDirs);
        } else if self.right.is_leave() {
            // Leaving the subtree; nothing more to push.
        } else {
            self.state.push(CombineState::RightDirs);
        }
        vro!(self.next_right()?)
    }

    /// Skip a left-only subtree without emitting anything; the mirror
    /// image of `visit_rightdirs`.
    fn visit_leftdirs(&mut self) -> Result<VisitResult> {
        // Bug fix: this trace previously logged "visit rightdirs",
        // copy-pasted from the method above.
        debug!("visit leftdirs: {:?}, {:?}", self.left, self.right);
        if self.left.is_sep() {
            self.state.push(CombineState::LeftDirs);
        } else if self.left.is_enter() {
            self.state.push(CombineState::LeftDirs);
            self.state.push(CombineState::LeftDirs);
        } else if self.left.is_leave() {
            // Leaving the subtree; nothing more to push.
        } else {
            self.state.push(CombineState::LeftDirs);
        }
        let _ = self.next_left()?;
        Ok(VisitResult::Continue)
    }
}
/// Copy the "sha1" attribute from `left` (old tree) to `right` (new tree)
/// when the file appears unchanged, so an unmodified file keeps its hash
/// without being re-read.
///
/// Both nodes must carry attribute maps (callers only pass file-entry
/// nodes from `visit_samefiles`).  The hash is carried over only when the
/// right node has no hash yet, both nodes are of kind "file", and the
/// inode and ctime attributes match exactly.
fn maybe_copy_sha(left: &SureNode, right: &mut SureNode) {
    let latts = left.atts().unwrap();
    let ratts = right.atts_mut().unwrap();

    // Nothing to do if the new node was already hashed.
    if ratts.contains_key("sha1") {
        return;
    }

    // Only plain files carry hashes.  Robustness fix: a node missing its
    // "kind" attribute is treated as non-file instead of panicking on a
    // map index, as the old `latts["kind"]` form did.
    match (latts.get("kind"), ratts.get("kind")) {
        (Some(lk), Some(rk)) if *lk == "file" && *rk == "file" => (),
        _ => return,
    }

    // A differing inode or ctime means the file may have changed, so the
    // old hash cannot be trusted.
    if latts.get("ino") != ratts.get("ino") || latts.get("ctime") != ratts.get("ctime") {
        return;
    }

    if let Some(v) = latts.get("sha1") {
        ratts.insert("sha1".to_string(), v.to_string());
    }
}