#[macro_use]
extern crate log;
extern crate atomicmonitor;
extern crate futures;
extern crate atom;
extern crate monitor;
extern crate time;
extern crate smallqueue;
extern crate atomic;
extern crate stopwatch;
extern crate through;
pub mod channel;
pub mod pool;
pub mod prelude;
pub mod run;
pub mod scoped;
pub mod timescheduler;
#[cfg(test)]
pub mod test;
pub mod push;
use channel::Channel;
use std::sync::Arc;
use std::fmt::{Debug, Formatter};
use std::fmt;
use atomicmonitor::AtomMonitor;
use atomicmonitor::atomic::{Atomic, Ordering};
use futures::Future;
use futures::executor::{spawn, Spawn};
use time::Duration;
pub struct RunningTask {
pub spawn: Spawn<Box<dyn Future<Item=(), Error=()> + Send + 'static>>,
pub close_counted: Atomic<bool>,
}
impl RunningTask {
fn new(future: impl Future<Item=(), Error=()> + Send + 'static) -> Self {
let future: Box<dyn Future<Item=(), Error=()> + Send + 'static> = Box::new(future);
RunningTask {
spawn: spawn(future),
close_counted: Atomic::new(false),
}
}
}
impl Debug for RunningTask {
    /// Writes the fixed text `RunningTask`; neither field is inspected.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "RunningTask")
    }
}
/// Handle to a single bit inside a shared `AtomMonitor<u64>` word.
///
/// A freshly created `StatusBit` is inactive (`inner` is `None`) and
/// `set` does nothing until `activate` has attached it to a monitor.
pub struct StatusBit {
    /// `None` until `activate` succeeds; afterwards the monitor and bit mask.
    inner: Option<StatusBitInner>,
}
/// State of an activated `StatusBit`: which monitor to update and which bit.
struct StatusBitInner {
    /// Shared monitor holding the 64-bit status word.
    monitor: Arc<AtomMonitor<u64>>,
    /// Mask with exactly one bit set: `1 << index` for the activated index.
    mask: u64,
}
/// Error returned by `StatusBit::activate` when the requested bit index does
/// not fit in the 64-bit status word. The payload is the rejected index.
///
/// The type derives `Debug` so that `Result<_, StatusBitIndexTooBig>` can be
/// used with `unwrap`/`expect`/`assert_eq!`, and implements `Display` and
/// `std::error::Error` so it can be propagated as a boxed error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StatusBitIndexTooBig(pub usize);

impl fmt::Display for StatusBitIndexTooBig {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "status bit index {} is out of range (must be less than 64)",
            self.0
        )
    }
}

impl ::std::error::Error for StatusBitIndexTooBig {}
impl StatusBit {
    /// Creates an inactive status bit; `set` does nothing until `activate`
    /// has been called successfully.
    pub fn new() -> Self {
        Self { inner: None }
    }

    /// Attaches this status bit to bit number `index` of `monitor`.
    ///
    /// # Errors
    ///
    /// Returns `StatusBitIndexTooBig(index)` and leaves `self` untouched when
    /// `index` is 64 or greater, since the status word only has 64 bits.
    pub fn activate(
        &mut self,
        monitor: Arc<AtomMonitor<u64>>,
        index: usize,
    ) -> Result<(), StatusBitIndexTooBig> {
        if index >= 64 {
            return Err(StatusBitIndexTooBig(index));
        }

        // `index` is below 64 here, so the shift cannot overflow.
        let mask = 1u64 << index;
        self.inner = Some(StatusBitInner { monitor, mask });
        Ok(())
    }

