1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#[macro_use]
extern crate log;
extern crate atomicmonitor;
extern crate futures;
extern crate atom;
extern crate monitor;
extern crate time;
extern crate smallqueue;
extern crate atomic;
extern crate stopwatch;
extern crate through;
pub mod channel;
pub mod pool;
pub mod prelude;
pub mod run;
pub mod scoped;
pub mod timescheduler;
#[cfg(test)]
pub mod test;
pub mod push;
use channel::Channel;
use std::sync::Arc;
use std::fmt::{Debug, Formatter};
use std::fmt;
use atomicmonitor::AtomMonitor;
use atomicmonitor::atomic::{Atomic, Ordering};
use futures::Future;
use futures::executor::{spawn, Spawn};
use time::Duration;
pub struct RunningTask {
pub spawn: Spawn<Box<dyn Future<Item=(), Error=()> + Send + 'static>>,
pub close_counted: Atomic<bool>,
}
impl RunningTask {
fn new(future: impl Future<Item=(), Error=()> + Send + 'static) -> Self {
let future: Box<dyn Future<Item=(), Error=()> + Send + 'static> = Box::new(future);
RunningTask {
spawn: spawn(future),
close_counted: Atomic::new(false),
}
}
}
impl Debug for RunningTask {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
f.write_str("RunningTask")
}
}
pub struct StatusBit {
inner: Option<StatusBitInner>
}
struct StatusBitInner {
monitor: Arc<AtomMonitor<u64>>,
mask: u64
}
pub struct StatusBitIndexTooBig(pub usize);
impl StatusBit {
pub fn new() -> Self {
StatusBit {
inner: None
}
}
pub fn activate(&mut self, monitor: Arc<AtomMonitor<u64>>, index: usize) -> Result<(), StatusBitIndexTooBig> {
if index < 64 {
self.inner = Some(StatusBitInner {
monitor,
mask: 0x1 << index as u64
});
Ok(())
} else {
Err(StatusBitIndexTooBig(index))
}
}
pub fn set(&self, value: bool) {
if let Some(ref inner) = self.inner {
if value {
inner.monitor.mutate(|field| {
field.fetch_or(inner.mask, Ordering::SeqCst)
})
} else {
inner.monitor.mutate(|field| {
field.fetch_and(!inner.mask, Ordering::SeqCst)
})
};
}
}
}
pub trait PoolBehavior: Sized + Send + Sync + 'static {
type ChannelKey: Copy + Clone + Send + Sync + 'static;
fn config(&mut self) -> PoolConfig<Self>;
fn touch_channel<O>(&self, key: Self::ChannelKey, toucher: impl ChannelToucher<O>) -> O;
fn touch_channel_mut<O>(&mut self, key: Self::ChannelKey, toucher: impl ChannelToucherMut<O>) -> O;
fn followup(&self, from: Self::ChannelKey, task: RunningTask);
}
pub trait ChannelToucher<O>: Sized {
fn touch(&mut self, channel: & impl Channel) -> O;
}
pub trait ChannelToucherMut<O>: Sized {
fn touch_mut(&mut self, channel: &mut impl Channel) -> O;
}
pub struct PoolConfig<Behavior: PoolBehavior> {
pub threads: u32,
pub schedule: ScheduleAlgorithm,
pub levels: Vec<Vec<ChannelParams<Behavior>>>,
}
#[derive(Clone, Debug)]
pub enum ScheduleAlgorithm {
HighestFirst,
RoundRobin(Vec<Duration>)
}
pub struct ChannelParams<Behavior: PoolBehavior> {
pub key: Behavior::ChannelKey,
pub complete_on_close: bool,
}