timely/scheduling/
activate.rs

1//! Parking and unparking timely fibers.
2
3use std::rc::Rc;
4use std::sync::Arc;
5use std::cell::RefCell;
6use std::thread::Thread;
7use std::collections::BinaryHeap;
8use std::time::{Duration, Instant};
9use std::cmp::Reverse;
10use crossbeam_channel::{Sender, Receiver};
11use futures_util::task::ArcWake;
12
/// Methods required to act as a timely scheduler.
///
/// The core methods are the activation of "paths", sequences of integers, and
/// the enumeration of active paths by prefix. A scheduler may delay the report
/// of a path indefinitely, but it should report at least one extension for the
/// empty path `&[]` or risk parking the worker thread without a certain unpark.
///
/// There is no known harm to "spurious wake-ups" where a not-active path is
/// returned through `extensions()`.
pub trait Scheduler {
    /// Mark a path as immediately scheduleable.
    ///
    /// Repeated activation of the same path is permitted; implementations
    /// should tolerate (and may deduplicate) duplicates.
    fn activate(&mut self, path: &[usize]);
    /// Populates `dest` with next identifiers on active extensions of `path`.
    ///
    /// This method is where a scheduler is allowed to exercise some discretion,
    /// in that it does not need to present *all* extensions, but it can instead
    /// present only those that the runtime should schedule.
    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
}
32
33// Trait objects can be schedulers too.
34impl Scheduler for Box<dyn Scheduler> {
35    fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
36    fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
37}
38
/// Allocation-free activation tracker.
#[derive(Debug)]
pub struct Activations {
    /// Number of leading entries of `bounds` that are sorted, deduplicated,
    /// and compactly stored in `slices` (established by `advance`).
    clean: usize,
    /// `(offset, length)` pairs into `slices`, one per activated path.
    bounds: Vec<(usize, usize)>,
    /// Concatenated path elements addressed by `bounds`.
    slices: Vec<usize>,
    /// Scratch space used by `advance` when compacting `slices`.
    buffer: Vec<usize>,

    // Inter-thread activations.
    tx: Sender<Vec<usize>>,
    rx: Receiver<Vec<usize>>,

    // Delayed activations.
    /// Reference instant against which delayed activations are measured.
    timer: Instant,
    /// Min-heap (via `Reverse`) of `(deadline, path)`, earliest deadline first.
    queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
}
56
impl Activations {

    /// Creates a new activation tracker.
    ///
    /// All delayed activations are measured as `Duration` offsets from
    /// `timer`, via `timer.elapsed()`.
    pub fn new(timer: Instant) -> Self {
        let (tx, rx) = crossbeam_channel::unbounded();
        Self {
            clean: 0,
            bounds: Vec::new(),
            slices: Vec::new(),
            buffer: Vec::new(),
            tx,
            rx,
            timer,
            queue: BinaryHeap::new(),
        }
    }

    /// Activates the task addressed by `path`.
    ///
    /// Duplicate activations are permitted; they are deduplicated by `advance`.
    pub fn activate(&mut self, path: &[usize]) {
        self.bounds.push((self.slices.len(), path.len()));
        self.slices.extend(path);
    }

