use super::super::{op, DebouncedEvent};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{collections::VecDeque, sync::MutexGuard};
use debounce::{OperationsBuffer, OperationsBufferInner};
/// A queued request to process the buffered operation for `path` once `when` has passed.
#[derive(PartialEq, Eq)]
struct ScheduledEvent {
/// Identifier returned by `WatchTimer::schedule`; `WatchTimer::ignore` uses it to cancel.
id: u64,
/// Deadline: the worker fires the event once `when <= now`.
when: Instant,
/// Path whose entry is removed from the operations buffer when the event fires.
path: PathBuf,
}
/// State shared, behind a mutex, between a `WatchTimer` and its worker.
#[derive(Default)]
struct WorkerSharedState {
/// Set to `true` when the `WatchTimer` is dropped; the worker's `run` loop exits on seeing it.
is_stopped: bool,
/// Pending events: `WatchTimer::schedule` pushes at the back, the worker pops from the front.
events: VecDeque<ScheduledEvent>,
}
/// Worker that waits for scheduled events to become due and sends the resulting
/// `DebouncedEvent`s; `WatchTimer::new` runs it on a spawned thread.
struct ScheduleWorker {
/// Shared queue/stop flag plus the condition variable the worker waits on.
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
/// Channel on which debounced events are sent.
tx: mpsc::Sender<DebouncedEvent>,
/// Buffer from which the pending operation for a path is removed when its event fires.
operations_buffer: OperationsBuffer,
}
impl ScheduleWorker {
    /// Fires every queued event whose deadline is at or before `now`.
    ///
    /// The caller hands over the guard it holds on the shared state and receives a guard
    /// back. The state lock may have been released and re-acquired in between, so the
    /// returned guard can observe changes made by other threads.
    ///
    /// Returns the deadline of the first event that is not yet due, or `None` when the
    /// queue was drained. `None` is also returned, without firing anything, when the
    /// operations buffer mutex is poisoned.
    fn fire_due_events<'a>(
        &'a self,
        now: Instant,
        state: MutexGuard<'a, WorkerSharedState>,
    ) -> (Option<Instant>, MutexGuard<'a, WorkerSharedState>) {
        // Acquire both locks without ever blocking on the operations buffer while the
        // state lock is held: if the buffer is busy, release the state lock, yield and
        // retry, so threads waiting on the state mutex (e.g. `WatchTimer::schedule`)
        // can make progress.
        // NOTE(review): presumably the buffer's holder may itself call into the timer,
        // which would deadlock with a blocking lock here; confirm against the callers.
        let mut state = Some(state);
        let (mut state, mut op_buf) = loop {
            let state = state.take().unwrap_or_else(|| self.state.0.lock().unwrap());
            match self.operations_buffer.try_lock() {
                Ok(op_buf) => break (state, op_buf),
                Err(::std::sync::TryLockError::Poisoned { .. }) => return (None, state),
                Err(::std::sync::TryLockError::WouldBlock) => {
                    drop(state);
                    ::std::thread::yield_now();
                }
            }
        };

        while let Some(event) = state.events.pop_front() {
            if event.when > now {
                // Not due yet: put it back and report when the worker should wake up.
                let next_when = event.when;
                state.events.push_front(event);
                return (Some(next_when), state);
            }
            self.fire_event(event, &mut op_buf);
        }
        (None, state)
    }

    /// Removes the buffered operation for the event's path and sends the resulting
    /// debounced event(s) on `tx`.
    ///
    /// * If a source path was recorded, a `Rename(from, path)` is sent first.
    /// * `CREATE`, `WRITE`, `CHMOD` and `REMOVE` map to the matching event.
    /// * A `RENAME` without a source path is reported as `Create` when `path` currently
    ///   exists on disk and as `Remove` otherwise.
    /// * Any other buffered value sends nothing further; a path with no buffer entry
    ///   sends nothing at all.
    ///
    /// Send errors are ignored for every message. `Sender::send` only fails once the
    /// receiver has been dropped, and panicking here would poison the state and
    /// operations buffer mutexes whose guards the caller is holding.
    fn fire_event(
        &self,
        ev: ScheduledEvent,
        op_buf: &mut impl DerefMut<Target = OperationsBufferInner>,
    ) {
        let ScheduledEvent { path, .. } = ev;
        let (op, from_path, _) = match op_buf.remove(&path) {
            Some(entry) => entry,
            // Nothing is buffered for this path, so there is nothing to report.
            None => return,
        };

        let is_partial_rename = from_path.is_none();
        if let Some(from_path) = from_path {
            let _ = self
                .tx
                .send(DebouncedEvent::Rename(from_path, path.clone()));
        }

        let message = match op {
            Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
            Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
            Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
            Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
            Some(op::Op::RENAME) if is_partial_rename => {
                if path.exists() {
                    Some(DebouncedEvent::Create(path))
                } else {
                    Some(DebouncedEvent::Remove(path))
                }
            }
            _ => None,
        };
        if let Some(message) = message {
            let _ = self.tx.send(message);
        }
    }

    /// Worker loop: fires due events, then sleeps on the condition variable until the
    /// next deadline or, when nothing is queued, until notified.
    ///
    /// Returns once `is_stopped` is observed. The flag is checked after firing, so events
    /// that are already due are still delivered on the final pass.
    ///
    /// # Panics
    ///
    /// Panics if the shared state mutex is poisoned.
    fn run(&mut self) {
        let mut state = self.state.0.lock().unwrap();
        loop {
            let now = Instant::now();
            let (next_when, state_out) = self.fire_due_events(now, state);
            state = state_out;
            if state.is_stopped {
                break;
            }
            state = match next_when {
                Some(next_when) => {
                    // Firing events takes time (channel sends, `path.exists()`, lock
                    // retries), so measure the remaining wait from the current instant
                    // rather than from `now` to avoid sleeping past the deadline.
                    let current = Instant::now();
                    if next_when > current {
                        self.state
                            .1
                            .wait_timeout(state, next_when - current)
                            .unwrap()
                            .0
                    } else {
                        // Already due: go straight to the next firing pass.
                        state
                    }
                }
                None => self.state.1.wait(state).unwrap(),
            };
        }
    }
}
/// Handle for scheduling and cancelling delayed events that a background worker thread
/// (spawned by `WatchTimer::new`) processes once they are due.
pub struct WatchTimer {
/// Last id handed out by `schedule`; incremented with wrapping arithmetic.
counter: u64,
/// Queue/stop flag and condition variable shared with the worker thread.
state: Arc<(Mutex<WorkerSharedState>, Condvar)>,
/// Time added to `Instant::now()` to obtain the deadline of each scheduled event.
delay: Duration,
}
impl WatchTimer {
    /// Creates a timer and spawns the thread that runs its `ScheduleWorker`.
    ///
    /// * `tx` - channel the worker sends debounced events on.
    /// * `operations_buffer` - buffer the worker consults when an event fires.
    /// * `delay` - how long after `schedule` an event becomes due.
    pub fn new(
        tx: mpsc::Sender<DebouncedEvent>,
        operations_buffer: OperationsBuffer,
        delay: Duration,
    ) -> WatchTimer {
        let shared = Arc::new((Mutex::new(WorkerSharedState::default()), Condvar::new()));

        // The worker gets its own handle to the shared state; the thread is detached.
        let mut worker = ScheduleWorker {
            state: Arc::clone(&shared),
            tx,
            operations_buffer,
        };
        thread::spawn(move || worker.run());

        WatchTimer {
            counter: 0,
            state: shared,
            delay,
        }
    }

