timely_communication/allocator/
counters.rs

1//! Push and Pull wrappers to maintain counts of messages in channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use crate::{Push, Pull};
8use crate::allocator::Event;
9
10/// The push half of an intra-thread channel.
11pub struct Pusher<T, P: Push<T>> {
12    index: usize,
13    // count: usize,
14    events: Rc<RefCell<VecDeque<(usize, Event)>>>,
15    pusher: P,
16    phantom: ::std::marker::PhantomData<T>,
17}
18
19impl<T, P: Push<T>>  Pusher<T, P> {
20    /// Wraps a pusher with a message counter.
21    pub fn new(pusher: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
22        Pusher {
23            index,
24            // count: 0,
25            events,
26            pusher,
27            phantom: ::std::marker::PhantomData,
28        }
29    }
30}
31
32impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
33    #[inline]
34    fn push(&mut self, element: &mut Option<T>) {
35        // if element.is_none() {
36        //     if self.count != 0 {
37        //         self.events
38        //             .borrow_mut()
39        //             .push_back((self.index, Event::Pushed(self.count)));
40        //         self.count = 0;
41        //     }
42        // }
43        // else {
44        //     self.count += 1;
45        // }
46        // TODO: Version above is less chatty, but can be a bit late in
47        //       moving information along. Better, but needs cooperation.
48        self.events
49            .borrow_mut()
50            .push_back((self.index, Event::Pushed(1)));
51
52        self.pusher.push(element)
53    }
54}
55
56use crossbeam_channel::Sender;
57
58/// The push half of an intra-thread channel.
59pub struct ArcPusher<T, P: Push<T>> {
60    index: usize,
61    // count: usize,
62    events: Sender<(usize, Event)>,
63    pusher: P,
64    phantom: ::std::marker::PhantomData<T>,
65    buzzer: crate::buzzer::Buzzer,
66}
67
68impl<T, P: Push<T>>  ArcPusher<T, P> {
69    /// Wraps a pusher with a message counter.
70    pub fn new(pusher: P, index: usize, events: Sender<(usize, Event)>, buzzer: crate::buzzer::Buzzer) -> Self {
71        ArcPusher {
72            index,
73            // count: 0,
74            events,
75            pusher,
76            phantom: ::std::marker::PhantomData,
77            buzzer,
78        }
79    }
80}
81
82impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
83    #[inline]
84    fn push(&mut self, element: &mut Option<T>) {
85        // if element.is_none() {
86        //     if self.count != 0 {
87        //         self.events
88        //             .send((self.index, Event::Pushed(self.count)))
89        //             .expect("Failed to send message count");
90        //         self.count = 0;
91        //     }
92        // }
93        // else {
94        //     self.count += 1;
95        // }
96
97        // These three calls should happen in this order, to ensure that
98        // we first enqueue data, second enqueue interest in the channel,
99        // and finally awaken the thread. Other orders are defective when
100        // multiple threads are involved.
101        self.pusher.push(element);
102        let _ = self.events.send((self.index, Event::Pushed(1)));
103            // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
104            // .expect("Failed to send message count");
105        self.buzzer.buzz();
106    }
107}
108
109/// The pull half of an intra-thread channel.
110pub struct Puller<T, P: Pull<T>> {
111    index: usize,
112    count: usize,
113    events: Rc<RefCell<VecDeque<(usize, Event)>>>,
114    puller: P,
115    phantom: ::std::marker::PhantomData<T>,
116}
117
118impl<T, P: Pull<T>>  Puller<T, P> {
119    /// Wraps a puller with a message counter.
120    pub fn new(puller: P, index: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>) -> Self {
121        Puller {
122            index,
123            count: 0,
124            events,
125            puller,
126            phantom: ::std::marker::PhantomData,
127        }
128    }
129}
130impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
131    #[inline]
132    fn pull(&mut self) -> &mut Option<T> {
133        let result = self.puller.pull();
134        if result.is_none() {
135            if self.count != 0 {
136                self.events
137                    .borrow_mut()
138                    .push_back((self.index, Event::Pulled(self.count)));
139                self.count = 0;
140            }
141        }
142        else {
143            self.count += 1;
144        }
145
146        result
147    }
148}