declarative_dataflow/server/
scheduler.rs1use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::rc::Weak;
6use std::time::{Duration, Instant};
7
8use timely::scheduling::Activator;
9
10#[derive(Default)]
17pub struct Scheduler {
18 activator_queue: BinaryHeap<TimedActivator>,
19}
20
21impl Scheduler {
22 pub fn new() -> Self {
24 Scheduler {
25 activator_queue: BinaryHeap::new(),
26 }
27 }
28
29 pub fn has_pending(&self) -> bool {
32 if let Some(ref timed_activator) = self.activator_queue.peek() {
33 timed_activator.is_ready()
34 } else {
35 false
36 }
37 }
38
39 pub fn until_next(&self) -> Option<Duration> {
43 if let Some(ref timed_activator) = self.activator_queue.peek() {
44 Some(timed_activator.until_ready())
45 } else {
46 None
47 }
48 }
49
50 pub fn schedule_at(&mut self, at: Instant, activator: Weak<Activator>) {
53 self.activator_queue.push(TimedActivator {
54 at,
55 activator,
56 event: None,
57 });
58 }
59
60 pub fn schedule_now(&mut self, activator: Weak<Activator>) {
63 self.schedule_at(Instant::now(), activator);
64 }
65
66 pub fn schedule_after(&mut self, after: Duration, activator: Weak<Activator>) {
69 self.schedule_at(Instant::now() + after, activator);
70 }
71
72 pub fn event_at(&mut self, at: Instant, event: Event) {
75 self.activator_queue.push(TimedActivator {
76 at,
77 activator: Weak::new(),
78 event: Some(event),
79 });
80 }
81
82 pub fn event_after(&mut self, after: Duration, event: Event) {
85 self.event_at(Instant::now() + after, event);
86 }
87}
88
89impl Iterator for Scheduler {
90 type Item = TimedActivator;
91 fn next(&mut self) -> Option<TimedActivator> {
92 if self.has_pending() {
93 Some(self.activator_queue.pop().unwrap())
94 } else {
95 None
96 }
97 }
98}
99
/// Events that can be queued on the scheduler without an activator.
#[derive(Debug, PartialEq, Eq)]
pub enum Event {
    /// The only event kind currently defined.
    Tick,
}
106
107pub struct TimedActivator {
110 at: Instant,
111 activator: Weak<Activator>,
112 event: Option<Event>,
113}
114
115impl TimedActivator {
116 fn is_ready(&self) -> bool {
117 Instant::now() >= self.at
118 }
119
120 fn until_ready(&self) -> Duration {
121 let now = Instant::now();
122 if self.at > now {
123 self.at.duration_since(now)
124 } else {
125 Duration::from_millis(0)
126 }
127 }
128
129 pub fn schedule(self) -> Option<Event> {
131 if let Some(activator) = self.activator.upgrade() {
132 activator.activate();
133 }
134
135 self.event
136 }
137}
138
139impl Ord for TimedActivator {
141 fn cmp(&self, other: &TimedActivator) -> Ordering {
142 other.at.cmp(&self.at)
143 }
144}
145
146impl PartialOrd for TimedActivator {
147 fn partial_cmp(&self, other: &TimedActivator) -> Option<Ordering> {
148 Some(self.cmp(other))
149 }
150}
151
152impl PartialEq for TimedActivator {
153 fn eq(&self, other: &TimedActivator) -> bool {
154 self.at.eq(&other.at)
155 }
156}
157
158impl Eq for TimedActivator {}