use Builder;
use mpmc::Queue;
use wheel::{Token, Wheel};
use futures::task::Task;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use std::thread::{self, Thread};
#[derive(Clone)]
pub struct Worker {
tx: Arc<Tx>,
}
#[derive(Clone)]
struct Tx {
chan: Arc<Chan>,
worker: Thread,
tolerance: Duration,
max_timeout: Duration,
}
struct Chan {
run: AtomicBool,
set_timeouts: SetQueue,
mod_timeouts: ModQueue,
}
struct SetTimeout(Instant, Task);
enum ModTimeout {
Move(Token, Instant, Task),
Cancel(Token, Instant),
}
type SetQueue = Queue<SetTimeout, Token>;
type ModQueue = Queue<ModTimeout, ()>;
impl Worker {
pub fn spawn(mut wheel: Wheel, builder: &Builder) -> Worker {
let tolerance = builder.get_tick_duration();
let max_timeout = builder.get_max_timeout();
let capacity = builder.get_channel_capacity();
assert!(wheel.available() >= capacity);
let chan = Arc::new(Chan {
run: AtomicBool::new(true),
set_timeouts: Queue::with_capacity(capacity, || wheel.reserve().unwrap()),
mod_timeouts: Queue::with_capacity(capacity, || ()),
});
let chan2 = chan.clone();
let t = thread::spawn(move || run(chan2, wheel));
Worker {
tx: Arc::new(Tx {
chan: chan,
worker: t.thread().clone(),
tolerance: tolerance,
max_timeout: max_timeout,
}),
}
}
pub fn tolerance(&self) -> &Duration {
&self.tx.tolerance
}
pub fn max_timeout(&self) -> &Duration {
&self.tx.max_timeout
}
pub fn set_timeout(&self, when: Instant, task: Task) -> Result<Token, Task> {
self.tx.chan.set_timeouts.push(SetTimeout(when, task))
.and_then(|ret| {
self.tx.worker.unpark();
Ok(ret)
})
.map_err(|SetTimeout(_, task)| task)
}
pub fn move_timeout(&self, token: Token, when: Instant, task: Task) -> Result<(), Task> {
self.tx.chan.mod_timeouts.push(ModTimeout::Move(token, when, task))
.and_then(|ret| {
self.tx.worker.unpark();
Ok(ret)
})
.map_err(|v| {
match v {
ModTimeout::Move(_, _, task) => task,
_ => unreachable!(),
}
})
}
pub fn cancel_timeout(&self, token: Token, instant: Instant) {
let _ = self.tx.chan.mod_timeouts.push(ModTimeout::Cancel(token, instant));
}
}
fn run(chan: Arc<Chan>, mut wheel: Wheel) {
while chan.run.load(Ordering::Relaxed) {
let now = Instant::now();
while let Some(task) = wheel.poll(now) {
task.unpark();
}
while let Some(token) = wheel.reserve() {
match chan.set_timeouts.pop(token) {
Ok((SetTimeout(when, task), token)) => {
wheel.set_timeout(token, when, task);
}
Err(token) => {
wheel.release(token);
break;
}
}
}
loop {
match chan.mod_timeouts.pop(()) {
Ok((ModTimeout::Move(token, when, task), _)) => {
wheel.move_timeout(token, when, task);
}
Ok((ModTimeout::Cancel(token, when), _)) => {
wheel.cancel(token, when);
}
Err(_) => break,
}
}
let now = Instant::now();
if let Some(next) = wheel.next_timeout() {
if next > now {
thread::park_timeout(next - now);
}
} else {
thread::park();
}
}
}
impl Drop for Tx {
fn drop(&mut self) {
self.chan.run.store(false, Ordering::Relaxed);
self.worker.unpark();
}
}