timely_communication/allocator/zero_copy/
allocator_process.rs1use std::rc::Rc;
4use std::cell::RefCell;
5use std::collections::{VecDeque, HashMap, hash_map::Entry};
6use crossbeam_channel::{Sender, Receiver};
7
8use bytes::arc::Bytes;
9
10use crate::networking::MessageHeader;
11
12use crate::{Allocate, Message, Data, Push, Pull};
13use crate::allocator::{AllocateBuilder, Event};
14use crate::allocator::canary::Canary;
15
16use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
17
18use super::push_pull::{Pusher, Puller};
19
20pub struct ProcessBuilder {
27 index: usize, peers: usize, pushers: Vec<Receiver<MergeQueue>>, pullers: Vec<Sender<MergeQueue>>, }
32
33impl ProcessBuilder {
34 pub fn new_vector(count: usize) -> Vec<ProcessBuilder> {
38
39 let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
41
42 pushers_vec
43 .into_iter()
44 .zip(pullers_vec)
45 .enumerate()
46 .map(|(index, (pushers, pullers))|
47 ProcessBuilder {
48 index,
49 peers: count,
50 pushers,
51 pullers,
52 }
53 )
54 .collect()
55 }
56
57 pub fn build(self) -> ProcessAllocator {
59
60 let mut recvs = Vec::with_capacity(self.peers);
62 for puller in self.pullers.into_iter() {
63 let buzzer = crate::buzzer::Buzzer::new();
64 let queue = MergeQueue::new(buzzer);
65 puller.send(queue.clone()).expect("Failed to send MergeQueue");
66 recvs.push(queue.clone());
67 }
68
69 let mut sends = Vec::with_capacity(self.peers);
71 for pusher in self.pushers.into_iter() {
72 let queue = pusher.recv().expect("Failed to receive MergeQueue");
73 let sendpoint = SendEndpoint::new(queue);
74 sends.push(Rc::new(RefCell::new(sendpoint)));
75 }
76
77 ProcessAllocator {
78 index: self.index,
79 peers: self.peers,
80 events: Rc::new(RefCell::new(VecDeque::new())),
81 canaries: Rc::new(RefCell::new(Vec::new())),
82 channel_id_bound: None,
83 staged: Vec::new(),
84 sends,
85 recvs,
86 to_local: HashMap::new(),
87 }
88 }
89}
90
91impl AllocateBuilder for ProcessBuilder {
92 type Allocator = ProcessAllocator;
93 fn build(self) -> Self::Allocator {
95 self.build()
96 }
97
98}
99
100pub struct ProcessAllocator {
102
103 index: usize, peers: usize, events: Rc<RefCell<VecDeque<(usize, Event)>>>,
107
108 canaries: Rc<RefCell<Vec<usize>>>,
109
110 channel_id_bound: Option<usize>,
111
112 staged: Vec<Bytes>,
114 sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, recvs: Vec<MergeQueue>, to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, }
118
119impl Allocate for ProcessAllocator {
120 fn index(&self) -> usize { self.index }
121 fn peers(&self) -> usize { self.peers }
122 fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
123
124 if let Some(bound) = self.channel_id_bound {
126 assert!(bound < identifier);
127 }
128 self.channel_id_bound = Some(identifier);
129
130 let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());
131
132 for target_index in 0 .. self.peers() {
133
134 let header = MessageHeader {
136 channel: identifier,
137 source: self.index,
138 target: target_index,
139 length: 0,
140 seqno: 0,
141 };
142
143 pushes.push(Box::new(Pusher::new(header, self.sends[target_index].clone())));
145 }
146
147 let channel =
148 self.to_local
149 .entry(identifier)
150 .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
151 .clone();
152
153 use crate::allocator::counters::Puller as CountPuller;
154 let canary = Canary::new(identifier, self.canaries.clone());
155 let puller = Box::new(CountPuller::new(Puller::new(channel, canary), identifier, self.events().clone()));
156
157 (pushes, puller)
158 }
159
160 #[inline(never)]
162 fn receive(&mut self) {
163
164 let mut canaries = self.canaries.borrow_mut();
166 for dropped_channel in canaries.drain(..) {
167 let _dropped =
168 self.to_local
169 .remove(&dropped_channel)
170 .expect("non-existent channel dropped");
171 }
177 std::mem::drop(canaries);
178
179 let mut events = self.events.borrow_mut();
180
181 for recv in self.recvs.iter_mut() {
182 recv.drain_into(&mut self.staged);
183 }
184
185 for mut bytes in self.staged.drain(..) {
186
187 while bytes.len() > 0 {
190
191 if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
192
193 let mut peel = bytes.extract_to(header.required_bytes());
195 let _ = peel.extract_to(40);
196
197 events.push_back((header.channel, Event::Pushed(1)));
200
201 match self.to_local.entry(header.channel) {
203 Entry::Vacant(entry) => {
204 if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) {
206 entry.insert(Rc::new(RefCell::new(VecDeque::new())))
207 .borrow_mut()
208 .push_back(peel);
209 }
210 }
211 Entry::Occupied(mut entry) => {
212 entry.get_mut().borrow_mut().push_back(peel);
213 }
214 }
215 }
216 else {
217 println!("failed to read full header!");
218 }
219 }
220 }
221 }
222
223 fn release(&mut self) {
225 for send in self.sends.iter_mut() {
227 send.borrow_mut().publish();
228 }
229
230 }
239
240 fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
241 &self.events
242 }
243 fn await_events(&self, duration: Option<std::time::Duration>) {
244 if self.events.borrow().is_empty() {
245 if let Some(duration) = duration {
246 std::thread::park_timeout(duration);
247 }
248 else {
249 std::thread::park();
250 }
251 }
252 }
253}