    /// Queues an event for `path` that becomes due `delay` from now and wakes the worker.
    ///
    /// Returns the id assigned to the event, which can be passed to [`WatchTimer::ignore`].
    /// Ids come from a wrapping counter.
    ///
    /// # Panics
    ///
    /// Panics if the shared state mutex is poisoned.
    pub fn schedule(&mut self, path: PathBuf) -> u64 {
        let id = self.counter.wrapping_add(1);
        self.counter = id;

        let mut shared = self.state.0.lock().unwrap();
        let when = Instant::now() + self.delay;
        shared.events.push_back(ScheduledEvent { id, when, path });
        // Release the lock before notifying so the woken worker can take it right away.
        drop(shared);

        self.state.1.notify_one();
        id
    }

    /// Cancels the queued event with the given `id`, if it is still pending.
    ///
    /// The queue is searched from the back; an unknown id is silently ignored.
    ///
    /// # Panics
    ///
    /// Panics if the shared state mutex is poisoned.
    pub fn ignore(&self, id: u64) {
        let mut shared = self.state.0.lock().unwrap();
        let found = shared.events.iter().rposition(|queued| queued.id == id);
        if let Some(position) = found {
            shared.events.remove(position);
        }
    }
}
impl Drop for WatchTimer {
    /// Asks the worker to stop by setting `is_stopped` and waking it through the
    /// condition variable.
    ///
    /// A poisoned mutex is recovered instead of unwrapped: panicking inside `drop`
    /// while the thread is already unwinding would abort the process, and setting the
    /// stop flag is harmless whatever state the panicking thread left behind.
    fn drop(&mut self) {
        {
            let mut state = self
                .state
                .0
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            state.is_stopped = true;
        }
        // Notify after releasing the lock so the woken worker can take it right away.
        self.state.1.notify_one();
    }
}