apocalypse/hell.rs
1use std::{
2 collections::{HashMap},
3 time::Duration
4};
5use futures::future::join_all;
6use crate::{Gate, Error};
7use tokio::{
8 sync::{
9 oneshot::{self},
10 mpsc::{self}
11 },
12 task::JoinHandle
13};
14use chrono::{DateTime, Utc};
15
16pub(crate) use self::mini_hell::MiniHell;
17mod mini_hell;
18pub(crate) use self::multiple_mini_hell::MultipleMiniHell;
19mod multiple_mini_hell;
20#[cfg(feature = "ws")]
21pub(crate) use self::mini_ws_hell::MiniWSHell;
22#[cfg(feature = "ws")]
23mod mini_ws_hell;
24
25pub(crate) use self::demon_channels::{DemonChannels};
26mod demon_channels;
27
28pub use self::hell_stats::{HellStats};
29mod hell_stats;
30
31pub(crate) use self::hell_instruction::{HellInstruction};
32mod hell_instruction;
33
34pub(crate) use self::mini_hell_instruction::{MiniHellInstruction};
35mod mini_hell_instruction;
36
/// Builder helper for a [`Hell`] instance
///
/// The builder only stores configuration; nothing is started until the
/// resulting `Hell` instance is ignited.
///
/// `Default` yields the same configuration as `HellBuilder::new()`: no timeout set.
#[derive(Debug, Clone, Default)]
pub struct HellBuilder {
    /// Timeout before the killswitch of a demon is triggered on shutdown
    /// (`None` means that no timeout is configured)
    timeout: Option<Duration>
}
42
43impl HellBuilder {
44 /// Generates a new instance of a hell builder
45 ///
46 /// ```rust
47 /// # use apocalypse::{HellBuilder};
48 /// # fn main() {
49 /// let hell = HellBuilder::new();
50 /// // Change params of the hell instance
51 /// # }
52 /// ```
53 pub fn new() -> HellBuilder {
54 HellBuilder {
55 timeout: None
56 }
57 }
58
59 /// Sets a timeout for the vanquish method to be executed
60 ///
61 /// ```rust
62 /// use apocalypse::{HellBuilder};
63 /// use std::time::Duration;
64 ///
65 /// # fn main() {
66 /// let hell = HellBuilder::new().timeout(Duration::from_secs(5));
67 /// // further modify this hell instance
68 /// # }
69 /// ```
70 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
71 self.timeout = Some(timeout);
72 self
73 }
74
75 /// Generates the hell instance from the builder params
76 ///
77 /// ```rust
78 /// # use apocalypse::{HellBuilder};
79 /// # fn main() {
80 /// let hell = HellBuilder::new().build();
81 /// # }
82 /// ```
83 pub fn build(self) -> Hell {
84 Hell {
85 counter: 0,
86 zombie_counter: 0,
87 successful_messages: 0,
88 failed_messages: 0,
89 demons: HashMap::new(),
90 timeout: self.timeout,
91 ignition_time: Utc::now()
92 }
93 }
94}
95
96/// ## Hell structure
97///
98/// This is equivalent to a normal actor framework system/runtime. A `Hell` instance will dispatch messages and coordinate interaction between actors.
99pub struct Hell {
100 /// Demon counter, to asign a unique address to each demon
101 counter: usize,
102 /// Amount of messages delivered to demons
103 successful_messages: usize,
104 /// Amount of messages delivered to demons
105 failed_messages: usize,
106 /// Zombie counter
107 zombie_counter: usize,
108 /// Communication channels with demons.
109 demons: HashMap<usize, DemonChannels>,
110 /// Maximum wait time for killswitch calls
111 timeout: Option<Duration>,
112 /// Time that hell has been active
113 ignition_time: DateTime<Utc>
114}
115
116impl Hell {
117 /// Creates a new hell instance with default parameters
118 ///
119 /// In this case, a timeout is not set, and vanquish calls are executed until the demon gracefully shuts down
120 ///
121 /// ```rust
122 /// # use apocalypse::{Hell};
123 /// let hell = Hell::new();
124 /// // Now we can spawn demons!
125 /// ```
126 pub fn new() -> Hell {
127 Hell {
128 counter: 0,
129 zombie_counter: 0,
130 successful_messages: 0,
131 failed_messages: 0,
132 demons: HashMap::new(),
133 timeout: None,
134 ignition_time: Utc::now()
135 }
136 }
137
138 /// Creates a new [HellBuilder](HellBuilder)
139 ///
140 /// ```rust
141 /// # use apocalypse::{Hell};
142 /// let hell = Hell::builder().timeout(std::time::Duration::from_secs(5)).build();
143 /// // Now we can spawn demons!
144 /// ```
145 pub fn builder() -> HellBuilder {
146 HellBuilder::new()
147 }
148
149 /// Starts the actor system
150 ///
151 /// This method returns both a Gate, and a JoinHandle.
152 ///
153 /// ```rust,no_run
154 /// use apocalypse::{Hell};
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let hell = Hell::new();
159 /// let (gate, join_handle) = hell.ignite().await.unwrap();
160 /// // Do stuff with the gate
161 /// // ...
162 /// // Finally, await the actor's system execution
163 /// join_handle.await.unwrap();
164 /// }
165 /// ```
166 pub async fn ignite(mut self) -> Result<(Gate, JoinHandle<()>), Error>{
167 // ignition time update
168 self.ignition_time = Utc::now();
169
170 // Message communication for the gate
171 let (hell_channel, outer_instructions) = mpsc::unbounded_channel();
172 // Incoming close messages from websockets demons
173 let (on_close_tx, mut on_close_rx) = mpsc::unbounded_channel();
174
175 let gate = Gate {
176 hell_channel,
177 #[cfg(feature = "ws")]
178 on_close_tx
179 };
180
181 let gate_clone = gate.clone();
182
183 let jh = tokio::spawn(async move {
184 #[cfg(feature = "full_log")]
185 log::info!("Broker starts \u{1f525}");
186
187 // We need another channel, for zombie count removal
188 let (zombie_tx, mut zombie_rx) = mpsc::unbounded_channel();
189
190 let clean = {
191 let mut instructions = outer_instructions;
192 loop {
193 #[cfg(feature = "full_log")]
194 log::trace!("[Hell] entering message process loop iteration, waiting for incoming message...");
195 tokio::select! {
196 value = instructions.recv() => if let Some(instruction) = value {
197 #[cfg(feature = "full_log")]
198 log::debug!("[Hell] entering instruction handler");
199 match instruction {
200 HellInstruction::CreateAddress{tx} => {
201 #[cfg(feature = "full_log")]
202 log::trace!("[Hell] received address creation request");
203 let current_counter = self.counter;
204 if tx.send(current_counter).is_ok() {
205 #[cfg(feature = "full_log")]
206 log::debug!("[Hell] reserved address {}", current_counter);
207 self.counter += 1;
208 } else {
209 #[cfg(feature = "full_log")]
210 log::debug!("[Hell] failed to notify address {} reservation", current_counter);
211 }
212 #[cfg(feature = "full_log")]
213 log::trace!("[Hell] leaving address creation request");
214 },
215 HellInstruction::RegisterDemon{address, demon_channels, tx} => {
216 #[cfg(feature = "full_log")]
217 log::trace!("[Hell] received demon registration request");
218 let added = match self.demons.entry(address) {
219 std::collections::hash_map::Entry::Occupied(_) => {
220 #[cfg(feature = "full_log")]
221 log::debug!("[Hell] demon address {} is already taken", address);
222 Err(Error::OccupiedAddress)
223 },
224 std::collections::hash_map::Entry::Vacant(v) => {
225 #[cfg(feature = "full_log")]
226 log::debug!("[Hell] registering new demon with address {}", address);
227 v.insert(demon_channels);
228 Ok(())
229 }
230 };
231
232 if tx.send(added).is_err() {
233 #[cfg(feature = "full_log")]
234 log::debug!("[Hell] dangling demon with address {}, as it could not be notified that it was registered. removing.", address);
235 self.demons.remove(&address);
236 }
237
238 #[cfg(feature = "full_log")]
239 log::trace!("[Hell] leaving demon registration request");
240 },
241 HellInstruction::Message{tx, address, ignore, input} => {
242 #[cfg(feature = "full_log")]
243 log::trace!("[Hell] received message delivery request to demon at location {}", address);
244 if let Some(demon_channels) = self.demons.get_mut(&address) {
245 let tx = if ignore {
246 let (ignore_tx, ignore_rx) = oneshot::channel();
247 tokio::spawn(async move {
248 let _ = ignore_rx.await;
249 #[cfg(feature = "full_log")]
250 log::trace!("[Hell] ignored reply received");
251 });
252 let _ = tx.send(Ok(Box::new(())));
253 ignore_tx
254 } else {
255 tx
256 };
257 if demon_channels.instructions.send(MiniHellInstruction::Message(tx, input)).is_err() {
258 self.failed_messages += 1;
259 #[cfg(feature = "full_log")]
260 log::debug!("[Hell] message could not be delivered to demon {}", address);
261 } else {
262 self.successful_messages += 1;
263 };
264 } else {
265 if tx.send(Err(Error::InvalidLocation)).is_err() {
266 #[cfg(feature = "full_log")]
267 log::debug!("[Hell] invalid location message for address {} could not be delivered back", address);
268 };
269 }
270 #[cfg(feature = "full_log")]
271 log::trace!("[Hell] leaving message delivery request");
272 },
273 HellInstruction::RemoveDemon{address, tx, ignore, force} => {
274 #[cfg(feature = "full_log")]
275 log::trace!("[Hell] received demon removal request for demon at location {}", address);
276 // We simply drop the channel, so the mini hell will close automatically
277 let removed = self.demons.remove(&address);
278 if let Some(demon_channels) = removed {
279 // This channel will allow the zombie counter to be decreased, when necessary
280 let (demon_tx, demon_rx) = oneshot::channel();
281
282 // force timeout has the prefference
283 let timeout = match force {
284 Some(v) => {
285 log::trace!("[Hell] custom timeout is being used");
286 v
287 },
288 None => {
289 log::trace!("[Hell] default timeout in use");
290 self.timeout
291 }
292 };
293
294 let killswitch = if let Some(timeout) = timeout {
295 #[cfg(feature = "full_log")]
296 log::trace!("[Hell] killswitch trigger requested in {}ms", timeout.as_millis());
297 // We send the killswitch with a timeout
298 let demon_channel_killswitch = demon_channels.killswitch;
299 let (killswitch_tx, killswitch) = oneshot::channel();
300 tokio::spawn(async move {
301 tokio::time::sleep(timeout).await;
302 #[cfg(feature = "full_log")]
303 log::trace!("[Hell] sending killswitch trigger now");
304 // We ignore the killswitch send, because maybe the demon_channel is already obsolete
305 match demon_channel_killswitch.send(killswitch_tx) {
306 Ok(_) => {
307 #[cfg(feature = "full_log")]
308 log::trace!("[Hell] killswitch sent");
309 },
310 Err(_) => {
311 #[cfg(feature = "full_log")]
312 log::error!("[Hell] killswitch not successfully sent");
313 }
314 }
315 });
316 Some(killswitch)
317 } else {
318 #[cfg(feature = "full_log")]
319 log::trace!("[Hell] no timeout was set for this vanquish call");
320 None
321 };
322
323 if demon_channels.instructions.send(MiniHellInstruction::Shutdown(demon_tx)).is_err() {
324 #[cfg(feature = "full_log")]
325 log::debug!("[Hell] could not notify demon thread the requested demon at address {} removal", address);
326 if tx.send(Err(Error::DemonCommunication)).is_err() {
327 #[cfg(feature = "full_log")]
328 log::debug!("[Hell] could not notify demon at address {} removal failure", address);
329 }
330 } else {
331 let _address_copy = address.clone();
332 let zombie_tx_clone = zombie_tx.clone();
333 let waiter = async move {
334 if let Some(killswitch) = killswitch {
335 tokio::select! {
336 res = demon_rx => {
337 if res.is_ok() {
338 #[cfg(feature = "full_log")]
339 log::trace!("[Hell] gracefull vanquish executed properly at address {}", _address_copy);
340 }
341 },
342 res = killswitch => {
343 if res.is_ok() {
344 #[cfg(feature = "full_log")]
345 log::trace!("[Hell] killswitch vanquish executed properly at address {}", _address_copy);
346 }
347 }
348 };
349 } else {
350 if demon_rx.await.is_ok() {
351 #[cfg(feature = "full_log")]
352 log::trace!("[Hell] gracefull vanquish executed properly at address {}", _address_copy);
353 }
354 }
355
356 if ignore {
357 if zombie_tx_clone.send(()).is_err() {
358 #[cfg(feature = "full_log")]
359 log::trace!("[Hell] demon zombie counter message decrease could not be sent");
360 }
361 }
362 };
363 // if the message should be ignored, we need to move it to a different thread
364 if ignore {
365 #[cfg(feature = "full_log")]
366 log::trace!("[Hell] ignore requested, zombie demon count increased by one");
367 self.zombie_counter += 1;
368 tokio::spawn(waiter);
369 } else {
370 waiter.await;
371 }
372
373 if tx.send(Ok(())).is_err() {
374 #[cfg(feature = "full_log")]
375 log::trace!("[Hell] could not notify back demon at address {} removal", address);
376 }
377 }
378 } else {
379 #[cfg(feature = "full_log")]
380 log::debug!("[Hell] demon with address {} was not found", address);
381 if tx.send(Err(Error::InvalidLocation)).is_err() {
382 #[cfg(feature = "full_log")]
383 log::trace!("[Hell] could not notify that demon with address {} was not found", address);
384 }
385 }
386
387 #[cfg(feature = "full_log")]
388 log::trace!("[Hell] leaving demon removal request");
389 },
390 HellInstruction::Stats{tx} => {
391 #[cfg(feature = "full_log")]
392 log::trace!("[Hell] received stats request");
393 if tx.send(HellStats {
394 spawned_demons: self.counter,
395 active_demons: self.demons.len(),
396 zombie_demons: self.zombie_counter,
397 successful_messages: self.successful_messages,
398 failed_messages: self.failed_messages,
399 ignition_time: self.ignition_time.clone()
400 }).is_err() {
401 #[cfg(feature = "full_log")]
402 log::debug!("[Hell] could not return hell stats, channel closed");
403 }
404 #[cfg(feature = "full_log")]
405 log::trace!("[Hell] leaving stats request");
406 },
407 HellInstruction::Extinguish{tx, timeout} => {
408 #[cfg(feature = "full_log")]
409 log::trace!("[Hell] extinguish message received");
410 break Some((tx, timeout));
411 }
412 }
413 #[cfg(feature = "full_log")]
414 log::trace!("[Hell] leaving instruction handler");
415 } else {
416 #[cfg(feature = "full_log")]
417 log::debug!("[Hell] all gates to hell have been dropped");
418 break None;
419 },
420 value = zombie_rx.recv() => if value.is_some() {
421 self.zombie_counter -= 1;
422 #[cfg(feature = "full_log")]
423 log::debug!("[Hell] zombie counter decrease requested, new zombie count: {}", self.zombie_counter);
424 } else {
425 #[cfg(feature = "full_log")]
426 log::error!("[Hell] impossible failure, channel was closed unexpectedly");
427 break None;
428 },
429 value = on_close_rx.recv() => if let Some(location) = value {
430 #[cfg(feature = "full_log")]
431 log::debug!("[Hell] demon closed due to websockets lost connection");
432 let _ = self.demons.remove(&location);
433 } else {
434 #[cfg(feature = "full_log")]
435 log::error!("[Hell] impossible failure, on_close channel was closed unexpectedly");
436 break None;
437 }
438 }
439
440 #[cfg(feature = "full_log")]
441 log::trace!("[Hell] message loop iteration ended");
442 }
443 };
444
445 if let Some((tx, timeout)) = clean {
446 // extinguish was requested
447 let mut handles = Vec::new();
448 for (id, demon_channels) in self.demons {
449 #[cfg(feature = "full_log")]
450 log::trace!("[Hell] sending demon with id {} shutdown request", id);
451
452 // This channel will allow the zombie counter to be decreased, when necessary
453 let (demon_tx, demon_rx) = oneshot::channel();
454 let (killswitch_tx, killswitch) = oneshot::channel();
455
456 // force timeout has the prefference
457 let timeout = match timeout {
458 Some(v) => v,
459 None => self.timeout
460 };
461
462 if let Some(timeout) = timeout {
463 #[cfg(feature = "full_log")]
464 log::trace!("[Hell] killswitch trigger requested in {}ms", timeout.as_millis());
465 // We send the killswitch with a timeout
466 let demon_channel_killswitch = demon_channels.killswitch;
467 let _address_copy = id.clone();
468 tokio::spawn(async move {
469 tokio::time::sleep(timeout).await;
470 #[cfg(feature = "full_log")]
471 log::trace!("[Hell] sending killswitch trigger now");
472 // We ignore the killswitch send, because maybe the demon_channel is already obsolete
473 match demon_channel_killswitch.send(killswitch_tx) {
474 Ok(_) => {
475 #[cfg(feature = "full_log")]
476 log::trace!("[Hell] killswitch sent to address {}", _address_copy);
477 },
478 Err(_) => {
479 #[cfg(feature = "full_log")]
480 log::error!("[Hell] killswitch not successfully sent to address {}", _address_copy);
481 }
482 }
483 });
484 } else {
485 #[cfg(feature = "full_log")]
486 log::trace!("[Hell] no timeout was set for this vanquish call");
487 }
488
489 if demon_channels.instructions.send(MiniHellInstruction::Shutdown(demon_tx)).is_err() {
490 #[cfg(feature = "full_log")]
491 log::trace!("[Hell] could not notify demon thread the requested demon at address {} removal", id);
492 } else {
493 #[cfg(feature = "full_log")]
494 log::trace!("[Hell] shutdown message sent to address {}", id);
495 let _address_copy = id.clone();
496 let waiter = async move {
497 #[cfg(feature = "full_log")]
498 log::trace!("[Hell] entering wait selection for address {}", _address_copy);
499 tokio::select! {
500 res = demon_rx => {
501 if res.is_ok() {
502 #[cfg(feature = "full_log")]
503 log::trace!("[Hell] gracefull vanquish for address {}", _address_copy);
504 }
505 },
506 res = killswitch => {
507 if res.is_ok() {
508 #[cfg(feature = "full_log")]
509 log::trace!("[Hell] killswitch vanquish requested, sending to address {}", _address_copy);
510 }
511 }
512 };
513 #[cfg(feature = "full_log")]
514 log::trace!("[Hell] exiting wait selection for address {}", _address_copy);
515 };
516
517 handles.push(tokio::spawn(waiter));
518 }
519 }
520
521 #[cfg(feature = "full_log")]
522 log::trace!("[Hell] waiting for all {} handles to complete...", handles.len());
523 join_all(handles).await;
524 #[cfg(feature = "full_log")]
525 log::trace!("[Hell] all handles completed");
526
527 if tx.send(Ok(())).is_err() {
528 #[cfg(feature = "full_log")]
529 log::debug!("[Hell] could not notify gate about extintion");
530 }
531 }
532
533 // We force this thing to move to this thread
534 #[cfg(not(feature = "ws"))]
535 let _ = on_close_tx.closed();
536
537 #[cfg(feature = "full_log")]
538 log::info!("Broker stops \u{1f9ca}");
539 });
540 Ok((gate_clone, jh))
541 }
542}