    /// Sets (`value == true`) or clears (`value == false`) this bit in the
    /// monitor's word, using a sequentially consistent atomic OR / AND-NOT
    /// performed inside `AtomMonitor::mutate`.
    ///
    /// Does nothing when the bit has not been activated.
    pub fn set(&self, value: bool) {
        let active = match self.inner {
            Some(ref active) => active,
            None => return,
        };

        if value {
            active
                .monitor
                .mutate(|word| word.fetch_or(active.mask, Ordering::SeqCst));
        } else {
            active
                .monitor
                .mutate(|word| word.fetch_and(!active.mask, Ordering::SeqCst));
        }
    }
}
/// User-supplied description of a pool: its configuration, how to reach each
/// of its channels by key, and what to do with follow-up tasks.
pub trait PoolBehavior: Sized + Send + Sync + 'static {
    /// Cheap, copyable identifier naming one channel of this behavior.
    type ChannelKey: Copy + Clone + Send + Sync + 'static;

    /// Produces the configuration (thread count, schedule, channel levels)
    /// for a pool driven by this behavior.
    fn config(&mut self) -> PoolConfig<Self>;

    /// Hands `toucher` to the implementor for the channel named by `key` and
    /// returns the toucher's output.
    ///
    /// NOTE(review): implementors are expected to call `toucher.touch` on the
    /// channel identified by `key` — confirm against existing implementations.
    fn touch_channel<O>(&self, key: Self::ChannelKey, toucher: impl ChannelToucher<O>) -> O;

    /// Mutable counterpart of `touch_channel`, for use with `ChannelToucherMut`.
    fn touch_channel_mut<O>(
        &mut self,
        key: Self::ChannelKey,
        toucher: impl ChannelToucherMut<O>,
    ) -> O;

    /// Receives a `RunningTask` associated with channel `from`.
    ///
    /// NOTE(review): presumably called by the pool when a task polled from
    /// `from` did not finish and needs to be queued again — confirm in the
    /// pool module.
    fn followup(&self, from: Self::ChannelKey, task: RunningTask);
}
/// A visitor that is given shared access to a channel of any concrete
/// `Channel` type and produces an `O`.
///
/// This exists because `PoolBehavior::touch_channel` takes the visitor by
/// `impl ChannelToucher<O>`; the method is generic over the channel type.
pub trait ChannelToucher<O>: Sized {
    /// Inspects `channel` through a shared reference and returns the result.
    fn touch(&mut self, channel: &impl Channel) -> O;
}
/// A visitor that is given exclusive access to a channel of any concrete
/// `Channel` type and produces an `O`.
///
/// Mutable counterpart of `ChannelToucher`, consumed by
/// `PoolBehavior::touch_channel_mut`.
pub trait ChannelToucherMut<O>: Sized {
    /// Operates on `channel` through a mutable reference and returns the result.
    fn touch_mut(&mut self, channel: &mut impl Channel) -> O;
}
/// Configuration returned by `PoolBehavior::config`.
pub struct PoolConfig<Behavior: PoolBehavior> {
    /// Thread count for the pool.
    /// NOTE(review): presumably the number of worker threads to start — confirm.
    pub threads: u32,
    /// Scheduling algorithm to use across `levels`.
    pub schedule: ScheduleAlgorithm,
    /// Channels grouped into levels; each inner `Vec` is one level.
    /// NOTE(review): level ordering (which index is highest priority) is not
    /// visible here — confirm in the pool module.
    pub levels: Vec<Vec<ChannelParams<Behavior>>>,
}
/// How the pool chooses between the levels listed in `PoolConfig::levels`.
#[derive(Clone, Debug)]
pub enum ScheduleAlgorithm {
    /// NOTE(review): presumably always serves the highest level that has
    /// work available — confirm in the pool module.
    HighestFirst,
    /// Carries a list of `time::Duration` values.
    /// NOTE(review): presumably one time slice per level, in level order —
    /// confirm in the pool module.
    RoundRobin(Vec<Duration>),
}
/// Per-channel entry of `PoolConfig::levels`.
pub struct ChannelParams<Behavior: PoolBehavior> {
    /// Key that identifies the channel to `PoolBehavior::touch_channel` and
    /// `PoolBehavior::touch_channel_mut`.
    pub key: Behavior::ChannelKey,
    /// NOTE(review): presumably whether outstanding tasks in this channel
    /// must complete when the pool is closed — confirm in the pool module.
    pub complete_on_close: bool,
}