1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::actor_runtime::Actor;
use crate::handlers::{InterruptedBy, StartedBy};
use crate::system::System;
use anyhow::Error;
use std::thread;
#[derive(Debug)]
pub struct ScopedRuntime {
name: String,
sender: Option<term::Sender>,
}
impl ScopedRuntime {
}
impl Drop for ScopedRuntime {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
if sender.notifier_tx.send(()).is_err() {
log::error!("Can't send termination signal to the {}", self.name);
return;
}
if sender.blocker.lock().is_err() {
log::error!("Can't wait for termination of the {}", self.name);
}
}
}
}
pub fn spawn<T>(actor: T) -> Result<ScopedRuntime, Error>
where
T: Actor + StartedBy<System> + InterruptedBy<System>,
{
let name = format!("thread-{}", actor.name());
let (term_tx, term_rx) = term::channel();
thread::Builder::new()
.name(name.clone())
.spawn(move || entrypoint(actor, term_rx))?;
Ok(ScopedRuntime {
name,
sender: Some(term_tx),
})
}
#[allow(clippy::await_holding_lock)]
#[tokio::main]
async fn entrypoint<T>(actor: T, term_rx: term::Receiver) -> Result<(), Error>
where
T: Actor + StartedBy<System> + InterruptedBy<System>,
{
let blocker = term_rx
.blocker
.lock()
.map_err(|_| Error::msg("can't take termination blocker"))?;
let mut handle = System::spawn(actor);
term_rx.notifier_rx.await?;
System::interrupt(&mut handle)?;
handle.join().await;
drop(blocker);
Ok(())
}
mod term {
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
#[derive(Debug)]
pub struct Receiver {
pub notifier_rx: oneshot::Receiver<()>,
pub blocker: Arc<Mutex<()>>,
}
#[derive(Debug)]
pub struct Sender {
pub notifier_tx: oneshot::Sender<()>,
pub blocker: Arc<Mutex<()>>,
}
pub fn channel() -> (Sender, Receiver) {
let (tx, rx) = oneshot::channel();
let blocker = Arc::new(Mutex::new(()));
let sender = Sender {
notifier_tx: tx,
blocker: blocker.clone(),
};
let receiver = Receiver {
notifier_rx: rx,
blocker,
};
(sender, receiver)
}
}