Skip to main content

apocalypse/
hell.rs

1use std::{
2    collections::{HashMap},
3    time::Duration
4};
5use futures::future::join_all;
6use crate::{Gate, Error};
7use tokio::{
8    sync::{
9        oneshot::{self},
10        mpsc::{self}
11    },
12    task::JoinHandle
13};
14use chrono::{DateTime, Utc};
15
16pub(crate) use self::mini_hell::MiniHell;
17mod mini_hell;
18pub(crate) use self::multiple_mini_hell::MultipleMiniHell;
19mod multiple_mini_hell;
20#[cfg(feature = "ws")]
21pub(crate) use self::mini_ws_hell::MiniWSHell;
22#[cfg(feature = "ws")]
23mod mini_ws_hell;
24
25pub(crate) use self::demon_channels::{DemonChannels};
26mod demon_channels;
27
28pub use self::hell_stats::{HellStats};
29mod hell_stats;
30
31pub(crate) use self::hell_instruction::{HellInstruction};
32mod hell_instruction;
33
34pub(crate) use self::mini_hell_instruction::{MiniHellInstruction};
35mod mini_hell_instruction;
36
37/// Builder helper for a Hell instance
38pub struct HellBuilder {
39    /// Timeout before shutdown of a demon
40    timeout: Option<Duration>
41}
42
43impl HellBuilder {
44    /// Generates a new instance of a hell builder
45    ///
46    /// ```rust
47    /// # use apocalypse::{HellBuilder};
48    /// # fn main() {
49    /// let hell = HellBuilder::new();
50    /// // Change params of the hell instance
51    /// # }
52    /// ```
53    pub fn new() -> HellBuilder {
54        HellBuilder {
55            timeout: None
56        }
57    }
58
59    /// Sets a timeout for the vanquish method to be executed
60    ///
61    /// ```rust
62    /// use apocalypse::{HellBuilder};
63    /// use std::time::Duration;
64    ///
65    /// # fn main() {
66    /// let hell = HellBuilder::new().timeout(Duration::from_secs(5));
67    /// // further modify this hell instance
68    /// # }
69    /// ```
70    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
71        self.timeout = Some(timeout);
72        self
73    }
74
75    /// Generates the hell instance from the builder params
76    ///
77    /// ```rust
78    /// # use apocalypse::{HellBuilder};
79    /// # fn main() {
80    /// let hell = HellBuilder::new().build();
81    /// # }
82    /// ```
83    pub fn build(self) -> Hell {
84        Hell {
85            counter: 0,
86            zombie_counter: 0,
87            successful_messages: 0,
88            failed_messages: 0,
89            demons: HashMap::new(),
90            timeout: self.timeout,
91            ignition_time: Utc::now()
92        }
93    }
94}
95
96/// ## Hell structure
97///
98/// This is equivalent to a normal actor framework system/runtime. A `Hell` instance will dispatch messages and coordinate interaction between actors.
99pub struct Hell {
100    /// Demon counter, to asign a unique address to each demon
101    counter: usize,
102    /// Amount of messages delivered to demons
103    successful_messages: usize,
104    /// Amount of messages delivered to demons
105    failed_messages: usize,
106    /// Zombie counter
107    zombie_counter: usize,
108    /// Communication channels with demons.
109    demons: HashMap<usize, DemonChannels>,
110    /// Maximum wait time for killswitch calls
111    timeout: Option<Duration>,
112    /// Time that hell has been active
113    ignition_time: DateTime<Utc>
114}
115
116impl Hell {
117    /// Creates a new hell instance with default parameters
118    ///
119    /// In this case, a timeout is not set, and vanquish calls are executed until the demon gracefully shuts down
120    ///
121    /// ```rust
122    /// # use apocalypse::{Hell};
123    /// let hell = Hell::new();
124    /// // Now we can spawn demons!
125    /// ```
126    pub fn new() -> Hell {
127        Hell {
128            counter: 0,
129            zombie_counter: 0,
130            successful_messages: 0,
131            failed_messages: 0,
132            demons: HashMap::new(),
133            timeout: None,
134            ignition_time: Utc::now()
135        }
136    }
137
138    /// Creates a new [HellBuilder](HellBuilder)
139    ///
140    /// ```rust
141    /// # use apocalypse::{Hell};
142    /// let hell = Hell::builder().timeout(std::time::Duration::from_secs(5)).build();
143    /// // Now we can spawn demons!
144    /// ```
145    pub fn builder() -> HellBuilder {
146        HellBuilder::new()
147    }
148
149    /// Starts the actor system
150    ///
151    /// This method returns both a Gate, and a JoinHandle.
152    ///
153    /// ```rust,no_run
154    /// use apocalypse::{Hell};
155    ///
156    /// #[tokio::main]
157    /// async fn main() {
158    ///     let hell = Hell::new();
159    ///     let (gate, join_handle) = hell.ignite().await.unwrap();
160    ///     // Do stuff with the gate
161    ///     // ...
162    ///     // Finally, await the actor's system execution
163    ///     join_handle.await.unwrap();
164    /// }
165    /// ```
166    pub async fn ignite(mut self) -> Result<(Gate, JoinHandle<()>), Error>{
167        // ignition time update
168        self.ignition_time = Utc::now();
169
170        // Message communication for the gate
171        let (hell_channel, outer_instructions) = mpsc::unbounded_channel();
172        // Incoming close messages from websockets demons
173        let (on_close_tx, mut on_close_rx) = mpsc::unbounded_channel();
174        
175        let gate = Gate {
176            hell_channel,
177            #[cfg(feature = "ws")]
178            on_close_tx
179        };
180
181        let gate_clone = gate.clone();
182
183        let jh = tokio::spawn(async move {
184            #[cfg(feature = "full_log")]
185            log::info!("Broker starts \u{1f525}");
186
187            // We need another channel, for zombie count removal
188            let (zombie_tx, mut zombie_rx) = mpsc::unbounded_channel();
189
190            let clean = {
191                let mut instructions = outer_instructions;
192                loop {
193                    #[cfg(feature = "full_log")]
194                    log::trace!("[Hell] entering message process loop iteration, waiting for incoming message...");
195                    tokio::select! {
196                        value = instructions.recv() => if let Some(instruction) = value {
197                            #[cfg(feature = "full_log")]
198                            log::debug!("[Hell] entering instruction handler");
199                            match instruction {
200                                HellInstruction::CreateAddress{tx} => {
201                                    #[cfg(feature = "full_log")]
202                                    log::trace!("[Hell] received address creation request");
203                                    let current_counter = self.counter;
204                                    if tx.send(current_counter).is_ok() {
205                                        #[cfg(feature = "full_log")]
206                                        log::debug!("[Hell] reserved address {}", current_counter);
207                                        self.counter += 1;
208                                    } else {
209                                        #[cfg(feature = "full_log")]
210                                        log::debug!("[Hell] failed to notify address {} reservation", current_counter);
211                                    }
212                                    #[cfg(feature = "full_log")]
213                                    log::trace!("[Hell] leaving address creation request");
214                                },
215                                HellInstruction::RegisterDemon{address, demon_channels, tx} => {
216                                    #[cfg(feature = "full_log")]
217                                    log::trace!("[Hell] received demon registration request");
218                                    let added = match self.demons.entry(address) {
219                                        std::collections::hash_map::Entry::Occupied(_) => {
220                                            #[cfg(feature = "full_log")]
221                                            log::debug!("[Hell] demon address {} is already taken", address);
222                                            Err(Error::OccupiedAddress)
223                                        },
224                                        std::collections::hash_map::Entry::Vacant(v) => {
225                                            #[cfg(feature = "full_log")]
226                                            log::debug!("[Hell] registering new demon with address {}", address);
227                                            v.insert(demon_channels);
228                                            Ok(())
229                                        }
230                                    };
231
232                                    if tx.send(added).is_err() {
233                                        #[cfg(feature = "full_log")]
234                                        log::debug!("[Hell] dangling demon with address {}, as it could not be notified that it was registered. removing.", address);
235                                        self.demons.remove(&address);
236                                    }
237
238                                    #[cfg(feature = "full_log")]
239                                    log::trace!("[Hell] leaving demon registration request");
240                                },
241                                HellInstruction::Message{tx, address, ignore, input} => {
242                                    #[cfg(feature = "full_log")]
243                                    log::trace!("[Hell] received message delivery request to demon at location {}", address);
244                                    if let Some(demon_channels) = self.demons.get_mut(&address) {
245                                        let tx = if ignore {
246                                            let (ignore_tx, ignore_rx) = oneshot::channel();
247                                            tokio::spawn(async move {
248                                                let _ = ignore_rx.await;
249                                                #[cfg(feature = "full_log")]
250                                                log::trace!("[Hell] ignored reply received");
251                                            });
252                                            let _ = tx.send(Ok(Box::new(())));
253                                            ignore_tx
254                                        } else {
255                                            tx
256                                        };
257                                        if demon_channels.instructions.send(MiniHellInstruction::Message(tx, input)).is_err() {
258                                            self.failed_messages += 1;
259                                            #[cfg(feature = "full_log")]
260                                            log::debug!("[Hell] message could not be delivered to demon {}", address);
261                                        } else {
262                                            self.successful_messages += 1;
263                                        };
264                                    } else {
265                                        if tx.send(Err(Error::InvalidLocation)).is_err() {
266                                            #[cfg(feature = "full_log")]
267                                            log::debug!("[Hell] invalid location message for address {} could not be delivered back", address);
268                                        };
269                                    }
270                                    #[cfg(feature = "full_log")]
271                                    log::trace!("[Hell] leaving message delivery request");
272                                },
273                                HellInstruction::RemoveDemon{address, tx, ignore, force} => {
274                                    #[cfg(feature = "full_log")]
275                                    log::trace!("[Hell] received demon removal request for demon at location {}", address);
276                                    // We simply drop the channel, so the mini hell will close automatically
277                                    let removed = self.demons.remove(&address);
278                                    if let Some(demon_channels) = removed {
279                                        // This channel will allow the zombie counter to be decreased, when necessary
280                                        let (demon_tx, demon_rx) = oneshot::channel();
281
282                                        // force timeout has the prefference
283                                        let timeout = match force {
284                                            Some(v) => {
285                                                log::trace!("[Hell] custom timeout is being used");
286                                                v
287                                            },
288                                            None => {
289                                                log::trace!("[Hell] default timeout in use");
290                                                self.timeout
291                                            }
292                                        };
293
294                                        let killswitch = if let Some(timeout) = timeout {
295                                            #[cfg(feature = "full_log")]
296                                            log::trace!("[Hell] killswitch trigger requested in {}ms", timeout.as_millis());
297                                            // We send the killswitch with a timeout
298                                            let demon_channel_killswitch = demon_channels.killswitch;
299                                            let (killswitch_tx, killswitch) = oneshot::channel();
300                                            tokio::spawn(async move {
301                                                tokio::time::sleep(timeout).await;
302                                                #[cfg(feature = "full_log")]
303                                                log::trace!("[Hell] sending killswitch trigger now");
304                                                // We ignore the killswitch send, because maybe the demon_channel is already obsolete
305                                                match demon_channel_killswitch.send(killswitch_tx) {
306                                                    Ok(_) => {
307                                                        #[cfg(feature = "full_log")]
308                                                        log::trace!("[Hell] killswitch sent");
309                                                    },
310                                                    Err(_) => {
311                                                        #[cfg(feature = "full_log")]
312                                                        log::error!("[Hell] killswitch not successfully sent");
313                                                    }
314                                                }
315                                            });
316                                            Some(killswitch)
317                                        } else {
318                                            #[cfg(feature = "full_log")]
319                                            log::trace!("[Hell] no timeout was set for this vanquish call");
320                                            None
321                                        };
322
323                                        if demon_channels.instructions.send(MiniHellInstruction::Shutdown(demon_tx)).is_err() {
324                                            #[cfg(feature = "full_log")]
325                                            log::debug!("[Hell] could not notify demon thread the requested demon at address {} removal", address);
326                                            if tx.send(Err(Error::DemonCommunication)).is_err() {
327                                                #[cfg(feature = "full_log")]
328                                                log::debug!("[Hell] could not notify demon at address {} removal failure", address);
329                                            }
330                                        } else {
331                                            let _address_copy = address.clone();
332                                            let zombie_tx_clone = zombie_tx.clone();
333                                            let waiter = async move {
334                                                if let Some(killswitch) = killswitch {
335                                                    tokio::select! {
336                                                        res = demon_rx => {
337                                                            if res.is_ok() {
338                                                                #[cfg(feature = "full_log")]
339                                                                log::trace!("[Hell] gracefull vanquish executed properly at address {}", _address_copy);
340                                                            }
341                                                        },
342                                                        res = killswitch => {
343                                                            if res.is_ok() {
344                                                                #[cfg(feature = "full_log")]
345                                                                log::trace!("[Hell] killswitch vanquish executed properly at address {}", _address_copy);
346                                                            }
347                                                        }
348                                                    };
349                                                } else {
350                                                    if demon_rx.await.is_ok() {
351                                                        #[cfg(feature = "full_log")]
352                                                        log::trace!("[Hell] gracefull vanquish executed properly at address {}", _address_copy);
353                                                    }
354                                                }
355
356                                                if ignore {
357                                                    if zombie_tx_clone.send(()).is_err() {
358                                                        #[cfg(feature = "full_log")]
359                                                        log::trace!("[Hell] demon zombie counter message decrease could not be sent");
360                                                    }
361                                                }
362                                            };
363                                            // if the message should be ignored, we need to move it to a different thread
364                                            if ignore {
365                                                #[cfg(feature = "full_log")]
366                                                log::trace!("[Hell] ignore requested, zombie demon count increased by one");
367                                                self.zombie_counter += 1;
368                                                tokio::spawn(waiter);
369                                            } else {
370                                                waiter.await;
371                                            }
372
373                                            if tx.send(Ok(())).is_err() {
374                                                #[cfg(feature = "full_log")]
375                                                log::trace!("[Hell] could not notify back demon at address {} removal", address);
376                                            }
377                                        }
378                                    } else {
379                                        #[cfg(feature = "full_log")]
380                                        log::debug!("[Hell] demon with address {} was not found", address);
381                                        if tx.send(Err(Error::InvalidLocation)).is_err() {
382                                            #[cfg(feature = "full_log")]
383                                            log::trace!("[Hell] could not notify that demon with address {} was not found", address);
384                                        }
385                                    }
386
387                                    #[cfg(feature = "full_log")]
388                                    log::trace!("[Hell] leaving demon removal request");
389                                },
390                                HellInstruction::Stats{tx} => {
391                                    #[cfg(feature = "full_log")]
392                                    log::trace!("[Hell] received stats request");
393                                    if tx.send(HellStats {
394                                        spawned_demons: self.counter,
395                                        active_demons: self.demons.len(),
396                                        zombie_demons: self.zombie_counter,
397                                        successful_messages: self.successful_messages,
398                                        failed_messages: self.failed_messages,
399                                        ignition_time: self.ignition_time.clone()
400                                    }).is_err() {
401                                        #[cfg(feature = "full_log")]
402                                        log::debug!("[Hell] could not return hell stats, channel closed");
403                                    }
404                                    #[cfg(feature = "full_log")]
405                                    log::trace!("[Hell] leaving stats request");
406                                },
407                                HellInstruction::Extinguish{tx, timeout} => {
408                                    #[cfg(feature = "full_log")]
409                                    log::trace!("[Hell] extinguish message received");
410                                    break Some((tx, timeout));
411                                }
412                            }
413                            #[cfg(feature = "full_log")]
414                            log::trace!("[Hell] leaving instruction handler");
415                        } else {
416                            #[cfg(feature = "full_log")]
417                            log::debug!("[Hell] all gates to hell have been dropped");
418                            break None;
419                        },
420                        value = zombie_rx.recv() => if value.is_some() {
421                            self.zombie_counter -= 1;
422                            #[cfg(feature = "full_log")]
423                            log::debug!("[Hell] zombie counter decrease requested, new zombie count: {}", self.zombie_counter);
424                        } else {
425                            #[cfg(feature = "full_log")]
426                            log::error!("[Hell] impossible failure, channel was closed unexpectedly");
427                            break None;
428                        },
429                        value = on_close_rx.recv() => if let Some(location) = value {
430                            #[cfg(feature = "full_log")]
431                            log::debug!("[Hell] demon closed due to websockets lost connection");
432                            let _ = self.demons.remove(&location);
433                        } else {
434                            #[cfg(feature = "full_log")]
435                            log::error!("[Hell] impossible failure, on_close channel was closed unexpectedly");
436                            break None;
437                        }
438                    }
439
440                    #[cfg(feature = "full_log")]
441                    log::trace!("[Hell] message loop iteration ended");
442                }
443            };
444
445            if let Some((tx, timeout)) = clean {
446                // extinguish was requested
447                let mut handles = Vec::new();
448                for (id, demon_channels) in self.demons {
449                    #[cfg(feature = "full_log")]
450                    log::trace!("[Hell] sending demon with id {} shutdown request", id);
451
452                    // This channel will allow the zombie counter to be decreased, when necessary
453                    let (demon_tx, demon_rx) = oneshot::channel();
454                    let (killswitch_tx, killswitch) = oneshot::channel();
455
456                    // force timeout has the prefference
457                    let timeout = match timeout {
458                        Some(v) => v,
459                        None => self.timeout
460                    };
461
462                    if let Some(timeout) = timeout {
463                        #[cfg(feature = "full_log")]
464                        log::trace!("[Hell] killswitch trigger requested in {}ms", timeout.as_millis());
465                        // We send the killswitch with a timeout
466                        let demon_channel_killswitch = demon_channels.killswitch;
467                        let _address_copy = id.clone();
468                        tokio::spawn(async move {
469                            tokio::time::sleep(timeout).await;
470                            #[cfg(feature = "full_log")]
471                            log::trace!("[Hell] sending killswitch trigger now");
472                            // We ignore the killswitch send, because maybe the demon_channel is already obsolete
473                            match demon_channel_killswitch.send(killswitch_tx) {
474                                Ok(_) => {
475                                    #[cfg(feature = "full_log")]
476                                    log::trace!("[Hell] killswitch sent to address {}", _address_copy);
477                                },
478                                Err(_) => {
479                                    #[cfg(feature = "full_log")]
480                                    log::error!("[Hell] killswitch not successfully sent to address {}", _address_copy);
481                                }
482                            }
483                        });
484                    } else {
485                        #[cfg(feature = "full_log")]
486                        log::trace!("[Hell] no timeout was set for this vanquish call");
487                    }
488
489                    if demon_channels.instructions.send(MiniHellInstruction::Shutdown(demon_tx)).is_err() {
490                        #[cfg(feature = "full_log")]
491                        log::trace!("[Hell] could not notify demon thread the requested demon at address {} removal", id);
492                    } else {
493                        #[cfg(feature = "full_log")]
494                        log::trace!("[Hell] shutdown message sent to address {}", id);
495                        let _address_copy = id.clone();
496                        let waiter = async move {
497                            #[cfg(feature = "full_log")]
498                            log::trace!("[Hell] entering wait selection for address {}", _address_copy);
499                            tokio::select! {
500                                res = demon_rx => {
501                                    if res.is_ok() {
502                                        #[cfg(feature = "full_log")]
503                                        log::trace!("[Hell] gracefull vanquish for address {}", _address_copy);
504                                    }
505                                },
506                                res = killswitch => {
507                                    if res.is_ok() {
508                                        #[cfg(feature = "full_log")]
509                                        log::trace!("[Hell] killswitch vanquish requested, sending to address {}", _address_copy);
510                                    }
511                                }
512                            };
513                            #[cfg(feature = "full_log")]
514                            log::trace!("[Hell] exiting wait selection for address {}", _address_copy);
515                        };
516                        
517                        handles.push(tokio::spawn(waiter));
518                    }
519                }
520
521                #[cfg(feature = "full_log")]
522                log::trace!("[Hell] waiting for all {} handles to complete...", handles.len());
523                join_all(handles).await;
524                #[cfg(feature = "full_log")]
525                log::trace!("[Hell] all handles completed");
526
527                if tx.send(Ok(())).is_err() {
528                    #[cfg(feature = "full_log")]
529                    log::debug!("[Hell] could not notify gate about extintion");
530                }
531            }
532
533            // We force this thing to move to this thread
534            #[cfg(not(feature = "ws"))]
535            let _ = on_close_tx.closed();
536
537            #[cfg(feature = "full_log")]
538            log::info!("Broker stops \u{1f9ca}");
539        });
540        Ok((gate_clone, jh))
541    }
542}