use crate::bpmn::schema::{Definitions, RootElement};
use crate::language::MultiLanguageEngine;
use crate::process;
use factory::Factory;
use futures::future::join_all;
use crate::sys::task::{self, JoinHandle};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot};
/// Errors produced by requests made through a model [`Handle`].
#[derive(Error, Debug)]
pub enum Error {
/// The one-shot reply channel of a request failed before a response was received.
///
/// Converted automatically from `oneshot::error::RecvError` (via `#[from]`), which is
/// how `Handle::processes` surfaces a missing reply.
#[error("error while receiving response to a request: {error:?}")]
ResponseRecvError {
/// The underlying receive error reported by the one-shot channel.
#[from]
error: oneshot::error::RecvError,
},
}
/// Marker trait for `Send + Sync` factories whose item type is [`MultiLanguageEngine`].
///
/// It declares no methods of its own; it only names the combination of supertraits.
pub trait LanguageEngineFactory: Factory<Item = MultiLanguageEngine> + Send + Sync {}
/// Adapter that turns a `Send + Sync` closure returning a [`MultiLanguageEngine`]
/// into a [`LanguageEngineFactory`].
///
/// The wrapped closure is public (`.0`) and is invoked once per `create` call.
pub struct FnLanguageEngineFactory<F>(pub F)
where
    F: Fn() -> MultiLanguageEngine + Send + Sync;

impl<F> Factory for FnLanguageEngineFactory<F>
where
    F: Fn() -> MultiLanguageEngine + Send + Sync,
{
    type Item = MultiLanguageEngine;

    /// Produces an engine by calling the wrapped closure.
    fn create(&self) -> Self::Item {
        let make_engine = &self.0;
        make_engine()
    }
}

