use crate::actor_runtime::Actor;
use crate::handlers::{InterruptedBy, StartedBy};
use crate::system::System;
use anyhow::Error;
use std::thread;
#[derive(Debug)]
pub struct ScopedRuntime {
log_target: String,
sender: Option<term::Sender>,
}
impl ScopedRuntime {
}
impl Drop for ScopedRuntime {
fn drop(&mut self) {
if let Some(sender) = self.sender.take() {
if sender.notifier_tx.send(()).is_err() {
log::error!(target: &self.log_target, "Can't send termination signal to the scope");
return;
}
if sender.blocker.lock().is_err() {
log::error!(target: &self.log_target, "Can't wait for termination of the scope");
}
}
}
}
pub fn spawn<T>(actor: T) -> Result<ScopedRuntime, Error>
where
T: Actor + StartedBy<System> + InterruptedBy<System>,
{
let log_target = actor.log_target().to_owned();
let name = format!("ScopedThread[{}]", log_target);
let (term_tx, term_rx) = term::channel();
thread::Builder::new().name(name.clone()).spawn(move || {
let on_start = name.clone();
let on_stop = name.clone();
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("meio-pool")
.worker_threads(1)
.on_thread_start(move || {
log::info!(target: &on_start, "New meio worker thread spawned");
})
.on_thread_stop(move || {
log::info!(target: &on_stop, "The meio worker thread retired");
})
.enable_all()
.build()?;
let routine = entrypoint(actor, term_rx);
runtime.block_on(routine)
})?;
Ok(ScopedRuntime {
log_target,
sender: Some(term_tx),
})
}
#[allow(clippy::await_holding_lock)]
async fn entrypoint<T>(actor: T, term_rx: term::Receiver) -> Result<(), Error>
where
T: Actor + StartedBy<System> + InterruptedBy<System>,
{
let blocker = term_rx
.blocker
.lock()
.map_err(|_| Error::msg("can't take termination blocker"))?;
let handle = System::spawn(actor);
term_rx.notifier_rx.await?;
System::interrupt(&handle)?;
handle.join().await;
drop(blocker);
Ok(())
}
mod term {
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
#[derive(Debug)]
pub struct Receiver {
pub notifier_rx: oneshot::Receiver<()>,
pub blocker: Arc<Mutex<()>>,
}
#[derive(Debug)]
pub struct Sender {
pub notifier_tx: oneshot::Sender<()>,
pub blocker: Arc<Mutex<()>>,
}
pub fn channel() -> (Sender, Receiver) {
let (tx, rx) = oneshot::channel();
let blocker = Arc::new(Mutex::new(()));
let sender = Sender {
notifier_tx: tx,
blocker: blocker.clone(),
};
let receiver = Receiver {
notifier_rx: rx,
blocker,
};
(sender, receiver)
}
}