use std::rc::Rc;
use std::cell::RefCell;
use std::thread::Thread;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use std::cmp::Reverse;
use std::sync::mpsc::{Sender, Receiver};
/// Interface for a component that accepts activation requests addressed by
/// path, where a path is a slice of `usize` indices.
///
/// NOTE(review): the trait only fixes the signatures below. The descriptions
/// are inferred from the method names and from the analogous methods on
/// `Activations` in this file; confirm them against the implementors.
pub trait Scheduler {
/// Requests activation of `path`.
fn activate(&mut self, path: &[usize]);
/// Writes results related to `path` into `dest`.
///
/// NOTE(review): presumably the next path elements of active paths that
/// extend `path` (compare `Activations::for_extensions`) — confirm. Whether
/// `dest` is cleared first or appended to is not visible here.
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
}
/// Lets a boxed scheduler be used wherever a `Scheduler` is expected by
/// forwarding every call to the trait object inside the box.
impl Scheduler for Box<dyn Scheduler> {
    /// Forwards to the boxed scheduler's `activate`.
    fn activate(&mut self, path: &[usize]) {
        // Reborrow the trait object itself so the call dispatches dynamically
        // instead of re-entering this impl.
        let inner = &mut **self;
        inner.activate(path)
    }

    /// Forwards to the boxed scheduler's `extensions`.
    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) {
        let inner = &mut **self;
        inner.extensions(path, dest)
    }
}
/// Records activated paths and exposes them as a sorted, de-duplicated set
/// that is refreshed by `advance`.
///
/// Paths are stored flattened: all path elements live back to back in
/// `slices`, and `bounds` holds one `(offset, length)` pair per recorded path.
#[derive(Debug)]
pub struct Activations {
/// Number of leading entries of `bounds` produced by the last `advance`
/// (sorted and de-duplicated). Entries past this index were recorded since.
clean: usize,
/// One `(offset, length)` pair per recorded path, indexing into `slices`.
bounds: Vec<(usize, usize)>,
/// Concatenated elements of all recorded paths.
slices: Vec<usize>,
/// Scratch space `advance` uses to rebuild `slices` compactly; the two
/// vectors are swapped at the end of `advance`.
buffer: Vec<usize>,
/// Sending half of the channel; `sync` hands out clones of it.
tx: Sender<Vec<usize>>,
/// Receiving half of the channel, drained at the start of `advance`.
rx: Receiver<Vec<usize>>,
/// Reference instant for delayed activations. When `None`, `activate_after`
/// activates immediately regardless of the requested delay.
timer: Option<Instant>,
/// Delayed activations keyed by their due time, expressed as a duration
/// since `timer`. `Reverse` makes the heap yield the earliest entry first.
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}
impl Activations {
    /// Creates an empty tracker.
    ///
    /// `timer` is the reference instant against which delayed activations are
    /// measured. With `None`, `activate_after` ignores its delay and activates
    /// immediately.
    pub fn new(timer: Option<Instant>) -> Self {
        let (tx, rx) = std::sync::mpsc::channel();
        Self {
            clean: 0,
            bounds: Vec::new(),
            slices: Vec::new(),
            buffer: Vec::new(),
            tx,
            rx,
            timer,
            queue: BinaryHeap::new(),
        }
    }

    /// Records `path` as activated.
    ///
    /// The path is appended behind the current active set; it becomes part of
    /// the sorted, de-duplicated active set at the next call to `advance`.
    pub fn activate(&mut self, path: &[usize]) {
        self.bounds.push((self.slices.len(), path.len()));
        self.slices.extend_from_slice(path);
    }