// Marker impl: the closure adapter satisfies every `LanguageEngineFactory` supertrait.
impl<F> LanguageEngineFactory for FnLanguageEngineFactory<F> where
    F: Fn() -> MultiLanguageEngine + Send + Sync
{
}
/// Factory that builds engines with `MultiLanguageEngine::new()`.
///
/// This is the factory `Model::new` installs for both scripts and expressions.
pub struct DefaultLanguageEngineFactory;
impl Factory for DefaultLanguageEngineFactory {
type Item = MultiLanguageEngine;
/// Returns a freshly constructed [`MultiLanguageEngine`].
fn create(&self) -> Self::Item {
MultiLanguageEngine::new()
}
}
// Marker impl: makes the default factory usable wherever a `LanguageEngineFactory` is required.
impl LanguageEngineFactory for DefaultLanguageEngineFactory {}
/// Marker trait with the same supertraits as [`LanguageEngineFactory`], additionally
/// parameterized by a `Send + Sync + 'static` type `T`.
///
/// It declares no methods.
///
/// NOTE(review): nothing in the visible part of this file implements or references this
/// trait (`Model` bounds its expression factory by `LanguageEngineFactory` instead), and the
/// role of `T` is not evident from here — confirm whether it is still needed.
pub trait ExpressionEngineFactory<T>: Factory<Item = MultiLanguageEngine> + Send + Sync
where
T: Send + Sync + 'static,
{
}
/// A not-yet-running model: BPMN definitions plus the engine factories to use.
///
/// Calling `spawn` consumes the model, moves it into a background task and returns a
/// [`Handle`] for talking to that task.
pub struct Model<ScriptEngine, ExpressionEngine>
where
ScriptEngine: LanguageEngineFactory,
ExpressionEngine: LanguageEngineFactory,
{
/// Shared definitions; also exposed through the handle.
definitions: Arc<Definitions>,
/// Handles of the spawned processes; empty until the runner task fills it in.
processes: Vec<process::Handle>,
/// Script engine factory; an `Option` so `spawn` can move it out with `take()`.
script_engine_factory: Option<ScriptEngine>,
/// Expression engine factory; an `Option` so `spawn` can move it out with `take()`.
expression_engine_factory: Option<ExpressionEngine>,
}
/// Cloneable handle to a spawned model.
///
/// Requests are forwarded to the model's runner task over `sender`; the remaining fields
/// are shared state that can be read without a round trip to the task.
#[derive(Clone)]
pub struct Handle {
/// Definitions the model was created with.
definitions: Arc<Definitions>,
/// Request channel into the runner task.
sender: mpsc::Sender<Request>,
/// Broadcast sender used to create new log subscriptions.
log_broadcast: broadcast::Sender<Log>,
/// Type-erased script engine factory taken from the model at spawn time.
script_engine_factory: Arc<Box<dyn LanguageEngineFactory>>,
/// Type-erased expression engine factory taken from the model at spawn time.
expression_engine_factory: Arc<Box<dyn LanguageEngineFactory>>,
}
/// Log messages broadcast by a model.
///
/// The enum currently has no variants, so no value of this type can be constructed.
#[derive(Clone, Debug)]
pub enum Log {}
/// Messages sent from a [`Handle`] (or from `spawn`) to the model's runner task.
enum Request {
/// Hands the runner the join handle of its own task; sent once by `spawn`.
JoinHandle(JoinHandle<()>),
/// Asks the runner to terminate all processes and stop; the runner replies with the
/// stored join handle (if any) so the caller can await task completion.
Terminate(oneshot::Sender<Option<JoinHandle<()>>>),
/// Asks the runner for a copy of the current process handles.
Processes(oneshot::Sender<Vec<process::Handle>>),
}
impl Model<DefaultLanguageEngineFactory, DefaultLanguageEngineFactory> {
    /// Creates a model for `definitions` that uses [`DefaultLanguageEngineFactory`] for
    /// both the script engine and the expression engine.
    ///
    /// No processes exist yet; they are created when the model is spawned.
    pub fn new(definitions: Definitions) -> Self {
        let definitions = Arc::new(definitions);
        let processes = Vec::new();
        Self {
            script_engine_factory: Some(DefaultLanguageEngineFactory),
            expression_engine_factory: Some(DefaultLanguageEngineFactory),
            definitions,
            processes,
        }
    }
}
impl<ScriptEngine, ExpressionEngine> Model<ScriptEngine, ExpressionEngine>
where
    ScriptEngine: LanguageEngineFactory + 'static,
    ExpressionEngine: LanguageEngineFactory + 'static,
{
    /// Returns this model with `script_engine_factory` stored as its script engine
    /// factory; every other field is carried over unchanged.
    pub fn with_script_engine_factory<Factory>(
        self,
        script_engine_factory: Factory,
    ) -> Model<Factory, ExpressionEngine>
    where
        Factory: LanguageEngineFactory,
    {
        Model {
            script_engine_factory: Some(script_engine_factory),
            expression_engine_factory: self.expression_engine_factory,
            definitions: self.definitions,
            processes: self.processes,
        }
    }

    /// Returns this model with `expression_engine_factory` stored as its expression
    /// engine factory; every other field is carried over unchanged.
    pub fn with_expression_engine_factory<Factory>(
        self,
        expression_engine_factory: Factory,
    ) -> Model<ScriptEngine, Factory>
    where
        Factory: LanguageEngineFactory,
    {
        Model {
            script_engine_factory: self.script_engine_factory,
            expression_engine_factory: Some(expression_engine_factory),
            definitions: self.definitions,
            processes: self.processes,
        }
    }

    /// Moves the model into a background task and returns a [`Handle`] to it.
    ///
    /// The engine factories are moved out of the model and type-erased into the handle.
    /// After spawning, the task's own join handle is sent to it as the first request so
    /// that a later `Handle::terminate` can wait for the task to finish.
    ///
    /// # Panics
    ///
    /// Panics if either factory is missing. Every constructor in this module stores
    /// `Some(..)`, so this indicates a broken invariant rather than a recoverable error.
    pub async fn spawn(mut self) -> Handle {
        let (sender, receiver) = mpsc::channel(1);
        let (log_broadcast, _) = broadcast::channel(128);
        let log_sender = log_broadcast.clone();
        let handle = Handle {
            definitions: self.definitions.clone(),
            sender: sender.clone(),
            log_broadcast,
            script_engine_factory: Arc::new(Box::new(
                self.script_engine_factory
                    .take()
                    .expect("script engine factory is always set before spawn"),
            )),
            expression_engine_factory: Arc::new(Box::new(
                self.expression_engine_factory
                    .take()
                    .expect("expression engine factory is always set before spawn"),
            )),
        };
        let handle_clone = handle.clone();
        let join_handle =
            task::spawn(async move { self.runner(receiver, handle_clone, log_sender).await });
        // Queued before the handle is returned, so it precedes any caller request.
        let _ = sender.send(Request::JoinHandle(join_handle)).await;
        handle
    }

    /// Body of the background task: spawns one process per `RootElement::Process` in the
    /// definitions, then serves requests until told to terminate.
    ///
    /// The loop also ends if the request channel is closed, instead of polling a closed
    /// channel forever.
    async fn runner(
        mut self,
        mut receiver: mpsc::Receiver<Request>,
        handle: Handle,
        _log_broadcast: broadcast::Sender<Log>,
    ) {
        let mut join_handle = None;
        self.processes = join_all(
            self.definitions
                .root_elements
                .iter()
                .filter_map(|e| match e {
                    RootElement::Process(def) => {
                        Some(process::Process::new(def.clone(), handle.clone()).spawn())
                    }
                    _ => None,
                }),
        )
        .await;
        // `recv()` yields `None` only once the channel is closed and drained; leaving the
        // loop at that point avoids spinning on a channel that can never deliver again.
        while let Some(request) = receiver.recv().await {
            match request {
                Request::JoinHandle(own_handle) => join_handle = Some(own_handle),
                Request::Terminate(sender) => {
                    for process in self.processes.drain(..) {
                        let _ = process.terminate().await;
                    }
                    // Give the requester the task's join handle so it can await our exit.
                    let _ = sender.send(join_handle.take());
                    return;
                }
                Request::Processes(sender) => {
                    let _ = sender.send(self.processes.clone());
                }
            }
        }
    }
}
impl Handle {
    /// Asks the runner task to terminate and, if it replies with its join handle, waits
    /// for the task to finish.
    ///
    /// Failures to send the request or to receive the reply are ignored.
    pub async fn terminate(self) {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _ = self.sender.send(Request::Terminate(reply_tx)).await;
        match reply_rx.await {
            Ok(Some(runner_task)) => {
                let _ = runner_task.await;
            }
            _ => {}
        }
    }

