timely_communication/allocator/zero_copy/
allocator.rs1use std::rc::Rc;
3use std::cell::RefCell;
4use std::collections::{VecDeque, HashMap, hash_map::Entry};
5use crossbeam_channel::{Sender, Receiver};
6
7use bytes::arc::Bytes;
8
9use crate::networking::MessageHeader;
10
11use crate::{Allocate, Message, Data, Push, Pull};
12use crate::allocator::AllocateBuilder;
13use crate::allocator::Event;
14use crate::allocator::canary::Canary;
15
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17use super::push_pull::{Pusher, PullerInner};
18
19pub struct TcpBuilder<A: AllocateBuilder> {
26 inner: A,
27 index: usize, peers: usize, futures: Vec<Receiver<MergeQueue>>, promises: Vec<Sender<MergeQueue>>, }
32
33pub fn new_vector<A: AllocateBuilder>(
46 allocators: Vec<A>,
47 my_process: usize,
48 processes: usize)
49-> (Vec<TcpBuilder<A>>,
50 Vec<Vec<Sender<MergeQueue>>>,
51 Vec<Vec<Receiver<MergeQueue>>>)
52{
53 let threads = allocators.len();
54
55 let (network_promises, worker_futures) = crate::promise_futures(processes-1, threads);
57 let (worker_promises, network_futures) = crate::promise_futures(threads, processes-1);
58
59 let builders =
60 allocators
61 .into_iter()
62 .zip(worker_promises)
63 .zip(worker_futures)
64 .enumerate()
65 .map(|(index, ((inner, promises), futures))| {
66 TcpBuilder {
67 inner,
68 index: my_process * threads + index,
69 peers: threads * processes,
70 promises,
71 futures,
72 }})
73 .collect();
74
75 (builders, network_promises, network_futures)
76}
77
78impl<A: AllocateBuilder> TcpBuilder<A> {
79
80 pub fn build(self) -> TcpAllocator<A::Allocator> {
82
83 let mut recvs = Vec::with_capacity(self.peers);
85 for promise in self.promises.into_iter() {
86 let buzzer = crate::buzzer::Buzzer::new();
87 let queue = MergeQueue::new(buzzer);
88 promise.send(queue.clone()).expect("Failed to send MergeQueue");
89 recvs.push(queue.clone());
90 }
91
92 let mut sends = Vec::with_capacity(self.peers);
94 for pusher in self.futures.into_iter() {
95 let queue = pusher.recv().expect("Failed to receive push queue");
96 let sendpoint = SendEndpoint::new(queue);
97 sends.push(Rc::new(RefCell::new(sendpoint)));
98 }
99
100 TcpAllocator {
104 inner: self.inner.build(),
105 index: self.index,
106 peers: self.peers,
107 canaries: Rc::new(RefCell::new(Vec::new())),
108 channel_id_bound: None,
109 staged: Vec::new(),
110 sends,
111 recvs,
112 to_local: HashMap::new(),
113 }
114 }
115}
116
117pub struct TcpAllocator<A: Allocate> {
119
120 inner: A, index: usize, peers: usize, staged: Vec<Bytes>, canaries: Rc<RefCell<Vec<usize>>>,
127
128 channel_id_bound: Option<usize>,
129
130 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
135
136impl<A: Allocate> Allocate for TcpAllocator<A> {
137 fn index(&self) -> usize { self.index }
138 fn peers(&self) -> usize { self.peers }
139 fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
140
141 if let Some(bound) = self.channel_id_bound {
143 assert!(bound < identifier);
144 }
145 self.channel_id_bound = Some(identifier);
146
147 let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
149
150 let inner_peers = self.inner.peers();
152 let (mut inner_sends, inner_recv) = self.inner.allocate(identifier);
153
154 for target_index in 0 .. self.peers() {
155
156 let mut process_id = target_index / inner_peers;
158
159 if process_id == self.index / inner_peers {
160 pushes.push(inner_sends.remove(0));
161 }
162 else {
163 let header = MessageHeader {
165 channel: identifier,
166 source: self.index,
167 target: target_index,
168 length: 0,
169 seqno: 0,
170 };
171
172 if process_id > self.index / inner_peers { process_id -= 1; }
174 pushes.push(Box::new(Pusher::new(header, self.sends[process_id].clone())));
175 }
176 }
177
178 let channel =
179 self.to_local
180 .entry(identifier)
181 .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
182 .clone();
183
184 use crate::allocator::counters::Puller as CountPuller;
185 let canary = Canary::new(identifier, self.canaries.clone());
186 let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone()));
187
188 (pushes, puller, )
189 }
190
191 #[inline(never)]
193 fn receive(&mut self) {
194
195 let mut canaries = self.canaries.borrow_mut();
197 for dropped_channel in canaries.drain(..) {
198 let _dropped =
199 self.to_local
200 .remove(&dropped_channel)
201 .expect("non-existent channel dropped");
202 }
208 ::std::mem::drop(canaries);
209
210 self.inner.receive();
211
212 for recv in self.recvs.iter_mut() {
213 recv.drain_into(&mut self.staged);
214 }
215
216 let mut events = self.inner.events().borrow_mut();
217
218 for mut bytes in self.staged.drain(..) {
219
220 while bytes.len() > 0 {
223
224 if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
225
226 let mut peel = bytes.extract_to(header.required_bytes());
228 let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());
229
230 events.push_back((header.channel, Event::Pushed(1)));
233
234 match self.to_local.entry(header.channel) {
236 Entry::Vacant(entry) => {
237 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
239 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
240 .borrow_mut()
241 .push_back(peel);
242 }
243 }
244 Entry::Occupied(mut entry) => {
245 entry.get_mut().borrow_mut().push_back(peel);
246 }
247 }
248 }
249 else {
250 println!("failed to read full header!");
251 }
252 }
253 }
254 }
255
256 fn release(&mut self) {
258 for send in self.sends.iter_mut() {
260 send.borrow_mut().publish();
261 }
262
263 }
272 fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
273 self.inner.events()
274 }
275 fn await_events(&self, duration: Option<std::time::Duration>) {
276 self.inner.await_events(duration);
277 }
278}