    /// Schedules a future activation for the task addressed by `path`.
    pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
        // TODO: We could have a minimum delay and immediately schedule anything less than that delay.
        if delay == Duration::new(0, 0) {
            self.activate(path);
        }
        else {
            // Record an absolute deadline, expressed as an offset from `self.timer`.
            let moment = self.timer.elapsed() + delay;
            self.queue.push(Reverse((moment, path.to_vec())));
        }
    }

    /// Discards the current active set and presents the next active set.
    pub fn advance(&mut self) {

        // Drain inter-thread activations.
        while let Ok(path) = self.rx.try_recv() {
            self.activate(&path[..])
        }

        // Drain timer-based activations whose deadline has passed.
        let now = self.timer.elapsed();
        while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
            let Reverse((_time, path)) = self.queue.pop().unwrap();
            self.activate(&path[..]);
        }

        // Discard the previously presented active set (the `clean` prefix).
        self.bounds.drain(.. self.clean);

        {   // Scoped, to allow borrow to drop.
            // Order and deduplicate bounds by the path contents they address,
            // so that `for_extensions` can binary search and scan a prefix.
            let slices = &self.slices[..];
            self.bounds.sort_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
            self.bounds.dedup_by_key(|x| &slices[x.0 .. (x.0 + x.1)]);
        }

        // Compact the slices: copy each surviving path contiguously into
        // `buffer`, point its bound at the new location, then swap the
        // compacted storage back into `slices`.
        self.buffer.clear();
        for (offset, length) in self.bounds.iter_mut() {
            self.buffer.extend(&self.slices[*offset .. (*offset + *length)]);
            *offset = self.buffer.len() - *length;
        }
        ::std::mem::swap(&mut self.buffer, &mut self.slices);

        // Everything currently in `bounds` is sorted, deduplicated, compact.
        self.clean = self.bounds.len();
    }

    /// Maps a function across activated paths.
    pub fn map_active(&self, logic: impl Fn(&[usize])) {
        for (offset, length) in self.bounds.iter() {
            logic(&self.slices[*offset .. (*offset + *length)]);
        }
    }

    /// Sets as active any symbols that follow `path`.
    pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {

        // Locate the first clean bound at or after `path`; the `clean` prefix
        // of `bounds` is sorted by path contents (see `advance`), so a binary
        // search applies. An exact match and a miss both give a start index.
        let position =
        self.bounds[..self.clean]
            .binary_search_by_key(&path, |x| &self.slices[x.0 .. (x.0 + x.1)]);
        let position = match position {
            Ok(x) => x,
            Err(x) => x,
        };

        // Scan forward while entries still start with `path`, reporting the
        // element immediately following the prefix, skipping adjacent repeats.
        let mut previous = None;
        self.bounds
            .iter()
            .cloned()
            .skip(position)
            .map(|(offset, length)| &self.slices[offset .. (offset + length)])
            .take_while(|x| x.starts_with(path))
            .for_each(|x| {
                // push non-empty, non-duplicate extensions.
                if let Some(extension) = x.get(path.len()) {
                    if previous != Some(*extension) {
                        action(*extension);
                        previous = Some(*extension);
                    }
                }
            });
    }

    /// Constructs a thread-safe `SyncActivations` handle to this activator.
    ///
    /// The handle sends activated paths over this tracker's channel and
    /// unparks the thread calling this method, so it should be invoked from
    /// the worker thread that owns this `Activations`.
    pub fn sync(&self) -> SyncActivations {
        SyncActivations {
            tx: self.tx.clone(),
            thread: std::thread::current(),
        }
    }

    /// Time until next scheduled event.
    ///
    /// This method should be used before putting a worker thread to sleep, as it
    /// indicates the amount of time before the thread should be unparked for the
    /// next scheduled activation.
    pub fn empty_for(&self) -> Option<Duration> {
        if !self.bounds.is_empty() {
            // Work is already pending; do not sleep at all.
            Some(Duration::new(0,0))
        }
        else {
            // Otherwise sleep until the earliest delayed activation, if any;
            // `None` means there is nothing scheduled at all.
            self.queue.peek().map(|Reverse((t,_a))| {
                let elapsed = self.timer.elapsed();
                if t < &elapsed { Duration::new(0,0) }
                else { *t - elapsed }
            })
        }
    }
}
188
/// A thread-safe handle to an `Activations`.
#[derive(Clone, Debug)]
pub struct SyncActivations {
    /// Channel on which activated paths are sent to the owning `Activations`.
    tx: Sender<Vec<usize>>,
    /// The worker thread to unpark after sending activations.
    thread: Thread,
}
195
196impl SyncActivations {
197    /// Unparks the task addressed by `path` and unparks the associated worker
198    /// thread.
199    pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
200        self.activate_batch(std::iter::once(path))
201    }
202
203    /// Unparks the tasks addressed by `paths` and unparks the associated worker
204    /// thread.
205    ///
206    /// This method can be more efficient than calling `activate` repeatedly, as
207    /// it only unparks the worker thread after sending all of the activations.
208    pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
209    where
210        I: IntoIterator<Item = Vec<usize>>
211    {
212        for path in paths.into_iter() {
213            self.tx.send(path).map_err(|_| SyncActivationError)?;
214        }
215        self.thread.unpark();
216        Ok(())
217    }
218}
219
/// A capability to activate a specific path.
#[derive(Clone, Debug)]
pub struct Activator {
    /// The path this handle activates.
    path: Vec<usize>,
    /// Shared handle to the worker's activation tracker.
    queue: Rc<RefCell<Activations>>,
}
226
227impl Activator {
228    /// Creates a new activation handle
229    pub fn new(path: &[usize], queue: Rc<RefCell<Activations>>) -> Self {
230        Self {
231            path: path.to_vec(),
232            queue,
233        }
234    }
235    /// Activates the associated path.
236    pub fn activate(&self) {
237        self.queue
238            .borrow_mut()
239            .activate(&self.path[..]);
240    }
241
242    /// Activates the associated path after a specified duration.
243    pub fn activate_after(&self, delay: Duration) {
244        if delay == Duration::new(0, 0) {
245            self.activate();
246        }
247        else {
248            self.queue
249                .borrow_mut()
250                .activate_after(&self.path[..], delay);
251        }
252    }
253}
254
/// A thread-safe version of `Activator`.
#[derive(Clone, Debug)]
pub struct SyncActivator {
    /// The path this handle activates.
    path: Vec<usize>,
    /// Thread-safe channel-and-unpark handle to the worker's activations.
    queue: SyncActivations,
}
261
262impl SyncActivator {
263    /// Creates a new thread-safe activation handle.
264    pub fn new(path: &[usize], queue: SyncActivations) -> Self {
265        Self {
266            path: path.to_vec(),
267            queue,
268        }
269    }
270
271    /// Activates the associated path and unparks the associated worker thread.
272    pub fn activate(&self) -> Result<(), SyncActivationError> {
273        self.queue.activate(self.path.clone())
274    }
275}
276
277impl ArcWake for SyncActivator {
278    fn wake_by_ref(arc_self: &Arc<Self>) {
279        arc_self.activate().unwrap();
280    }
281}
282
/// The error returned when activation fails across thread boundaries because
/// the receiving end has hung up.
#[derive(Clone, Copy, Debug)]
pub struct SyncActivationError;