    /// Records `path` as activated once `delay` has passed.
    ///
    /// A zero delay, or a tracker built without a timer, activates the path
    /// right away. Otherwise the path is queued and moved into the recorded
    /// paths by the first `advance` that runs at or after the due time.
    pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
        match self.timer {
            Some(timer) if delay != Duration::new(0, 0) => {
                // Due times are stored relative to `timer`, matching the
                // `timer.elapsed()` comparisons in `advance` and `empty_for`.
                let moment = timer.elapsed() + delay;
                self.queue.push(Reverse((moment, path.to_vec())));
            }
            _ => self.activate(path),
        }
    }

    /// Discards the current active set and makes everything recorded since
    /// the previous call the new active set.
    ///
    /// Before doing so, this pulls in paths sent through `SyncActivations`
    /// handles and any delayed activations that are now due.
    pub fn advance(&mut self) {
        // Drain activations that arrived over the channel.
        while let Ok(path) = self.rx.try_recv() {
            self.activate(&path);
        }

        // Drain delayed activations whose due time has been reached. The
        // emptiness check avoids reading the clock when nothing is queued.
        if let Some(timer) = self.timer {
            if !self.queue.is_empty() {
                let now = timer.elapsed();
                while self.queue.peek().map(|Reverse((due, _))| *due <= now) == Some(true) {
                    let Reverse((_due, path)) = self
                        .queue
                        .pop()
                        .expect("peek just reported a queued entry");
                    self.activate(&path);
                }
            }
        }

        // Drop the previous active set; what remains was recorded since.
        self.bounds.drain(..self.clean);

        {
            // Scoped so the borrow of `slices` ends before compaction.
            let slices = &self.slices[..];
            self.bounds.sort_unstable_by_key(|x| &slices[x.0..x.0 + x.1]);
            self.bounds.dedup_by_key(|x| &slices[x.0..x.0 + x.1]);
        }

        // Compact: copy only the surviving paths into `buffer`, rewrite their
        // offsets, then make `buffer` the new `slices`.
        self.buffer.clear();
        for (offset, length) in self.bounds.iter_mut() {
            self.buffer.extend(&self.slices[*offset..*offset + *length]);
            *offset = self.buffer.len() - *length;
        }
        ::std::mem::swap(&mut self.buffer, &mut self.slices);

        self.clean = self.bounds.len();
    }

    /// Calls `logic` once for every recorded path.
    ///
    /// This walks all of `bounds`, so paths recorded since the last `advance`
    /// are included, in recording order, after the sorted active set.
    pub fn map_active(&self, logic: impl Fn(&[usize])) {
        for &(offset, length) in self.bounds.iter() {
            logic(&self.slices[offset..offset + length]);
        }
    }

    /// Calls `action` with the next path element of each active path that
    /// strictly extends `path`, in increasing order and without repeats.
    ///
    /// Only the active set produced by the last `advance` is consulted. Paths
    /// recorded since then are stored unsorted behind it; visiting them would
    /// break the ordering and the adjacent-duplicate suppression below, so
    /// they are left for the next `advance`.
    pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
        let active = &self.bounds[..self.clean];

        // Every extension of `path` sorts at or after `path` itself, so both
        // a hit and the insertion point are valid starting positions.
        let start = active
            .binary_search_by_key(&path, |&(offset, length)| &self.slices[offset..offset + length])
            .unwrap_or_else(|insert_at| insert_at);

        let mut previous = None;
        active[start..]
            .iter()
            .map(|&(offset, length)| &self.slices[offset..offset + length])
            .take_while(|candidate| candidate.starts_with(path))
            .for_each(|candidate| {
                // `get` is `None` for `path` itself, which has no extension.
                if let Some(&extension) = candidate.get(path.len()) {
                    if previous != Some(extension) {
                        action(extension);
                        previous = Some(extension);
                    }
                }
            });
    }

    /// Returns a handle that can submit activations through the channel.
    ///
    /// The handle remembers the calling thread and unparks it after sending.
    pub fn sync(&self) -> SyncActivations {
        SyncActivations {
            tx: self.tx.clone(),
            thread: std::thread::current(),
        }
    }

    /// Reports how long the tracker is known to stay without new work.
    ///
    /// Returns a zero duration when any path is recorded or when there is no
    /// timer. Otherwise returns the time until the earliest queued delayed
    /// activation is due (zero if already due), or `None` if none is queued.
    pub fn empty_for(&self) -> Option<Duration> {
        match self.timer {
            Some(timer) if self.bounds.is_empty() => {
                self.queue.peek().map(|Reverse((due, _))| {
                    due.checked_sub(timer.elapsed())
                        .unwrap_or_else(|| Duration::new(0, 0))
                })
            }
            _ => Some(Duration::new(0, 0)),
        }
    }
}
/// Handle for submitting activations through a channel, as created by
/// `Activations::sync`.
#[derive(Clone, Debug)]
pub struct SyncActivations {
/// Sending half of the channel whose receiver is drained by
/// `Activations::advance`.
tx: Sender<Vec<usize>>,
/// The thread that called `Activations::sync`; it is unparked after paths
/// have been sent.
thread: Thread,
}
impl SyncActivations {
    /// Sends a single path and unparks the target thread.
    ///
    /// # Errors
    ///
    /// Returns `SyncActivationError` if the path could not be sent.
    pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
        // An `Option` iterates over its single item, giving a one-path batch.
        self.activate_batch(Some(path))
    }

    /// Sends every path in `paths`, then unparks the target thread once.
    ///
    /// # Errors
    ///
    /// Returns `SyncActivationError` at the first path that cannot be sent.
    /// In that case the remaining paths are not sent and the thread is not
    /// unparked.
    pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
    where
        I: IntoIterator<Item = Vec<usize>>,
    {
        paths
            .into_iter()
            .try_for_each(|path| self.tx.send(path).map_err(|_| SyncActivationError))?;
        self.thread.unpark();
        Ok(())
    }
}
/// Single-threaded handle that activates one fixed path in a shared
/// `Activations`.
#[derive(Clone, Debug)]
pub struct Activator {
/// The path this handle activates.
path: Rc<[usize]>,
/// The shared tracker that receives the activations.
queue: Rc<RefCell<Activations>>,
}
impl Activator {
pub fn new(path: Rc<[usize]>, queue: Rc<RefCell<Activations>>) -> Self {
Self {
path,
queue,
}
}
pub fn activate(&self) {
self.queue
.borrow_mut()
.activate(&self.path[..]);
}
pub fn activate_after(&self, delay: Duration) {
if delay == Duration::new(0, 0) {
self.activate();
}
else {
self.queue
.borrow_mut()
.activate_after(&self.path[..], delay);
}
}
}
/// Handle that activates one fixed path through a `SyncActivations`.
#[derive(Clone, Debug)]
pub struct SyncActivator {
/// The path this handle activates; cloned for every activation.
path: Vec<usize>,
/// The channel-backed handle used to submit the path.
queue: SyncActivations,
}
impl SyncActivator {
pub fn new(path: Vec<usize>, queue: SyncActivations) -> Self {
Self {
path,
queue,
}
}
pub fn activate(&self) -> Result<(), SyncActivationError> {
self.queue.activate(self.path.clone())
}
}
/// Error returned by `SyncActivations` and `SyncActivator` when a path could
/// not be sent over the activation channel. Carries no further detail.
#[derive(Clone, Copy, Debug)]
pub struct SyncActivationError;
/// Renders the error as a fixed, human-readable message.
impl std::fmt::Display for SyncActivationError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "sync activation error in timely")
    }
}
// Empty impl: the error relies on the trait's provided methods and on the
// `Debug` and `Display` impls of `SyncActivationError`.
impl std::error::Error for SyncActivationError {}
/// Wrapper around a value of type `T` that activates `address` in the shared
/// `Activations` when the wrapper is dropped.
///
/// The wrapper dereferences to the wrapped value. Because `Clone` is derived,
/// every clone triggers its own activation when it is dropped.
#[derive(Clone, Debug)]
pub struct ActivateOnDrop<T> {
/// The wrapped value, reachable through `Deref` / `DerefMut`.
wrapped: T,
/// The path to activate on drop.
address: Rc<[usize]>,
/// The shared tracker that receives the activation.
activator: Rc<RefCell<Activations>>,
}
use std::ops::{Deref, DerefMut};
impl<T> ActivateOnDrop<T> {
pub fn new(wrapped: T, address: Rc<[usize]>, activator: Rc<RefCell<Activations>>) -> Self {
Self { wrapped, address, activator }
}
}
/// Gives shared access to the wrapped value.
impl<T> Deref for ActivateOnDrop<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        let Self { wrapped, .. } = self;
        wrapped
    }
}
/// Gives exclusive access to the wrapped value.
impl<T> DerefMut for ActivateOnDrop<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { wrapped, .. } = self;
        wrapped
    }
}
/// Activates the stored address when the wrapper goes away.
impl<T> Drop for ActivateOnDrop<T> {
    /// # Panics
    ///
    /// Panics if the shared `Activations` is already borrowed at drop time.
    fn drop(&mut self) {
        let mut queue = self.activator.borrow_mut();
        queue.activate(&self.address);
    }
}