use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use crate::{CtrlFuture, RuntimeControlMessage, RuntimeHandle, RuntimeTicker};
/// Entry point for wiring control messages to a runtime ticker.
///
/// Cloning is cheap: every clone points at the same shared [`Inner`] state
/// through an `Arc`, so all clones observe the same ticker and control channel.
#[derive(Debug, Clone)]
pub struct RuntimeGuard {
// Shared state behind an `Arc` so that clones of the guard stay in sync.
inner: Arc<Inner>,
}
/// State shared between all clones of [`RuntimeGuard`] and the forwarding task
/// spawned in `RuntimeGuard::new`.
#[derive(Debug)]
struct Inner {
// Sender for the currently installed ticker. Starts as `None` and is filled
// in by `runtime_ticker`; the forwarding task clones it to deliver messages.
runtime_ticker_ch_sender: Arc<Mutex<Option<tokio::sync::mpsc::Sender<RuntimeControlMessage>>>>,
// Sending half of the control channel created in `new`; its receiving half is
// drained by the forwarding task. Also handed out to `RuntimeHandle`s.
control_ch_sender: Arc<Mutex<tokio::sync::mpsc::Sender<RuntimeControlMessage>>>,
// Signalled (via `notify_waiters`) whenever a new ticker sender is installed.
ticker_ready: Arc<Notify>,
}
/// Resolves to a clone of the ticker sender once one is installed and open.
///
/// If the slot is empty, or the stored sender's channel is already closed,
/// the function suspends until `ticker_ready` is signalled and then looks again.
async fn wait_for_ticker_sender(
    sender_slot: Arc<Mutex<Option<tokio::sync::mpsc::Sender<RuntimeControlMessage>>>>,
    ticker_ready: Arc<Notify>,
) -> tokio::sync::mpsc::Sender<RuntimeControlMessage> {
    loop {
        // Create the notification future *before* inspecting the slot so that a
        // `notify_waiters` call racing with the check below is not missed.
        let wakeup = ticker_ready.notified();

        // The lock guard is a temporary and is released at the end of this statement.
        let current = sender_slot.lock().await.clone();

        match current {
            Some(live) if !live.is_closed() => return live,
            // No sender yet, or the ticker behind it is gone: wait for a new one.
            _ => wakeup.await,
        }
    }
}
impl RuntimeGuard {
    /// Creates a guard and spawns the background task that forwards control
    /// messages to whichever ticker is currently installed.
    ///
    /// The control channel has a capacity of one message. The forwarding task
    /// takes one message at a time and retries delivery until a live ticker
    /// accepts it, so messages sent before a ticker exists are held rather
    /// than dropped.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a Tokio runtime, because it uses
    /// `tokio::spawn`.
    pub fn new() -> Self {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
        let ticker_sender: Arc<Mutex<Option<tokio::sync::mpsc::Sender<RuntimeControlMessage>>>> =
            Arc::new(Mutex::new(None));
        let ticker_ready = Arc::new(Notify::new());

        let fanout_sender = Arc::clone(&ticker_sender);
        let fanout_ready = Arc::clone(&ticker_ready);
        tokio::spawn(async move {
            // Ends once every control sender has been dropped.
            while let Some(msg) = receiver.recv().await {
                let mut pending = msg;
                loop {
                    let ticker = wait_for_ticker_sender(
                        Arc::clone(&fanout_sender),
                        Arc::clone(&fanout_ready),
                    )
                    .await;
                    match ticker.send(pending).await {
                        Ok(()) => break,
                        // The ticker went away mid-send: recover the message
                        // and wait for the next ticker to be installed.
                        Err(err) => pending = err.0,
                    }
                }
            }
        });

        Self {
            inner: Arc::new(Inner {
                runtime_ticker_ch_sender: ticker_sender,
                control_ch_sender: Arc::new(Mutex::new(sender)),
                ticker_ready,
            }),
        }
    }

    /// Returns `true` when `slot` holds a sender whose channel is still open.
    fn slot_is_live(slot: Option<&tokio::sync::mpsc::Sender<RuntimeControlMessage>>) -> bool {
        slot.is_some_and(|sender| !sender.is_closed())
    }

    /// Creates the runtime ticker and registers its sender with this guard.
    ///
    /// Waiters blocked on a missing ticker (the forwarding task) are woken
    /// once the new sender is stored.
    ///
    /// # Panics
    ///
    /// Panics if a ticker created earlier is still alive; only one ticker may
    /// exist at a time.
    pub async fn runtime_ticker(&self) -> RuntimeTicker {
        // Check and install under a single lock acquisition so that two
        // concurrent callers cannot both pass the check and both install a
        // ticker.
        let mut slot = self.inner.runtime_ticker_ch_sender.lock().await;
        assert!(
            !Self::slot_is_live(slot.as_ref()),
            "process already started – only one ticker allowed"
        );
        let (ticker, sender) = RuntimeTicker::new();
        *slot = Some(sender);
        self.inner.ticker_ready.notify_waiters();
        ticker
    }

    /// Reports whether a ticker is installed and its channel is still open.
    pub async fn is_running(&self) -> bool {
        let slot = self.inner.runtime_ticker_ch_sender.lock().await;
        Self::slot_is_live(slot.as_ref())
    }

    /// Returns a new [`RuntimeHandle`] that shares this guard's control sender.
    pub fn handle(&self) -> Arc<RuntimeHandle> {
        Arc::new(RuntimeHandle::new(Arc::clone(
            &self.inner.control_ch_sender,
        )))
    }

    /// Returns a future that sends `msg` on the control channel.
    ///
    /// Delivery is best-effort: a send error is discarded.
    pub fn control(&self, msg: RuntimeControlMessage) -> CtrlFuture<'_> {
        Box::pin(async move {
            let ch = self.inner.control_ch_sender.lock().await;
            // Deliberately ignored: a failed send is not reported to the caller.
            let _ = ch.send(msg).await;
        })
    }

    /// Wraps `message` in [`RuntimeControlMessage::Custom`] and sends it via
    /// [`RuntimeGuard::control`].
    pub fn custom<T>(&self, message: T) -> CtrlFuture<'_>
    where
        T: std::any::Any + Send + Sync + 'static,
    {
        self.control(RuntimeControlMessage::Custom(Arc::new(message)))
    }

    /// Waits until no live ticker is installed.
    ///
    /// Returns immediately when no ticker has been created yet or the current
    /// one is already closed.
    pub async fn block_until_shutdown(&self) {
        loop {
            // Clone the sender out of the slot so the lock is released before
            // the potentially long wait below.
            let current = self.inner.runtime_ticker_ch_sender.lock().await.clone();
            match current {
                // Resolves when the ticker's receiving side goes away; then
                // re-check in case a replacement ticker was installed meanwhile.
                Some(sender) if !sender.is_closed() => sender.closed().await,
                _ => return,
            }
        }
    }
}
impl Default for RuntimeGuard {
fn default() -> Self {
Self::new()
}
}