    /// Returns a shared reference to the model's definitions.
    pub fn definitions(&self) -> Arc<Definitions> {
        Arc::clone(&self.definitions)
    }

    /// Subscribes to the model's log broadcast.
    pub fn log_receiver(&self) -> broadcast::Receiver<Log> {
        let subscription = self.log_broadcast.subscribe();
        subscription
    }

    /// Requests the handles of the model's processes from the runner task.
    ///
    /// # Errors
    ///
    /// Returns [`Error::ResponseRecvError`] if no reply is received.
    pub async fn processes(&self) -> Result<Vec<process::Handle>, Error> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _ = self.sender.send(Request::Processes(reply_tx)).await;
        let handles = reply_rx.await?;
        Ok(handles)
    }

    /// Returns the shared script engine factory.
    pub fn script_engine_factory(&self) -> Arc<Box<dyn LanguageEngineFactory>> {
        Arc::clone(&self.script_engine_factory)
    }

    /// Returns the shared expression engine factory.
    pub fn expression_engine_factory(&self) -> Arc<Box<dyn LanguageEngineFactory>> {
        Arc::clone(&self.expression_engine_factory)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bpmn::schema::*;
    use bpxe_internal_macros as bpxe_im;

    // A model built from default definitions can be spawned and then terminated.
    #[bpxe_im::test]
    async fn spawn_and_terminate() {
        let handle = Model::new(Default::default()).spawn().await;
        handle.terminate().await;
    }

    // Every process element in the definitions shows up in `Handle::processes`.
    #[bpxe_im::test]
    async fn list_processes() {
        let first = Process {
            id: Some("proc1".into()),
            ..Default::default()
        };
        let second = Process {
            id: Some("proc2".into()),
            ..Default::default()
        };
        let definitions = Definitions {
            root_elements: vec![first.into(), second.into()],
            ..Default::default()
        };

        let handle = Model::new(definitions).spawn().await;
        let running = handle.processes().await.unwrap();

        let mut process_names: Vec<String> = running
            .into_iter()
            .map(|proc| proc.element().id.as_ref().unwrap().clone())
            .collect();
        // Sorted so the assertion does not depend on the order handles are returned in.
        process_names.sort();
        assert_eq!(process_names, vec!["proc1", "proc2"]);
    }
}