use std::sync::Arc;
use libpetri_core::token::ErasedToken;
#[cfg(test)]
use libpetri_core::token::Token;
use crate::environment::{ExecutorSignal, ExternalEvent};
pub struct ExecutorHandle {
tx: tokio::sync::mpsc::UnboundedSender<ExecutorSignal>,
drained: bool,
}
impl ExecutorHandle {
pub fn new(tx: tokio::sync::mpsc::UnboundedSender<ExecutorSignal>) -> Self {
Self { tx, drained: false }
}
pub fn inject(&mut self, place_name: Arc<str>, token: ErasedToken) -> bool {
if self.drained {
return false;
}
self.tx
.send(ExecutorSignal::Event(ExternalEvent { place_name, token }))
.is_ok()
}
pub fn drain(&mut self) -> bool {
if self.drained {
return false;
}
self.drained = true;
self.tx.send(ExecutorSignal::Drain).is_ok()
}
pub fn close(&mut self) -> bool {
self.drained = true;
self.tx.send(ExecutorSignal::Close).is_ok()
}
pub fn is_drained(&self) -> bool {
self.drained
}
}
impl Drop for ExecutorHandle {
fn drop(&mut self) {
if !self.drained {
let _ = self.tx.send(ExecutorSignal::Drain);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn handle_drain_sets_drained_flag() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
let mut handle = ExecutorHandle::new(tx);
assert!(!handle.is_drained());
assert!(handle.drain());
assert!(handle.is_drained());
assert!(!handle.drain());
}
#[tokio::test]
async fn handle_close_sets_drained_flag() {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
let mut handle = ExecutorHandle::new(tx);
assert!(handle.close());
assert!(handle.is_drained());
}
#[tokio::test]
async fn handle_inject_rejected_after_drain() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
let mut handle = ExecutorHandle::new(tx);
handle.drain();
let result = handle.inject(Arc::from("p1"), ErasedToken::from_typed(&Token::new(42i32)));
assert!(!result);
match rx.recv().await {
Some(ExecutorSignal::Drain) => {}
other => panic!("expected Drain, got {:?}", other),
}
}
#[tokio::test]
async fn handle_drop_sends_drain_automatically() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
{
let _handle = ExecutorHandle::new(tx);
}
match rx.recv().await {
Some(ExecutorSignal::Drain) => {}
other => panic!("expected Drain from RAII drop, got {:?}", other),
}
assert!(rx.recv().await.is_none());
}
#[tokio::test]
async fn handle_drop_after_drain_does_not_double_send() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
{
let mut handle = ExecutorHandle::new(tx);
handle.drain();
}
match rx.recv().await {
Some(ExecutorSignal::Drain) => {}
other => panic!("expected exactly one Drain, got {:?}", other),
}
assert!(rx.recv().await.is_none());
}
#[tokio::test]
async fn handle_close_after_drain_escalates() {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ExecutorSignal>();
let mut handle = ExecutorHandle::new(tx);
handle.drain();
handle.close(); match rx.recv().await {
Some(ExecutorSignal::Drain) => {}
other => panic!("expected Drain first, got {:?}", other),
}
match rx.recv().await {
Some(ExecutorSignal::Close) => {}
other => panic!("expected Close second, got {:?}", other),
}
}
}