rusty_junctions/
controller.rs

1//! Control structure started by any new `Junction`, running in a background thread
2//! to handle the coordination of Join Pattern creation and execution.
3
4use std::sync::mpsc::{Receiver, Sender};
5use std::thread;
6use std::{cmp::Ordering, collections::HashMap, collections::LinkedList, vec::Vec};
7
8use super::bag::Bag;
9use super::counter::Counter;
10use super::inverted_index::InvertedIndex;
11use super::types::ids::{ChannelId, JoinPatternId};
12use super::types::{ControllerHandle, JoinPattern, Message, Packet};
13
14/// Struct to handle `Packet`s sent from the user in the background.
15///
16/// This struct holds all the information required to store and fire
17/// `JoinPattern`s once all requirements have been met. It is created by a
18/// `Junction` in a separate control thread, where it continuously listens
19/// for `Packet`s sent by user code and reacts accordingly.
20pub(crate) struct Controller {
21    latest_channel_id: ChannelId,
22    latest_join_pattern_id: JoinPatternId,
23    /// Counter for how many messages have arrived since creation.
24    message_counter: Counter,
25    /// Collection of all currently available messages.
26    messages: Bag<ChannelId, Message>,
27    /// Collection of all available Join Patterns for the `Junction` associated with
28    /// this `Controller`.
29    join_patterns: HashMap<JoinPatternId, JoinPattern>,
30    /// Map of `JoinPatternId`s to the message count at which they were last
31    /// fired, `None` if the Join Pattern has never been fired. Used to
32    /// determine precedence of Join Patterns that have not been fired in a
33    /// while when needing to choose which of the alive Join Patterns to fire.
34    join_pattern_last_fired: HashMap<JoinPatternId, Option<Counter>>,
35    /// `InvertedIndex` matching `ChannelId`s to all Join Patterns they appear in.
36    /// Used to easily determine which Join Patterns are relevant any time a new
37    /// message comes in.
38    join_pattern_index: InvertedIndex<ChannelId, JoinPatternId>,
39}
40
41impl Controller {
42    pub(crate) fn new() -> Controller {
43        Controller {
44            latest_channel_id: ChannelId::default(),
45            latest_join_pattern_id: JoinPatternId::default(),
46            message_counter: Counter::default(),
47            messages: Bag::new(),
48            join_patterns: HashMap::new(),
49            join_pattern_last_fired: HashMap::new(),
50            join_pattern_index: InvertedIndex::new(),
51        }
52    }
53
54    /// Start thread to handle incoming `Packet`s from `Junction` user.
55    ///
56    /// Start new thread in the background to handle incoming `Packet`s sent from
57    /// the user of the `Junction` that created this `Controller`. Return a
58    /// `ControlThreadHandle` so that this control thread can be joint at any future
59    /// point.
60    pub(crate) fn start(
61        mut self,
62        sender: Sender<Packet>,
63        receiver: Receiver<Packet>,
64    ) -> ControllerHandle {
65        ControllerHandle::new(sender, thread::spawn(move || self.handle_packets(receiver)))
66    }
67
68    /// Handle incoming `Packet` from associated `Junction`.
69    ///
70    /// This function will continuously receive `Packet`s sent from structs
71    /// associated with the `Junction` that created and started this `Controller`
72    /// until a `Packet::ShutDownRequest` has been sent.
73    fn handle_packets(&mut self, receiver: Receiver<Packet>) {
74        use Packet::*;
75
76        while let Ok(packet) = receiver.recv() {
77            match packet {
78                Message { channel_id, msg } => self.handle_message(channel_id, msg),
79                NewChannelIdRequest { return_sender } => {
80                    self.handle_new_channel_id_request(return_sender)
81                }
82                AddJoinPatternRequest { join_pattern } => {
83                    self.handle_add_join_pattern_request(join_pattern)
84                }
85                ShutDownRequest => break,
86            }
87        }
88    }
89
90    /// Handle a received `Message` from a given channel.
91    ///
92    /// The first action taken in handling a `Message` is storing the received
93    /// message in the `Message` bag of the `Controller`.
94    ///
95    /// The second action is to start determining if any of the Join Patterns stored
96    /// with the `Controller` are alive and if so, which of these to fire.
97    fn handle_message(&mut self, channel_id: ChannelId, msg: Message) {
98        self.messages.add(channel_id, msg);
99        self.message_counter.increment();
100
101        self.handle_join_pattern_firing(channel_id);
102    }
103
104    /// Handle the firing of a `JoinPattern`, if possible.
105    ///
106    /// Determine which `JoinPattern`s contain the channel with the given
107    /// `ChannelId`. For these, check which ones have at least one `Message`
108    /// available for each of their channels, i.e. are alive, then select
109    /// one `JoinPattern` to be fired. If at any point during this process
110    /// no more `JoinPattern`s remain, nothing will be done.
111    fn handle_join_pattern_firing(&mut self, channel_id: ChannelId) {
112        let mut alive_join_patterns: Vec<JoinPatternId> = Vec::new();
113
114        if let Some(jp_ids) = self.relevant_join_patterns(channel_id) {
115            alive_join_patterns = self.alive_join_patterns(jp_ids);
116        }
117
118        if let Some(jp_id_to_fire) = self.select_to_fire(&mut alive_join_patterns) {
119            self.fire_join_pattern(*jp_id_to_fire);
120            self.reset_last_fired(*jp_id_to_fire);
121        }
122    }
123
124    /// Return the `JoinPatternId`s of relevant Join Patterns for given `ChannelId`.
125    ///
126    /// A Join Pattern is considered relevant for a given `ChannelId` if at least
127    /// one of its channels has the `ChannelId`.
128    fn relevant_join_patterns(&self, channel_id: ChannelId) -> Option<&LinkedList<JoinPatternId>> {
129        self.join_pattern_index.peek_all(&channel_id)
130    }
131
132    /// Return the `JoinPatternId`s of all alive `JoinPattern`s.
133    ///
134    /// A `JoinPattern` is considered alive if for each of the channels
135    /// involved in it, there is at least one `Message` available.
136    fn alive_join_patterns(
137        &self,
138        join_pattern_ids: &LinkedList<JoinPatternId>,
139    ) -> Vec<JoinPatternId> {
140        // We need clone the `JoinPatternId`s at this point to avoid
141        // because need to avoid the issue of `peek_all` borrowing mutably,
142        // but then needing to mutably borrow again later to update the
143        // latest fired `JoinPatternId`.
144        join_pattern_ids
145            .iter()
146            .filter(|&jp_id| self.is_alive(*jp_id))
147            .cloned()
148            .collect()
149    }
150
151    /// Select which `JoinPattern` should be fired.
152    ///
153    /// In order to avoid certain scenarious in which one `JoinPattern` would
154    /// block the execution of another, because it for instance has a subset of
155    /// the other's channels, we need to ensure that from the `JoinPattern`s
156    /// that are alive simultaneously, we select the one to be fired that has
157    /// been waiting for the longest time.
158    ///
159    /// Specifically, we record the `Counter` of the `Message` that each
160    /// `JoinPattern` has last been fired at. We use this as a pseudo-time and
161    /// simply order the `JoinPattern`s by their `Counter` values, then take
162    /// the one with the smallest.
163    ///
164    /// Note that this procedure should ensure a certain form of *fairness*,
165    /// by which if a `JoinPattern` has been alive an infinite amount of times,
166    /// it will fire at least once. In practice, this should amount to each
167    /// `JoinPattern` being incapable of getting deadlocked by others.
168    fn select_to_fire<'a>(
169        &self,
170        alive_jp_ids: &'a mut Vec<JoinPatternId>,
171    ) -> Option<&'a JoinPatternId> {
172        alive_jp_ids
173            .sort_unstable_by(|&jp_id_1, &jp_id_2| self.compare_last_fired(jp_id_1, jp_id_2));
174
175        alive_jp_ids.first()
176    }
177
178    /// Return `true` if Join Pattern with given `JoinPatternId` is alive.
179    ///
180    /// A Join Pattern is considered alive if there is at least one `Message` for
181    /// each of the channels involved in it.
182    fn is_alive(&self, join_pattern_id: JoinPatternId) -> bool {
183        use JoinPattern::*;
184
185        if let Some(join_pattern) = self.join_patterns.get(&join_pattern_id) {
186            match join_pattern {
187                UnarySend(jp) => self.is_unary_alive(jp.channel_id()),
188                UnaryRecv(jp) => self.is_unary_alive(jp.channel_id()),
189                UnaryBidir(jp) => self.is_unary_alive(jp.channel_id()),
190                BinarySend(jp) => {
191                    self.is_binary_alive(jp.first_send_channel_id(), jp.second_send_channel_id())
192                }
193                BinaryRecv(jp) => self.is_binary_alive(jp.send_channel_id(), jp.recv_channel_id()),
194                BinaryBidir(jp) => {
195                    self.is_binary_alive(jp.send_channel_id(), jp.bidir_channel_id())
196                }
197                TernarySend(jp) => self.is_ternary_alive(
198                    jp.first_send_channel_id(),
199                    jp.second_send_channel_id(),
200                    jp.third_send_channel_id(),
201                ),
202                TernaryRecv(jp) => self.is_ternary_alive(
203                    jp.first_send_channel_id(),
204                    jp.second_send_channel_id(),
205                    jp.recv_channel_id(),
206                ),
207                TernaryBidir(jp) => self.is_ternary_alive(
208                    jp.first_send_channel_id(),
209                    jp.second_send_channel_id(),
210                    jp.bidir_channel_id(),
211                ),
212            }
213        } else {
214            false
215        }
216    }
217
218    /// Return `true` if *unary* Join Pattern with given `JoinPatternId` is alive.
219    ///
220    /// A Join Pattern is considered alive if there is at least one `Message` for
221    /// each of the channels involved in it.
222    fn is_unary_alive(&self, channel_id: ChannelId) -> bool {
223        self.messages.count_items(&channel_id) >= 1
224    }
225
226    /// Return `true` if *binary* Join Pattern with given `JoinPatternId` is alive.
227    ///
228    /// A Join Pattern is considered alive if there is at least one `Message` for
229    /// each of the channels involved in it.
230    ///
231    /// For binary Join Patterns, we need to ensure that should both channels
232    /// involved have the same `ChannelId`, we actually have at least two
233    /// `Message`s available.
234    fn is_binary_alive(&self, first_ch_id: ChannelId, second_ch_id: ChannelId) -> bool {
235        if first_ch_id == second_ch_id {
236            self.messages.count_items(&first_ch_id) >= 2
237        } else {
238            self.is_unary_alive(first_ch_id) && self.is_unary_alive(second_ch_id)
239        }
240    }
241
242    /// Return `true` if *ternary* Join Pattern with given `JoinPatternId` is alive.
243    ///
244    /// A Join Pattern is considered alive if there is at least one `Message` for
245    /// each of the channels involved in it.
246    ///
247    /// For ternary Join Patterns, we need to ensure that should more than one
248    /// channel involved have the same `ChannelId`, we actually have enough
249    /// `Message`s available.
250    fn is_ternary_alive(
251        &self,
252        first_ch_id: ChannelId,
253        second_ch_id: ChannelId,
254        third_ch_id: ChannelId,
255    ) -> bool {
256        if first_ch_id == second_ch_id && second_ch_id == third_ch_id {
257            self.messages.count_items(&first_ch_id) >= 3
258        } else {
259            self.is_binary_alive(first_ch_id, second_ch_id)
260                && self.is_binary_alive(first_ch_id, third_ch_id)
261                && self.is_binary_alive(second_ch_id, third_ch_id)
262        }
263    }
264
265    /// Compare when the Join Patterns with given `JoinPatternId`s were last alive at.
266    ///
267    /// Rules for Order:
268    /// 1. If neither `JoinPatternId` has a last alive `Counter`, then neither
269    /// has been fired yet, so they can be viewed as equal in this ordering.
270    /// 2. If only one `JoinPatternId` has no last alive `Counter`, then that
271    /// one has to be ordered as less than the other since having been fired
272    /// at least once will always be a later point of firing than not having
273    /// been fired yet.
274    /// 3. If both `JoinPatternId`s have last alive `Counter`s, use the ordering
275    /// of these.
276    ///
277    /// # Panics
278    ///
279    /// For simplicity, this function panics if the either of the given
280    /// `JoinPatternId`s is not registered in the internal map of `JoinPatternId`s
281    /// to `Instant`s that describe the last `Instant` at which a particular Join
282    /// Pattern was alive. That is to say, this function should only be called on
283    /// `JoinPatternId`s which are definitely stored in the calling `Controller`.
284    fn compare_last_fired(&self, jp_id_1: JoinPatternId, jp_id_2: JoinPatternId) -> Ordering {
285        // TODO: Can we sensibly use `Option::flatten` here?
286        let last_fired_1 = self.join_pattern_last_fired.get(&jp_id_1).unwrap();
287        let last_fired_2 = self.join_pattern_last_fired.get(&jp_id_2).unwrap();
288
289        if last_fired_1.is_none() && last_fired_2.is_none() {
290            Ordering::Equal
291        } else if last_fired_1.is_none() && last_fired_2.is_some() {
292            Ordering::Less
293        } else if last_fired_1.is_some() && last_fired_2.is_none() {
294            Ordering::Greater
295        } else {
296            last_fired_1.cmp(last_fired_2)
297        }
298    }
299
300    /// Fire the `JoinPattern` corresponding to the given `JoinPatternId`.
301    ///
302    /// The processs of firing a `JoinPattern` consists of first retrieving
303    /// a `Message` for each of the channels involved in the `JoinPattern`,
304    /// then passing these `Messages`s to the `JoinPattern` to handle the
305    /// firing.
306    ///
307    /// # Panics
308    ///
309    /// Panics when there is no `JoinPattern` stored for the given
310    /// `JoinPatternId`.
311    fn fire_join_pattern(&mut self, join_pattern_id: JoinPatternId) {
312        use JoinPattern::*;
313
314        let join_pattern = self.join_patterns.get(&join_pattern_id).unwrap();
315
316        match join_pattern {
317            UnarySend(jp) => {
318                let arg = self.messages.retrieve(&jp.channel_id()).unwrap();
319
320                jp.fire(arg);
321            }
322            UnaryRecv(jp) => {
323                let return_sender = self.messages.retrieve(&jp.channel_id()).unwrap();
324
325                jp.fire(return_sender);
326            }
327            UnaryBidir(jp) => {
328                let arg_and_sender = self.messages.retrieve(&jp.channel_id()).unwrap();
329
330                jp.fire(arg_and_sender);
331            }
332            BinarySend(jp) => {
333                let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
334                let arg_2 = self
335                    .messages
336                    .retrieve(&jp.second_send_channel_id())
337                    .unwrap();
338
339                jp.fire(arg_1, arg_2);
340            }
341            BinaryRecv(jp) => {
342                let arg = self.messages.retrieve(&jp.send_channel_id()).unwrap();
343                let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap();
344
345                jp.fire(arg, return_sender);
346            }
347            BinaryBidir(jp) => {
348                let arg_1 = self.messages.retrieve(&jp.send_channel_id()).unwrap();
349                let arg_2_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap();
350
351                jp.fire(arg_1, arg_2_and_sender);
352            }
353            TernarySend(jp) => {
354                let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
355                let arg_2 = self
356                    .messages
357                    .retrieve(&jp.second_send_channel_id())
358                    .unwrap();
359                let arg_3 = self.messages.retrieve(&jp.third_send_channel_id()).unwrap();
360
361                jp.fire(arg_1, arg_2, arg_3);
362            }
363            TernaryRecv(jp) => {
364                let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
365                let arg_2 = self
366                    .messages
367                    .retrieve(&jp.second_send_channel_id())
368                    .unwrap();
369                let return_sender = self.messages.retrieve(&jp.recv_channel_id()).unwrap();
370
371                jp.fire(arg_1, arg_2, return_sender);
372            }
373            TernaryBidir(jp) => {
374                let arg_1 = self.messages.retrieve(&jp.first_send_channel_id()).unwrap();
375                let arg_2 = self
376                    .messages
377                    .retrieve(&jp.second_send_channel_id())
378                    .unwrap();
379                let arg_3_and_sender = self.messages.retrieve(&jp.bidir_channel_id()).unwrap();
380
381                jp.fire(arg_1, arg_2, arg_3_and_sender);
382            }
383        }
384    }
385
386    /// Reset the `Counter` at which the given Join Pattern has last been fired.
387    fn reset_last_fired(&mut self, join_pattern_id: JoinPatternId) {
388        self.join_pattern_last_fired
389            .insert(join_pattern_id, Some(self.message_counter.clone()));
390    }
391
392    /// Send new, *unique* `ChannelId` back to the requesting `Junction`.
393    ///
394    /// # Panics
395    ///
396    /// Panics if the new `ChannelId` could not be sent to the requesting `Junction`.
397    fn handle_new_channel_id_request(&mut self, return_sender: Sender<ChannelId>) {
398        return_sender.send(self.new_channel_id()).unwrap();
399    }
400
401    /// Add new Join Pattern to `Controller` storage.
402    fn handle_add_join_pattern_request(&mut self, join_pattern: JoinPattern) {
403        let jp_id = self.new_join_pattern_id();
404
405        self.initialize_last_fired(jp_id);
406
407        self.insert_join_pattern(jp_id, join_pattern);
408    }
409
410    /// Initialize the `Instant` at which Join Pattern was last alive.
411    fn initialize_last_fired(&mut self, join_pattern_id: JoinPatternId) {
412        self.join_pattern_last_fired.insert(join_pattern_id, None);
413    }
414
415    /// Insert Join Pattern into relevant internal storage.
416    ///
417    /// The given Join Pattern needs to be registered within the internal
418    /// `InvertedIndex` for future look-up operations and then stored in the
419    /// Join Pattern collection.
420    fn insert_join_pattern(&mut self, join_pattern_id: JoinPatternId, join_pattern: JoinPattern) {
421        use JoinPattern::*;
422
423        match join_pattern {
424            UnarySend(jp) => {
425                self.join_pattern_index
426                    .insert_single(jp.channel_id(), join_pattern_id);
427
428                self.join_patterns.insert(join_pattern_id, UnarySend(jp));
429            }
430            UnaryRecv(jp) => {
431                self.join_pattern_index
432                    .insert_single(jp.channel_id(), join_pattern_id);
433
434                self.join_patterns.insert(join_pattern_id, UnaryRecv(jp));
435            }
436            UnaryBidir(jp) => {
437                self.join_pattern_index
438                    .insert_single(jp.channel_id(), join_pattern_id);
439
440                self.join_patterns.insert(join_pattern_id, UnaryBidir(jp));
441            }
442            BinarySend(jp) => {
443                self.join_pattern_index
444                    .insert_single(jp.first_send_channel_id(), join_pattern_id);
445                self.join_pattern_index
446                    .insert_single(jp.second_send_channel_id(), join_pattern_id);
447
448                self.join_patterns.insert(join_pattern_id, BinarySend(jp));
449            }
450            BinaryRecv(jp) => {
451                self.join_pattern_index
452                    .insert_single(jp.send_channel_id(), join_pattern_id);
453                self.join_pattern_index
454                    .insert_single(jp.recv_channel_id(), join_pattern_id);
455
456                self.join_patterns.insert(join_pattern_id, BinaryRecv(jp));
457            }
458            BinaryBidir(jp) => {
459                self.join_pattern_index
460                    .insert_single(jp.send_channel_id(), join_pattern_id);
461                self.join_pattern_index
462                    .insert_single(jp.bidir_channel_id(), join_pattern_id);
463
464                self.join_patterns.insert(join_pattern_id, BinaryBidir(jp));
465            }
466            TernarySend(jp) => {
467                self.join_pattern_index
468                    .insert_single(jp.first_send_channel_id(), join_pattern_id);
469                self.join_pattern_index
470                    .insert_single(jp.second_send_channel_id(), join_pattern_id);
471                self.join_pattern_index
472                    .insert_single(jp.third_send_channel_id(), join_pattern_id);
473
474                self.join_patterns.insert(join_pattern_id, TernarySend(jp));
475            }
476            TernaryRecv(jp) => {
477                self.join_pattern_index
478                    .insert_single(jp.first_send_channel_id(), join_pattern_id);
479                self.join_pattern_index
480                    .insert_single(jp.second_send_channel_id(), join_pattern_id);
481                self.join_pattern_index
482                    .insert_single(jp.recv_channel_id(), join_pattern_id);
483
484                self.join_patterns.insert(join_pattern_id, TernaryRecv(jp));
485            }
486            TernaryBidir(jp) => {
487                self.join_pattern_index
488                    .insert_single(jp.first_send_channel_id(), join_pattern_id);
489                self.join_pattern_index
490                    .insert_single(jp.second_send_channel_id(), join_pattern_id);
491                self.join_pattern_index
492                    .insert_single(jp.bidir_channel_id(), join_pattern_id);
493
494                self.join_patterns.insert(join_pattern_id, TernaryBidir(jp));
495            }
496        }
497    }
498
499    /// Generate new, *unique* `ChannelId`.
500    fn new_channel_id(&mut self) -> ChannelId {
501        let ch_id = self.latest_channel_id;
502        self.latest_channel_id.increment();
503
504        ch_id
505    }
506
507    /// Generate new, *unique* `JoinPatternId`.
508    fn new_join_pattern_id(&mut self) -> JoinPatternId {
509        let jp_id = self.latest_join_pattern_id;
510        self.latest_join_pattern_id.increment();
511
512        jp_id
513    }
514}