timely_communication/allocator/
thread.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::time::Duration;
6use std::collections::VecDeque;
7
8use crate::allocator::{Allocate, AllocateBuilder, Event};
9use crate::allocator::counters::Pusher as CountPusher;
10use crate::allocator::counters::Puller as CountPuller;
11use crate::{Push, Pull, Message};
12
/// Builder for the single-threaded [`Thread`] allocator.
///
/// The type carries no state, so it is trivially copyable and printable; the
/// derives let callers store, duplicate and log it without extra ceremony.
#[derive(Clone, Copy, Debug, Default)]
pub struct ThreadBuilder;
15
16impl AllocateBuilder for ThreadBuilder {
17 type Allocator = Thread;
18 fn build(self) -> Self::Allocator { Thread::new() }
19}
20
21
22pub struct Thread {
24 events: Rc<RefCell<VecDeque<(usize, Event)>>>,
26}
27
28impl Allocate for Thread {
29 fn index(&self) -> usize { 0 }
30 fn peers(&self) -> usize { 1 }
31 fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
32 let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
33 (vec![Box::new(pusher)], Box::new(puller))
34 }
35 fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
36 &self.events
37 }
38 fn await_events(&self, duration: Option<Duration>) {
39 if self.events.borrow().is_empty() {
40 if let Some(duration) = duration {
41 std::thread::park_timeout(duration);
42 }
43 else {
44 std::thread::park();
45 }
46 }
47 }
48}
49
50pub type ThreadPusher<T> = CountPusher<T, Pusher<T>>;
52pub type ThreadPuller<T> = CountPuller<T, Puller<T>>;
54
55impl Thread {
56 pub fn new() -> Self {
58 Thread {
59 events: Rc::new(RefCell::new(VecDeque::new())),
60 }
61 }
62
63 pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>)
65 -> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
66 {
67 let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
68 let pusher = Pusher { target: shared.clone() };
69 let pusher = CountPusher::new(pusher, identifier, events.clone());
70 let puller = Puller { source: shared, current: None };
71 let puller = CountPuller::new(puller, identifier, events);
72 (pusher, puller)
73 }
74}
75
76
/// The push half of an intra-thread channel.
pub struct Pusher<T> {
    /// Pair of queues shared with the matching `Puller`: pushed elements are
    /// appended to `.0`, and `.1` is popped to refill the caller's slot.
    target: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
81
82impl<T> Push<T> for Pusher<T> {
83 #[inline]
84 fn push(&mut self, element: &mut Option<T>) {
85 let mut borrow = self.target.borrow_mut();
86 if let Some(element) = element.take() {
87 borrow.0.push_back(element);
88 }
89 *element = borrow.1.pop_front();
90 }
91}
92
/// The pull half of an intra-thread channel.
pub struct Puller<T> {
    /// Slot holding the most recently pulled element; `pull` hands out a
    /// mutable reference to it.
    current: Option<T>,
    /// Pair of queues shared with the matching `Pusher`; elements are taken
    /// from the front of `.0`.
    source: Rc<RefCell<(VecDeque<T>, VecDeque<T>)>>,
}
98
99impl<T> Pull<T> for Puller<T> {
100 #[inline]
101 fn pull(&mut self) -> &mut Option<T> {
102 let mut borrow = self.source.borrow_mut();
103 self.current = borrow.0.pop_front();
110 &mut self.current
111 }
112}