use failure::{bail, Fallible};
use log::{debug, warn};
use notify::{RawEvent, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::{HashMap, HashSet};
use std::io::{stdin, BufRead};
use std::path::PathBuf;
use std::process::exit;
use std::sync::mpsc::channel;
use std::thread;
fn encode(s: &str) -> impl AsRef<str> {
percent_encoding::utf8_percent_encode(s, percent_encoding::SIMPLE_ENCODE_SET).to_string()
}
fn decode<'a>(s: &'a str) -> impl AsRef<str> + 'a {
percent_encoding::percent_decode(s.as_bytes()).decode_utf8_lossy()
}
fn send_cmd(cmd: &str, args: &[&str]) {
let mut output = cmd.to_owned();
for arg in args {
output += " ";
output += &encode(arg).as_ref();
}
debug!(">> {}", output);
println!("{}", output);
}
fn send_ack() {
send_cmd("OK", &[]);
}
fn send_changes(replica: &str) {
send_cmd("CHANGES", &[replica]);
}
fn send_recursive(path: &str) {
send_cmd("RECURSIVE", &[path]);
}
fn send_done() {
send_cmd("DONE", &[]);
}
fn send_error(msg: &str) {
send_cmd("ERROR", &[msg]);
exit(1);
}
fn parse_input(input: &str) -> Fallible<(String, Vec<String>)> {
let mut cmd = String::new();
let mut args = vec![];
for (idx, word) in input.split_whitespace().enumerate() {
if idx == 0 {
cmd = word.to_owned();
} else {
args.push(decode(word).as_ref().to_owned())
}
}
Ok((cmd, args))
}
#[derive(Debug)]
enum Event {
Input(String),
FSEvent(RawEvent),
}
fn main() -> Fallible<()> {
env_logger::init();
let mut replicas: HashMap<_, _> = HashMap::new();
let mut link_map: HashMap<_, HashSet<_>> = HashMap::new();
let mut pending_changes: HashMap<_, HashSet<PathBuf>> = HashMap::new();
let (tx, rx) = channel();
let tx_clone = tx.clone();
thread::spawn(move || -> Fallible<()> {
let stdin = stdin();
let mut handle = stdin.lock();
loop {
let mut input = String::new();
handle.read_line(&mut input)?;
tx_clone.send(Event::Input(input))?;
}
});
let (fsevent_tx, fsevent_rx) = channel();
let mut watcher: RecommendedWatcher = Watcher::new_raw(fsevent_tx)?;
let tx_clone = tx.clone();
thread::spawn(move || -> Fallible<()> {
loop {
tx_clone.send(Event::FSEvent(fsevent_rx.recv()?))?;
}
});
send_cmd("VERSION", &["1"]);
let mut replica_path = PathBuf::new();
loop {
let event = rx.recv()?;
match event {
Event::Input(input) => {
debug!("<< {}", input.trim());
let (cmd, args) = parse_input(&input)?;
match cmd.as_str() {
"VERSION" => {
let version = &args[0];
if version != "1" {
bail!("Unexpected version: {:?}", version);
}
}
"START" => {
let replica_id = args[0].clone();
replica_path = PathBuf::from(&args[1]);
if let Some(dir) = args.get(2) {
replica_path = replica_path.join(dir);
} else {
watcher.watch(&replica_path, RecursiveMode::Recursive)?;
replicas.insert(replica_id, replica_path.clone());
debug!("replicas: {:?}", replicas);
}
send_ack();
}
"DIR" => {
send_ack();
}
"LINK" => {
let path = replica_path.join(args.get(0).cloned().unwrap_or_default());
let realpath = path.canonicalize()?;
watcher.watch(&realpath, RecursiveMode::Recursive)?;
link_map.entry(realpath).or_default().insert(path);
debug!("link_map: {:?}", link_map);
send_ack();
}
"WAIT" => {
let replica_id = &args[0];
if !replicas.contains_key(replica_id) {
send_error(&format!("Unknown replica: {}", replica_id));
}
}
"CHANGES" => {
let replica = &args[0];
for c in pending_changes.remove(replica).unwrap_or_default() {
send_recursive(&c.to_string_lossy());
}
debug!("pending_changes: {:?}", pending_changes);
send_done();
}
"RESET" => {
let replica_id = &args[0];
if let Some(path) = replicas.remove(replica_id) {
watcher.unwatch(&path)?;
}
debug!("replicas: {:?}", replicas);
}
"DEBUG" | "DONE" => {
}
_ => {
send_error(&format!("Unexpected cmd: {}", cmd));
}
}
}
Event::FSEvent(fsevent) => {
debug!("FS event: {:?}", fsevent);
let mut matched_replica_ids = HashSet::new();
if let Some(path) = fsevent.path {
let mut paths = vec![path.clone()];
for (realpath, links) in &link_map {
if let Ok(postfix) = path.strip_prefix(realpath) {
for link in links {
paths.push(link.join(postfix));
}
}
}
for path in paths {
for (id, replica) in &replicas {
if path.starts_with(replica) {
matched_replica_ids.insert(id);
pending_changes
.entry(id.clone())
.or_default()
.insert(path.strip_prefix(replica)?.into());
}
}
}
}
if matched_replica_ids.is_empty() {
warn!("No replica found for event!")
}
for id in &matched_replica_ids {
send_changes(id);
}
}
}
}
}