pub mod connector;
use crate::traits::Node;
use crate::zfresult::Error;
use crate::Result as ZFResult;
use async_std::task::JoinHandle;
use futures::future::{AbortHandle, Abortable, Aborted};
use std::sync::Arc;
use std::time::Instant;
/// Tag naming the category a runner belongs to.
///
/// NOTE(review): this enum is not referenced anywhere in the visible part of
/// the file; presumably other modules use it to tell runners apart — confirm.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RunnerKind {
    /// The runner is a source.
    Source,
    /// The runner is an operator.
    Operator,
    /// The runner is a sink.
    Sink,
    /// The runner is a connector.
    Connector,
}
/// Action to take with respect to a run.
///
/// `Debug` is derived so that values of this public type can be logged and
/// used in assertions.
///
/// NOTE(review): this enum is not referenced in the visible part of the file;
/// the variant descriptions below are inferred from their names — confirm
/// against the callers.
#[derive(Debug)]
pub enum RunAction {
    /// Presumably asks for the run to be restarted; may carry an [`Error`].
    RestartRun(Option<Error>),
    /// Presumably asks for the run to be stopped.
    Stop,
}
/// Drives a [`Node`] by repeatedly calling its `iteration` method inside a
/// spawned, abortable task.
pub(crate) struct Runner {
/// The node whose `iteration` method the run loop invokes.
pub(crate) node: Arc<dyn Node>,
/// Join handle of the spawned run loop task. It is `Some` from `start`
/// until `stop` takes it. The task resolves to `Ok(error)` when an
/// iteration failed and to `Err(Aborted)` when the loop was aborted.
pub(crate) run_loop_handle: Option<JoinHandle<Result<Error, Aborted>>>,
/// Abort handle paired with the `Abortable` wrapper around the run loop;
/// set by `start` and consumed by `stop`.
pub(crate) run_loop_abort_handle: Option<AbortHandle>,
}
impl Runner {
    /// Creates a runner for `node`. No run loop is spawned until
    /// [`Runner::start`] is called.
    pub(crate) fn new(node: Arc<dyn Node>) -> Self {
        Self {
            node,
            run_loop_handle: None,
            run_loop_abort_handle: None,
        }
    }

    /// Spawns the run loop as an abortable task and stores its handles.
    ///
    /// The loop calls `node.iteration()` over and over, yielding to the
    /// executor after every successful iteration. It ends on its own only
    /// when an iteration returns an error, in which case that error becomes
    /// the task's output.
    ///
    /// If [`Runner::is_running`] already reports `true`, a warning is logged
    /// and nothing else happens.
    pub(crate) fn start(&mut self) {
        if self.is_running() {
            log::warn!("Called `start` while node is ALREADY running. Returning.");
            return;
        }

        let node = self.node.clone();
        let run_loop = async move {
            loop {
                // Scoped per iteration: the timestamp is never read across iterations.
                let instant = Instant::now();
                log::trace!("Iteration start: {:?}", instant);
                if let Err(e) = node.iteration().await {
                    log::error!("Iteration error: {:?}", e);
                    return e;
                }
                log::trace!("iteration took: {}ms", instant.elapsed().as_millis());
                // Give other tasks a chance to run between two iterations.
                async_std::task::yield_now().await;
            }
        };

        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let handle = async_std::task::spawn(Abortable::new(run_loop, abort_registration));
        self.run_loop_handle = Some(handle);
        self.run_loop_abort_handle = Some(abort_handle);
    }

    /// Aborts the run loop and waits for its task to finish.
    ///
    /// If [`Runner::is_running`] reports `false`, a warning is logged and
    /// `Ok(())` is returned immediately.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`; the outcome of the task
    /// is only logged at trace level.
    pub(crate) async fn stop(&mut self) -> ZFResult<()> {
        if !self.is_running() {
            log::warn!("Called `stop` while node is NOT running. Returning.");
            return Ok(());
        }

        if let Some(abort_handle) = self.run_loop_abort_handle.take() {
            abort_handle.abort();
            if let Some(handle) = self.run_loop_handle.take() {
                // Await outside of the logging macro: `log` macros only
                // evaluate their arguments when the level is enabled, so an
                // `.await` placed inside `trace!` would be skipped whenever
                // trace logging is off and `stop` would return before the
                // task has actually terminated.
                let outcome = handle.await;
                log::trace!("Handler finished with {:?}", outcome);
            }
        }
        Ok(())
    }

    /// Reports whether a run loop handle is currently stored.
    ///
    /// Note that this does not inspect the task itself: after the loop ended
    /// because of an iteration error the handle is still stored, so this
    /// keeps returning `true` until [`Runner::stop`] is called.
    pub(crate) fn is_running(&self) -> bool {
        self.run_loop_handle.is_some()
    }
}