impl std::fmt::Display for SyncActivationError {
    /// Formats a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "sync activation error in timely")
    }
}

impl std::error::Error for SyncActivationError {}
295
/// A wrapper that unparks on drop.
#[derive(Clone, Debug)]
pub struct ActivateOnDrop<T>  {
    /// The wrapped element, reachable through `Deref`/`DerefMut`.
    wrapped: T,
    /// Path activated when the wrapper is dropped.
    address: Rc<Vec<usize>>,
    /// Activation tracker notified on drop.
    activator: Rc<RefCell<Activations>>,
}
303
304use std::ops::{Deref, DerefMut};
305
306impl<T> ActivateOnDrop<T> {
307    /// Wraps an element so that it is unparked on drop.
308    pub fn new(wrapped: T, address: Rc<Vec<usize>>, activator: Rc<RefCell<Activations>>) -> Self {
309        Self { wrapped, address, activator }
310    }
311}
312
313impl<T> Deref for ActivateOnDrop<T> {
314    type Target = T;
315    fn deref(&self) -> &Self::Target {
316        &self.wrapped
317    }
318}
319
320impl<T> DerefMut for ActivateOnDrop<T> {
321    fn deref_mut(&mut self) -> &mut Self::Target {
322        &mut self.wrapped
323    }
324}
325
326impl<T> Drop for ActivateOnDrop<T> {
327    fn drop(&mut self) {
328        self.activator.borrow_mut().activate(&self.address[..]);
329    }
330}