timely_communication/allocator/
thread.rs

1//! Intra-thread communication.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6use std::collections::VecDeque;
7
8use crate::allocator::{Allocate, AllocateBuilder, Event};
9use crate::allocator::counters::Pusher as CountPusher;
10use crate::allocator::counters::Puller as CountPuller;
11use crate::{Push, Pull, Message};
12
/// Builder for the single-threaded allocator.
///
/// The builder carries no state; building it yields a fresh [`Thread`].
pub struct ThreadBuilder;
15
16impl AllocateBuilder for ThreadBuilder {
17    type Allocator = Thread;
18    fn build(self) -> Self::Allocator { Thread::new() }
19}
20
21
22/// An allocator for intra-thread communication.
23pub struct Thread {
24    /// Shared counts of messages in channels.
25    events: Rc<RefCell<VecDeque<(usize, Event)>>>,
26}
27
28impl Allocate for Thread {
29    fn index(&self) -> usize { 0 }
30    fn peers(&self) -> usize { 1 }
31    fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
32        let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
33        (vec![Box::new(pusher)], Box::new(puller))
34    }
35    fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
36        &self.events
37    }
38    fn await_events(&self, duration: Option<Duration>) {
39        if self.events.borrow().is_empty() {
40            if let Some(duration) = duration {
41                std::thread::park_timeout(duration);
42            }
43            else {
44                std::thread::park();
45            }
46        }
47    }
48}
49
50/// Thread-local counting channel push endpoint.
51pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
52/// Thread-local counting channel pull endpoint.
53pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;
54
55impl Thread {
56    /// Allocates a new thread-local channel allocator.
57    pub fn new() -> Self {
58        Thread {
59            events: Rc::new(RefCell::new(VecDeque::new())),
60        }
61    }
62
63    /// Creates a new thread-local channel from an identifier and shared counts.
64    pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>)
65        -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
66    {
67        let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
68        let pusher = Pusher { target: shared.clone() };
69        let pusher = CountPusher::new(pusher, identifier, events.clone());
70        let puller = Puller { source: shared, current: None };
71        let puller = CountPuller::new(puller, identifier, events);
72        (pusher, puller)
73    }
74}
75
76
/// The push half of an intra-thread channel.
pub struct Pusher<T> {
    /// Pair of queues shared with the pull half: pushed elements are appended
    /// to the first, and replacements handed back to the caller are taken
    /// from the second.
    target: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
81
82impl<T> Push<T> for Pusher<T> {
83    #[inline]
84    fn push(&mut self, element: &mut Option<T>) {
85        let mut borrow = self.target.borrow_mut();
86        if let Some(element) = element.take() {
87            borrow.0.push_back(element);
88        }
89        *element = borrow.1.pop_front();
90    }
91}
92
/// The pull half of an intra-thread channel.
pub struct Puller<T> {
    /// The element most recently taken from the channel, if any; `pull`
    /// returns a mutable reference to this slot.
    current: Option<T>,
    /// Pair of queues shared with the push half; elements are pulled from the
    /// front of the first.
    source: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
98
99impl<T> Pull<T> for Puller<T> {
100    #[inline]
101    fn pull(&mut self) -> &mut Option<T> {
102        let mut borrow = self.source.borrow_mut();
103        // if let Some(element) = self.current.take() {
104        //     // TODO : Arbitrary constant.
105        //     if borrow.1.len() < 16 {
106        //         borrow.1.push_back(element);
107        //     }
108        // }
109        self.current = borrow.0.pop_front();
110        &mut self.current
111    }
112}