rusty_junctions/controller.rs
1//! Control structure started by any new `Junction`, running in a background thread
2//! to handle the coordination of Join Pattern creation and execution.
3
4use std::sync::mpsc::{Receiver, Sender};
5use std::thread;
6use std::{cmp::Ordering, collections::HashMap, collections::LinkedList, vec::Vec};
7
8use super::bag::Bag;
9use super::counter::Counter;
10use super::inverted_index::InvertedIndex;
11use super::types::ids::{ChannelId, JoinPatternId};
12use super::types::{ControllerHandle, JoinPattern, Message, Packet};
13
14/// Struct to handle `Packet`s sent from the user in the background.
15///
16/// This struct holds all the information required to store and fire
17/// `JoinPattern`s once all requirements have been met. It is created by a
18/// `Junction` in a separate control thread, where it continuously listens
19/// for `Packet`s sent by user code and reacts accordingly.
20pub(crate) struct Controller {
21 latest_channel_id: ChannelId,
22 latest_join_pattern_id: JoinPatternId,
23 /// Counter for how many messages have arrived since creation.
24 message_counter: Counter,
25 /// Collection of all currently available messages.
26 messages: Bag<ChannelId, Message>,
27 /// Collection of all available Join Patterns for the `Junction` associated with
28 /// this `Controller`.
29 join_patterns: HashMap<JoinPatternId, JoinPattern>,
30 /// Map of `JoinPatternId`s to the message count at which they were last
31 /// fired, `None` if the Join Pattern has never been fired. Used to
32 /// determine precedence of Join Patterns that have not been fired in a
33 /// while when needing to choose which of the alive Join Patterns to fire.
34 join_pattern_last_fired: HashMap<JoinPatternId, Option<Counter>>,
35 /// `InvertedIndex` matching `ChannelId`s to all Join Patterns they appear in.
36 /// Used to easily determine which Join Patterns are relevant any time a new
37 /// message comes in.
38 join_pattern_index: InvertedIndex<ChannelId, JoinPatternId>,
39}
40
41impl Controller {
42 pub(crate) fn new() -> Controller {
43 Controller {
44 latest_channel_id: ChannelId::default(),
45 latest_join_pattern_id: JoinPatternId::default(),
46 message_counter: Counter::default(),
47 messages: Bag::new(),
48 join_patterns: HashMap::new(),
49 join_pattern_last_fired: HashMap::new(),
50 join_pattern_index: InvertedIndex::new(),
51 }
52 }
53
54 /// Start thread to handle incoming `Packet`s from `Junction` user.
55 ///
56 /// Start new thread in the background to handle incoming `Packet`s sent from
57 /// the user of the `Junction` that created this `Controller`. Return a
58 /// `ControlThreadHandle` so that this control thread can be joint at any future
59 /// point.
60 pub(crate) fn start(
61 mut self,
62 sender: Sender<Packet>,
63 receiver: Receiver<Packet>,
64 ) -> ControllerHandle {
65 ControllerHandle::new(sender, thread::spawn(move || self.handle_packets(receiver)))
66 }
67
68 /// Handle incoming `Packet` from associated `Junction`.
69 ///
70 /// This function will continuously receive `Packet`s sent from structs
71 /// associated with the `Junction` that created and started this `Controller`
72 /// until a `Packet::ShutDownRequest` has been sent.
73 fn handle_packets(&mut self, receiver: Receiver<Packet>) {
74 use Packet::*;
75
76 while let Ok(packet) = receiver.recv() {
77 match packet {
78 Message { channel_id, msg } => self.handle_message(channel_id, msg),
79 NewChannelIdRequest { return_sender } => {
80 self.handle_new_channel_id_request(return_sender)
81 }
82 AddJoinPatternRequest { join_pattern } => {
83 self.handle_add_join_pattern_request(join_pattern)
84 }
85 ShutDownRequest => break,
86 }
87 }
88 }
89
90 /// Handle a received `Message` from a given channel.
91 ///
92 /// The first action taken in handling a `Message` is storing the received
93 /// message in the `Message` bag of the `Controller`.
94 ///
95 /// The second action is to start determining if any of the Join Patterns stored
96 /// with the `Controller` are alive and if so, which of these to fire.
97 fn handle_message(&mut self, channel_id: ChannelId, msg: Message) {
98 self.messages.add(channel_id, msg);
99 self.message_counter.increment();
100
101 self.handle_join_pattern_firing(channel_id);
102 }
103
104 /// Handle the firing of a `JoinPattern`, if possible.
105 ///
106 /// Determine which `JoinPattern`s contain the channel with the given
107 /// `ChannelId`. For these, check which ones have at least one `Message`
108 /// available for each of their channels, i.e. are alive, then select
109 /// one `JoinPattern` to be fired. If at any point during this process
110 /// no more `JoinPattern`s remain, nothing will be done.
111 fn handle_join_pattern_firing(&mut self, channel_id: ChannelId) {
112 let mut alive_join_patterns: Vec<JoinPatternId> = Vec::new();
113
114 if let Some(jp_ids) = self.relevant_join_patterns(channel_id) {
115 alive_join_patterns = self.alive_join_patterns(jp_ids);
116 }
117
118 if let Some(jp_id_to_fire) = self.select_to_fire(&mut alive_join_patterns) {
119 self.fire_join_pattern(*jp_id_to_fire);
120 self.reset_last_fired(*jp_id_to_fire);
121 }
122 }
123
124 /// Return the `JoinPatternId`s of relevant Join Patterns for given `ChannelId`.
125 ///
126 /// A Join Pattern is considered relevant for a given `ChannelId` if at least
127 /// one of its channels has the `ChannelId`.
128 fn relevant_join_patterns(&self, channel_id: ChannelId) -> Option<&LinkedList<JoinPatternId>> {
129 self.join_pattern_index.peek_all(&channel_id)
130 }
131
132 /// Return the `JoinPatternId`s of all alive `JoinPattern`s.
133 ///
134 /// A `JoinPattern` is considered alive if for each of the channels
135 /// involved in it, there is at least one `Message` available.
136 fn alive_join_patterns(
137 &self,
138 join_pattern_ids: &LinkedList<JoinPatternId>,
139 ) -> Vec<JoinPatternId> {
140 // We need clone the `JoinPatternId`s at this point to avoid
141 // because need to avoid the issue of `peek_all` borrowing mutably,
142 // but then needing to mutably borrow again later to update the
143 // latest fired `JoinPatternId`.
144 join_pattern_ids
145 .iter()
146 .filter(|&jp_id| self.is_alive(*jp_id))
147 .cloned()
148 .collect()
149 }
150
151 /// Select which `JoinPattern` should be fired.
152 ///
153 /// In order to avoid certain scenarious in which one `JoinPattern` would
154 /// block the execution of another, because it for instance has a subset of
155 /// the other's channels, we need to ensure that from the `JoinPattern`s
156 /// that are alive simultaneously, we select the one to be fired that has
157 /// been waiting for the longest time.
158 ///
159 /// Specifically, we record the `Counter` of the `Message` that each
160 /// `JoinPattern` has last been fired at. We use this as a pseudo-time and
161 /// simply order the `JoinPattern`s by their `Counter` values, then take
162 /// the one with the smallest.
163 ///
164 /// Note that this procedure should ensure a certain form of *fairness*,
165 /// by which if a `JoinPattern` has been alive an infinite amount of times,
166 /// it will fire at least once. In practice, this should amount to each
167 /// `JoinPattern` being incapable of getting deadlocked by others.
168 fn select_to_fire<'a>(
169 &self,
170 alive_jp_ids: &'a mut Vec<JoinPatternId>,
171 ) -> Option<&'a JoinPatternId> {
172 alive_jp_ids
173 .sort_unstable_by(|&jp_id_1, &jp_id_2| self.compare_last_fired(jp_id_1, jp_id_2));
174
175 alive_jp_ids.first()
176 }
177
178 /// Return `true` if Join Pattern with given `JoinPatternId` is alive.
179 ///
180 /// A Join Pattern is considered alive if there is at least one `Message` for
181 /// each of the channels involved in it.
182 fn is_alive(&self, join_pattern_id: JoinPatternId) -> bool {
183 use JoinPattern::*;
184
185 if let Some(join_pattern) = self.join_patterns.get(&join_pattern_id) {
186 match join_pattern {
187 UnarySend(jp) => self.is_unary_alive(jp.channel_id()),
188 UnaryRecv(jp) => self.is_unary_alive(jp.channel_id()),
189 UnaryBidir(jp) => self.is_unary_alive(jp.channel_id()),
190 BinarySend(jp) => {
191 self.is_binary_alive(jp.first_send_channel_id(), jp.second_send_channel_id())
192 }
193 BinaryRecv(jp) => self.is_binary_alive(jp.send_channel_id(), jp.recv_channel_id()),
194 BinaryBidir(jp) => {
195 self.is_binary_alive(jp.send_channel_id(), jp.bidir_channel_id())
196 }
197 TernarySend(jp) => self.is_ternary_alive(
198 jp.first_send_channel_id(),
199 jp.second_send_channel_id(),
200 jp.third_send_channel_id(),
201 ),
202 TernaryRecv(jp) => self.is_ternary_alive(
203 jp.first_send_channel_id(),
204 jp.second_send_channel_id(),
205 jp.recv_channel_id(),
206 ),
207 TernaryBidir(jp) => self.is_ternary_alive(
208 jp.first_send_channel_id(),
209 jp.second_send_channel_id(),
210 jp.bidir_channel_id(),
211 ),
212 }
213 } else {
214 false
215 }
216 }
217
218 /// Return `true` if *unary* Join Pattern with given `JoinPatternId` is alive.
219 ///
220 /// A Join Pattern is considered alive if there is at least one `Message` for
221 /// each of the channels involved in it.
222 fn is_unary_alive(&self, channel_id: ChannelId) -> bool {
223 self.messages.count_items(&channel_id) >= 1
224 }
225
226 /// Return `true` if *binary* Join Pattern with given `JoinPatternId` is alive.
227 ///
228 /// A Join Pattern is considered alive if there is at least one `Message` for
229 /// each of the channels involved in it.
230 ///
231 /// For binary Join Patterns, we need to ensure that should both channels
232 /// involved have the same `ChannelId`, we actually have at least two
233 /// `Message`s available.
234 fn is_binary_alive(&self, first_ch_id: ChannelId, second_ch_id: ChannelId) -> bool {
235 if first_ch_id == second_ch_id {
236 self.messages.count_items(&first_ch_id) >= 2
237 } else {
238 self.is_unary_alive(first_ch_id) && self.is_unary_alive(second_ch_id)
239 }
240 }
241
242 /// Return `true` if *ternary* Join Pattern with given `JoinPatternId` is alive.
243 ///
244 /// A Join Pattern is considered alive if there is at least one `Message` for
245 /// each of the channels involved in it.
246 ///
247 /// For ternary Join Patterns, we need to ensure that should more than one
248 /// channel involved have the same `ChannelId`, we actually have enough
249 /// `Message`s available.
250 fn is_ternary_alive(
251 &self,
252 first_ch_id: ChannelId,
253 second_ch_id: ChannelId,
254 third_ch_id: ChannelId,
255 ) -> bool {
256 if first_ch_id == second_ch_id && second_ch_id == third_ch_id {
257 self.messages.count_items(&first_ch_id) >= 3
258 } else {
259 self.is_binary_alive(first_ch_id, second_ch_id)
260 && self.is_binary_alive(first_ch_id, third_ch_id)
261 && self.is_binary_alive(second_ch_id, third_ch_id)
262 }
263 }
264
265 /// Compare when the Join Patterns with given `JoinPatternId`s were last alive at.
266 ///
267 /// Rules for Order:
268 /// 1. If neither `JoinPatternId` has a last alive `Counter`, then neither
269 /// has been fired yet, so they can be viewed as equal in this ordering.
270 /// 2. If only one `JoinPatternId` has no last alive `Counter`, then that
271 /// one has to be ordered as less than the other since having been fired
272 /// at least once will always be a later point of firing than not having
273 /// been fired yet.
274 /// 3. If both `JoinPatternId`s have last alive `Counter`s, use the ordering
275 /// of these.
276 ///
277 /// # Panics
278 ///
279 /// For simplicity, this function panics if the either of the given
280 /// `JoinPatternId`s is not registered in the internal map of `JoinPatternId`s
281 /// to `Instant`s that describe the last `Instant` at which a particular Join
282 /// Pattern was alive. That is to say, this function should only be called on
283 /// `JoinPatternId`s which are definitely stored in the calling `Controller`.
284 fn compare_last_fired(&self, jp_id_1: JoinPatternId, jp_id_2: JoinPatternId) -> Ordering {
285 // TODO: Can we sensibly use `Option::flatten` here?
286 let last_fired_1 = self.join_pattern_last_fired.get(&jp_id_1).unwrap();
287 let last_fired_2 = self.join_pattern_last_fired.get(&jp_id_2).unwrap();
288
289 if last_fired_1.is_none() && last_fired_2.is_none() {
290 Ordering::Equal
291 } else if last_fired_1.is_none() && last_fired_2.is_some() {
292 Ordering::Less
293 } else if last_fired_1.is_some() && last_fired_2.is_none() {
294 Ordering::Greater
295 } else {
296 last_fired_1.cmp(last_fired_2)
297 }
298 }
299
300 /// Fire the `JoinPattern` corresponding to the given `JoinPatternId`.
301 ///
302 /// The processs of firing a `JoinPattern` consists of first retrieving
303 /// a `Message` for each of the channels involved in the `JoinPattern`,
304 /// then passing these `Messages`s to the `JoinPattern` to handle the
305 /// firing.
306 ///
307 /// # Panics
308 ///
309 /// Panics when there is no `JoinPattern` stored for the given
310 /// `JoinPatternId`.
311 fn fire_join_pattern(&mut self, join_pattern_id: JoinPatternId) {
312 use JoinPattern::*;
313
314 let join_pattern = self.join_patterns.get(&join_pattern_id).unwrap();
315
316 match join_pattern {
317 UnarySend(jp) => {
318 let arg = self.messages.retrieve(&jp.channel_id()).unwrap();
319
320 jp.fire(arg);
321 }
322 UnaryRecv(jp) => {
323 let return_sender = self.messages.retrieve(&jp.channel_id()).unwrap();
324
325 jp.fire(return_sender);
326 }
327 UnaryBidir(jp) => {
328 let arg_and_sender = self.messages.retrieve(&jp.channel_id()).unwrap();
329
330 jp.fire(arg_and_sender);
331 }
332 BinarySend(jp) => {
333 let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
334 let arg_2 = self
335 .messages
336 .retrieve(&jp.second_send_channel_id())
337 .unwrap();
338
339 jp.fire(arg_1, arg_2);
340 }
341 BinaryRecv(jp) => {
342 let arg = self.messages.retrieve(&jp.send_channel_id()).unwrap();
343 let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap();
344
345 jp.fire(arg, return_sender);
346 }
347 BinaryBidir(jp) => {
348 let arg_1 = self.messages.retrieve(&jp.send_channel_id()).unwrap();
349 let arg_2_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap();
350
351 jp.fire(arg_1, arg_2_and_sender);
352 }
353 TernarySend(jp) => {
354 let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
355 let arg_2 = self
356 .messages
357 .retrieve(&jp.second_send_channel_id())
358 .unwrap();
359 let arg_3 = self.messages.retrieve(&jp.third_send_channel_id()).unwrap();
360
361 jp.fire(arg_1, arg_2, arg_3);
362 }
363 TernaryRecv(jp) => {
364 let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
365 let arg_2 = self
366 .messages
367 .retrieve(&jp.second_send_channel_id())
368 .unwrap();
369 let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap();
370
371 jp.fire(arg_1, arg_2, return_sender);
372 }
373 TernaryBidir(jp) => {
374 let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
375 let arg_2 = self
376 .messages
377 .retrieve(&jp.second_send_channel_id())
378 .unwrap();
379 let arg_3_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap();
380
381 jp.fire(arg_1, arg_2, arg_3_and_sender);
382 }
383 }
384 }
385
386 /// Reset the `Counter` at which the given Join Pattern has last been fired.
387 fn reset_last_fired(&mut self, join_pattern_id: JoinPatternId) {
388 self.join_pattern_last_fired
389 .insert(join_pattern_id, Some(self.message_counter.clone()));
390 }
391
392 /// Send new, *unique* `ChannelId` back to the requesting `Junction`.
393 ///
394 /// # Panics
395 ///
396 /// Panics if the new `ChannelId` could not be sent to the requesting `Junction`.
397 fn handle_new_channel_id_request(&mut self, return_sender: Sender<ChannelId>) {
398 return_sender.send(self.new_channel_id()).unwrap();
399 }
400
401 /// Add new Join Pattern to `Controller` storage.
402 fn handle_add_join_pattern_request(&mut self, join_pattern: JoinPattern) {
403 let jp_id = self.new_join_pattern_id();
404
405 self.initialize_last_fired(jp_id);
406
407 self.insert_join_pattern(jp_id, join_pattern);
408 }
409
410 /// Initialize the `Instant` at which Join Pattern was last alive.
411 fn initialize_last_fired(&mut self, join_pattern_id: JoinPatternId) {
412 self.join_pattern_last_fired.insert(join_pattern_id, None);
413 }
414
415 /// Insert Join Pattern into relevant internal storage.
416 ///
417 /// The given Join Pattern needs to be registered within the internal
418 /// `InvertedIndex` for future look-up operations and then stored in the
419 /// Join Pattern collection.
420 fn insert_join_pattern(&mut self, join_pattern_id: JoinPatternId, join_pattern: JoinPattern) {
421 use JoinPattern::*;
422
423 match join_pattern {
424 UnarySend(jp) => {
425 self.join_pattern_index
426 .insert_single(jp.channel_id(), join_pattern_id);
427
428 self.join_patterns.insert(join_pattern_id, UnarySend(jp));
429 }
430 UnaryRecv(jp) => {
431 self.join_pattern_index
432 .insert_single(jp.channel_id(), join_pattern_id);
433
434 self.join_patterns.insert(join_pattern_id, UnaryRecv(jp));
435 }
436 UnaryBidir(jp) => {
437 self.join_pattern_index
438 .insert_single(jp.channel_id(), join_pattern_id);
439
440 self.join_patterns.insert(join_pattern_id, UnaryBidir(jp));
441 }
442 BinarySend(jp) => {
443 self.join_pattern_index
444 .insert_single(jp.first_send_channel_id(), join_pattern_id);
445 self.join_pattern_index
446 .insert_single(jp.second_send_channel_id(), join_pattern_id);
447
448 self.join_patterns.insert(join_pattern_id, BinarySend(jp));
449 }
450 BinaryRecv(jp) => {
451 self.join_pattern_index
452 .insert_single(jp.send_channel_id(), join_pattern_id);
453 self.join_pattern_index
454 .insert_single(jp.recv_channel_id(), join_pattern_id);
455
456 self.join_patterns.insert(join_pattern_id, BinaryRecv(jp));
457 }
458 BinaryBidir(jp) => {
459 self.join_pattern_index
460 .insert_single(jp.send_channel_id(), join_pattern_id);
461 self.join_pattern_index
462 .insert_single(jp.bidir_channel_id(), join_pattern_id);
463
464 self.join_patterns.insert(join_pattern_id, BinaryBidir(jp));
465 }
466 TernarySend(jp) => {
467 self.join_pattern_index
468 .insert_single(jp.first_send_channel_id(), join_pattern_id);
469 self.join_pattern_index
470 .insert_single(jp.second_send_channel_id(), join_pattern_id);
471 self.join_pattern_index
472 .insert_single(jp.third_send_channel_id(), join_pattern_id);
473
474 self.join_patterns.insert(join_pattern_id, TernarySend(jp));
475 }
476 TernaryRecv(jp) => {
477 self.join_pattern_index
478 .insert_single(jp.first_send_channel_id(), join_pattern_id);
479 self.join_pattern_index
480 .insert_single(jp.second_send_channel_id(), join_pattern_id);
481 self.join_pattern_index
482 .insert_single(jp.recv_channel_id(), join_pattern_id);
483
484 self.join_patterns.insert(join_pattern_id, TernaryRecv(jp));
485 }
486 TernaryBidir(jp) => {
487 self.join_pattern_index
488 .insert_single(jp.first_send_channel_id(), join_pattern_id);
489 self.join_pattern_index
490 .insert_single(jp.second_send_channel_id(), join_pattern_id);
491 self.join_pattern_index
492 .insert_single(jp.bidir_channel_id(), join_pattern_id);
493
494 self.join_patterns.insert(join_pattern_id, TernaryBidir(jp));
495 }
496 }
497 }
498
499 /// Generate new, *unique* `ChannelId`.
500 fn new_channel_id(&mut self) -> ChannelId {
501 let ch_id = self.latest_channel_id;
502 self.latest_channel_id.increment();
503
504 ch_id
505 }
506
507 /// Generate new, *unique* `JoinPatternId`.
508 fn new_join_pattern_id(&mut self) -> JoinPatternId {
509 let jp_id = self.latest_join_pattern_id;
510 self.latest_join_pattern_id.increment();
511
512 jp_id
513 }
514}