timely_communication/allocator/zero_copy/allocator.rs

//! Zero-copy allocator based on TCP.
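//!
//! Workers exchange serialized data with remote processes through shared
//! `MergeQueue`s: each worker pushes outgoing buffers into a `SendEndpoint`
//! feeding a network send thread, and drains buffers deposited by network
//! receive threads, routing them to per-channel worker-local queues.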
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::{VecDeque, HashMap, hash_map::Entry};
use crossbeam_channel::{Sender, Receiver};

use bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::allocator::Event;
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
use super::push_pull::{Pusher, PullerInner};
18
/// Builds an instance of a `TcpAllocator`.
///
/// Builders are required because some of the state in a `TcpAllocator` cannot be sent between
/// threads (specifically, the `Rc<RefCell<_>>` local channels). So, we must package up the state
/// shared between threads here, and then provide a method that will instantiate the non-movable
/// members once in the destination thread.
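///
/// A sketch of the intended use, with hypothetical thread-spawning glue (only
/// `new_vector` and `build` are defined in this module):
/// ```ignore
/// let (builders, promises, futures) = new_vector(allocators, my_process, processes);
/// // `promises` and `futures` go to the network threads; each builder moves
/// // to its own worker thread, where `build` creates the `Rc<RefCell<_>>` state.
/// for builder in builders {
///     std::thread::spawn(move || {
///         let mut allocator = builder.build();
///         // ... allocate channels and run the worker ...
///     });
/// }
/// ```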
pub struct TcpBuilder<A: AllocateBuilder> {
    inner:    A,
    index:    usize,                        // this worker's index out of `peers`.
    peers:    usize,                        // number of peer allocators.
    futures:  Vec<Receiver<MergeQueue>>,    // to receive queues from each network send thread.
    promises: Vec<Sender<MergeQueue>>,      // to send queues to each network receive thread.
}

/// Creates a vector of builders, sharing appropriate state.
///
/// `threads` is the number of workers in a single process, `processes` is the
/// total number of processes.
/// The returned tuple contains
/// ```ignore
/// (
///   AllocateBuilder for local threads,
///   info to spawn egress comm threads,
///   info to spawn ingress comm threads,
/// )
/// ```
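///
/// A sketch of how the pieces are meant to be distributed (the real wiring
/// lives in the crate's initialization code):
/// ```ignore
/// let (builders, network_promises, network_futures) =
///     new_vector(allocators, my_process, processes);
/// // builders[i] moves to local worker thread i;
/// // network_promises[p] goes to the egress threads for the p-th remote process;
/// // network_futures[p] goes to the ingress threads for the p-th remote process.
/// ```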
pub fn new_vector<A: AllocateBuilder>(
    allocators: Vec<A>,
    my_process: usize,
    processes: usize)
-> (Vec<TcpBuilder<A>>,
    Vec<Vec<Sender<MergeQueue>>>,
    Vec<Vec<Receiver<MergeQueue>>>)
{
    let threads = allocators.len();

    // For queues from worker threads to network threads, and vice versa.
    let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
    let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);

    let builders =
    allocators
        .into_iter()
        .zip(worker_promises)
        .zip(worker_futures)
        .enumerate()
        .map(|(index, ((inner, promises), futures))| {
            TcpBuilder {
                inner,
                index: my_process * threads + index,
                peers: threads * processes,
                promises,
                futures,
            }})
        .collect();

    (builders, network_promises, network_futures)
}

impl<A: AllocateBuilder> TcpBuilder<A> {

    /// Builds a `TcpAllocator`, instantiating `Rc<RefCell<_>>` elements.
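    ///
    /// A sketch of the queue handshake this performs per remote process,
    /// paraphrasing the body below:
    /// ```ignore
    /// promise.send(queue.clone());   // share our freshly built receive queue.
    /// let queue = future.recv();     // obtain the network thread's send queue.
    /// ```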
    pub fn build(self) -> TcpAllocator<A::Allocator> {

        // Fulfill puller obligations: create each receive queue and share it
        // with the corresponding network receive thread.
        let mut recvs = Vec::with_capacity(self.peers);
        for promise in self.promises.into_iter() {
            let buzzer = crate::buzzer::Buzzer::new();
            let queue = MergeQueue::new(buzzer);
            promise.send(queue.clone()).expect("Failed to send MergeQueue");
            recvs.push(queue.clone());
        }

        // Extract pusher commitments: obtain each network send thread's queue.
        let mut sends = Vec::with_capacity(self.peers);
        for pusher in self.futures.into_iter() {
            let queue = pusher.recv().expect("Failed to receive push queue");
            let sendpoint = SendEndpoint::new(queue);
            sends.push(Rc::new(RefCell::new(sendpoint)));
        }

        TcpAllocator {
            inner: self.inner.build(),
            index: self.index,
            peers: self.peers,
            canaries: Rc::new(RefCell::new(Vec::new())),
            channel_id_bound: None,
            staged: Vec::new(),
            sends,
            recvs,
            to_local: HashMap::new(),
        }
    }
}

/// A TCP-based allocator for inter-process communication.
pub struct TcpAllocator<A: Allocate> {

    inner:      A,                                  // A non-serialized inner allocator for process-local peers.

    index:      usize,                              // this worker's index out of `peers`.
    peers:      usize,                              // number of peer allocators (for typed channel allocation).

    staged:     Vec<Bytes>,                         // staging area for incoming Bytes.
    canaries:   Rc<RefCell<Vec<usize>>>,            // channel ids whose pullers have been dropped.

    channel_id_bound: Option<usize>,                // largest (and most recent) allocated channel id.

    // sending, receiving, and responding to binary buffers.
    sends:      Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>,     // sends[x] -> to the x-th remote process.
    recvs:      Vec<MergeQueue>,                                // recvs[x] <- from the x-th remote process.
    to_local:   HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>,   // to worker-local typed pullers.
}

impl<A: Allocate> Allocate for TcpAllocator<A> {
    fn index(&self) -> usize { self.index }
    fn peers(&self) -> usize { self.peers }
    fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {

        // Assume and enforce in-order identifier allocation.
        if let Some(bound) = self.channel_id_bound {
            assert!(bound < identifier);
        }
        self.channel_id_bound = Some(identifier);

        // Result list of boxed pushers.
        let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();

        // Inner exchange allocations.
        let inner_peers = self.inner.peers();
        let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);

        for target_index in 0 .. self.peers() {

            // TODO: crappy place to hardcode this rule.
            let mut process_id = target_index / inner_peers;
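            // E.g., with `inner_peers == 3`, target workers 0, 1, 2 live in
            // process 0, workers 3, 4, 5 in process 1, and so on.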

            if process_id == self.index / inner_peers {
                pushes.push(inner_sends.remove(0));
            }
            else {
                // message header template.
                let header = MessageHeader {
                    channel:    identifier,
                    source:     self.index,
                    target:     target_index,
                    length:     0,
                    seqno:      0,
                };

                // Create, box, and stash a new process_binary pusher.
                if process_id > self.index / inner_peers { process_id -= 1; }
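                // `self.sends` holds one endpoint per *remote* process, so process
                // ids above our own shift down by one (e.g., process 1 reaches
                // process 0 via sends[0] and process 2 via sends[1]).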
                pushes.push(Box::new(Pusher::new(header, self.sends[process_id].clone())));
            }
        }

        let channel =
        self.to_local
            .entry(identifier)
            .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
            .clone();

        use crate::allocator::counters::Puller as CountPuller;
        let canary = Canary::new(identifier, self.canaries.clone());
        let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone()));

        (pushes, puller)
    }

    // Perform preparatory work, most likely reading binary buffers from `self.recvs`.
    #[inline(never)]
    fn receive(&mut self) {

        // Check for channels whose `Puller` has been dropped.
        let mut canaries = self.canaries.borrow_mut();
        for dropped_channel in canaries.drain(..) {
            let _dropped =
            self.to_local
                .remove(&dropped_channel)
                .expect("non-existent channel dropped");
            // Borrowed channels may be non-empty, if the dataflow was forcibly
            // dropped. The contract is that if a dataflow is dropped, all other
            // workers will drop the dataflow too, without blocking indefinitely
            // on events from it.
            // assert!(dropped.borrow().is_empty());
        }
        ::std::mem::drop(canaries);

        self.inner.receive();

        for recv in self.recvs.iter_mut() {
            recv.drain_into(&mut self.staged);
        }

        let mut events = self.inner.events().borrow_mut();

        for mut bytes in self.staged.drain(..) {

            // We expect that `bytes` contains an integral number of messages.
            // No splitting occurs across allocations.
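            // Wire format: each message is a `MessageHeader` immediately followed
            // by its payload bytes; `header.required_bytes()` is their sum.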
            while !bytes.is_empty() {

                if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {

                    // Get the header and payload, ditch the header.
                    let mut peel = bytes.extract_to(header.required_bytes());
                    let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());

                    // Increment the message count for the channel.
                    // Safe to do this even if the channel has been dropped.
                    events.push_back((header.channel, Event::Pushed(1)));

                    // Ensure that a queue exists.
                    match self.to_local.entry(header.channel) {
                        Entry::Vacant(entry) => {
                            // We may receive data before allocating, and shouldn't block.
                            if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
                                entry.insert(Rc::new(RefCell::new(VecDeque::new())))
                                    .borrow_mut()
                                    .push_back(peel);
                            }
                        }
                        Entry::Occupied(mut entry) => {
                            entry.get_mut().borrow_mut().push_back(peel);
                        }
                    }
                }
                else {
                    println!("failed to read full header!");
                }
            }
        }
    }

    // Perform postparatory work, most likely sending un-full binary buffers.
    fn release(&mut self) {
        // Publish outgoing byte ledgers.
        for send in self.sends.iter_mut() {
            send.borrow_mut().publish();
        }

        // OPTIONAL: Tattle on channels sitting on borrowed data.
        // OPTIONAL: Perhaps copy borrowed data into owned allocation.
        // for (index, list) in self.to_local.iter() {
        //     let len = list.borrow_mut().len();
        //     if len > 0 {
        //         eprintln!("Warning: worker {}, undrained channel[{}].len() = {}", self.index, index, len);
        //     }
        // }
    }
    fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
        self.inner.events()
    }
    fn await_events(&self, duration: Option<std::time::Duration>) {
        self.inner.await_events(duration);
    }
}