timely_communication/allocator/zero_copy/
allocator_process.rs

1//! Zero-copy allocator for intra-process serialized communication.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use crossbeam_channel::{Sender, Receiver};
7
8use bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Message, Data, Push, Pull};
13use crate::allocator::{AllocateBuilder, Event};
14use crate::allocator::canary::Canary;
15
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17
18use super::push_pull::{Pusher, Puller};
19
20/// Builds an instance of a ProcessAllocator.
21///
22/// Builders are required because some of the state in a `ProcessAllocator` cannot be sent between
23/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
24/// shared between threads here, and then provide a method that will instantiate the non-movable
25/// members once in the destination thread.
26pub struct ProcessBuilder {
27    index:  usize,                      // number out of peers
28    peers:  usize,                      // number of peer allocators.
29    pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
30    pullers: Vec<Sender<MergeQueue>>,   // for pulling bytes from other workers.
31}
32
33impl ProcessBuilder {
34    /// Creates a vector of builders, sharing appropriate state.
35    ///
36    /// This method requires access to a byte exchanger, from which it mints channels.
37    pub fn new_vector(count: usize) -> Vec<ProcessBuilder> {
38
39        // Channels for the exchange of `MergeQueue` endpoints.
40        let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
41
42        pushers_vec
43            .into_iter()
44            .zip(pullers_vec)
45            .enumerate()
46            .map(|(index, (pushers, pullers))|
47                ProcessBuilder {
48                    index,
49                    peers: count,
50                    pushers,
51                    pullers,
52                }
53            )
54            .collect()
55    }
56
57    /// Builds a `ProcessAllocator`, instantiating `Rc<RefCell<_>>` elements.
58    pub fn build(self) -> ProcessAllocator {
59
60        // Fulfill puller obligations.
61        let mut recvs = Vec::with_capacity(self.peers);
62        for puller in self.pullers.into_iter() {
63            let buzzer = crate::buzzer::Buzzer::new();
64            let queue = MergeQueue::new(buzzer);
65            puller.send(queue.clone()).expect("Failed to send MergeQueue");
66            recvs.push(queue.clone());
67        }
68
69        // Extract pusher commitments.
70        let mut sends = Vec::with_capacity(self.peers);
71        for pusher in self.pushers.into_iter() {
72            let queue = pusher.recv().expect("Failed to receive MergeQueue");
73            let sendpoint = SendEndpoint::new(queue);
74            sends.push(Rc::new(RefCell::new(sendpoint)));
75        }
76
77        ProcessAllocator {
78            index: self.index,
79            peers: self.peers,
80            events: Rc::new(RefCell::new(VecDeque::new())),
81            canaries: Rc::new(RefCell::new(Vec::new())),
82            channel_id_bound: None,
83            staged: Vec::new(),
84            sends,
85            recvs,
86            to_local: HashMap::new(),
87        }
88    }
89}
90
91impl AllocateBuilder for ProcessBuilder {
92    type Allocator = ProcessAllocator;
93    /// Builds allocator, consumes self.
94    fn build(self) -> Self::Allocator {
95        self.build()
96    }
97
98}
99
100/// A serializing allocator for inter-thread intra-process communication.
101pub struct ProcessAllocator {
102
103    index:      usize,                              // number out of peers
104    peers:      usize,                              // number of peer allocators (for typed channel allocation).
105
106    events: Rc<RefCell<VecDeque<(usize, Event)>>>,
107
108    canaries: Rc<RefCell<Vec<usize>>>,
109
110    channel_id_bound: Option<usize>,
111
112    // sending, receiving, and responding to binary buffers.
113    staged:     Vec<Bytes>,
114    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
115    recvs:      Vec<MergeQueue>,                            // recvs[x] <- from thread x.
116    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,          // to worker-local typed pullers.
117}
118
119impl Allocate for ProcessAllocator {
120    fn index(&self) -> usize { self.index }
121    fn peers(&self) -> usize { self.peers }
122    fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
123
124        // Assume and enforce in-order identifier allocation.
125        if let Some(bound) = self.channel_id_bound {
126            assert!(bound < identifier);
127        }
128        self.channel_id_bound = Some(identifier);
129
130        let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());
131
132        for target_index in 0 .. self.peers() {
133
134            // message header template.
135            let header = MessageHeader {
136                channel:    identifier,
137                source:     self.index,
138                target:     target_index,
139                length:     0,
140                seqno:      0,
141            };
142
143            // create, box, and stash new process_binary pusher.
144            pushes.push(Box::new(Pusher::new(header, self.sends[target_index].clone())));
145        }
146
147        let channel =
148        self.to_local
149            .entry(identifier)
150            .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
151            .clone();
152
153        use crate::allocator::counters::Puller as CountPuller;
154        let canary = Canary::new(identifier, self.canaries.clone());
155        let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, self.events().clone()));
156
157        (pushes, puller)
158    }
159
160    // Perform preparatory work, most likely reading binary buffers from self.recv.
161    #[inline(never)]
162    fn receive(&mut self) {
163
164        // Check for channels whose `Puller` has been dropped.
165        let mut canaries = self.canaries.borrow_mut();
166        for dropped_channel in canaries.drain(..) {
167            let _dropped =
168            self.to_local
169                .remove(&dropped_channel)
170                .expect("non-existent channel dropped");
171            // Borrowed channels may be non-empty, if the dataflow was forcibly
172            // dropped. The contract is that if a dataflow is dropped, all other
173            // workers will drop the dataflow too, without blocking indefinitely
174            // on events from it.
175            // assert!(dropped.borrow().is_empty());
176        }
177        std::mem::drop(canaries);
178
179        let mut events = self.events.borrow_mut();
180
181        for recv in self.recvs.iter_mut() {
182            recv.drain_into(&mut self.staged);
183        }
184
185        for mut bytes in self.staged.drain(..) {
186
187            // We expect that `bytes` contains an integral number of messages.
188            // No splitting occurs across allocations.
189            while bytes.len() > 0 {
190
191                if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
192
193                    // Get the header and payload, ditch the header.
194                    let mut peel = bytes.extract_to(header.required_bytes());
195                    let _ = peel.extract_to(40);
196
197                    // Increment message count for channel.
198                    // Safe to do this even if the channel has been dropped.
199                    events.push_back((header.channel, Event::Pushed(1)));
200
201                    // Ensure that a queue exists.
202                    match self.to_local.entry(header.channel) {
203                        Entry::Vacant(entry) => {
204                            // We may receive data before allocating, and shouldn't block.
205                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
206                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
207                                    .borrow_mut()
208                                    .push_back(peel);
209                            }
210                        }
211                        Entry::Occupied(mut entry) => {
212                            entry.get_mut().borrow_mut().push_back(peel);
213                        }
214                    }
215                }
216                else {
217                    println!("failed to read full header!");
218                }
219            }
220        }
221    }
222
223    // Perform postparatory work, most likely sending un-full binary buffers.
224    fn release(&mut self) {
225        // Publish outgoing byte ledgers.
226        for send in self.sends.iter_mut() {
227            send.borrow_mut().publish();
228        }
229
230        // OPTIONAL: Tattle on channels sitting on borrowed data.
231        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
232        // for (index, list) in self.to_local.iter() {
233        //     let len = list.borrow_mut().len();
234        //     if len > 0 {
235        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
236        //     }
237        // }
238    }
239
240    fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
241        &self.events
242    }
243    fn await_events(&self, duration: Option<std::time::Duration>) {
244        if self.events.borrow().is_empty() {
245            if let Some(duration) = duration {
246                std::thread::park_timeout(duration);
247            }
248            else {
249                std::thread::park();
250            }
251        }
252    }
253}