use crate::bpmn::schema::{FlowNodeType, Process as Element};
use crate::data_object::DataObject;
use crate::event::ProcessEvent as Event;
use crate::flow_node;
use crate::model;
use crate::sys::task::{self, JoinHandle};
use serde::Serialize;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
mod scheduler;
use scheduler::Scheduler;
/// Errors returned by [`Handle::start`].
#[derive(Error, Debug, PartialEq)]
pub enum StartError {
/// The process has no `startEvent` element to start from.
#[error("no startEvent element found")]
NoStartEvent,
/// The request was issued but no reply arrived on the reply channel
/// (this is what [`Handle::start`] returns when awaiting the reply fails).
#[error("response has not been received")]
NotReceived,
}
/// A process that has not been spawned yet.
///
/// Built with [`Process::new`] and turned into a running scheduler plus a
/// [`Handle`] by [`Process::spawn`].
pub struct Process {
// BPMN process element, shared by reference count with every `Handle`.
element: Arc<Element>,
// Handle to the model this process was created with.
model: model::Handle,
}
/// Cloneable handle used to talk to a spawned process.
///
/// Requests are sent to the scheduler over an mpsc channel; logs and events
/// are observed through broadcast channels.
#[derive(Clone)]
pub struct Handle {
// Handle to the model this process was created with.
model: model::Handle,
// BPMN process element this handle refers to.
element: Arc<Element>,
// Request channel into the scheduler.
sender: mpsc::Sender<Request>,
// Sending side of the log broadcast; receivers are created by subscribing.
log_broadcast: broadcast::Sender<Log>,
// Sending side of the event broadcast; receivers are created by subscribing.
event_broadcast: broadcast::Sender<Event>,
}
/// Shared container for a data object: a boxed [`DataObject`] trait object
/// behind an async (tokio) read-write lock, reference-counted so that several
/// holders can access the same value.
pub type DataObjectContainer = Arc<RwLock<Box<dyn DataObject>>>;
/// Errors returned by [`Handle::data_object`].
#[derive(Error, Debug, PartialEq)]
pub enum DataObjectError {
/// No data object was found for the requested id.
#[error("data object not found")]
NotFound,
/// The request was issued but no reply arrived on the reply channel
/// (this is what [`Handle::data_object`] returns when awaiting the reply fails).
#[error("response has not been received")]
NotReceived,
}
/// Requests sent from a [`Handle`] (or from [`Process::spawn`]) to the scheduler.
pub(crate) enum Request {
/// Carries the join handle of the scheduler task; sent once by
/// [`Process::spawn`] right after the task has been spawned.
JoinHandle(JoinHandle<()>),
/// Asks for termination. The reply optionally carries a join handle, which
/// [`Handle::terminate`] awaits when present.
Terminate(oneshot::Sender<Option<JoinHandle<()>>>),
/// Asks to start the process; the outcome is sent back on the reply channel.
Start(oneshot::Sender<Result<(), StartError>>),
/// Asks for the data object with the given id; the container (or an error)
/// is sent back on the reply channel.
DataObject(
String,
oneshot::Sender<Result<DataObjectContainer, DataObjectError>>,
),
}
/// Log entries published on a process's log broadcast channel.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field. Flow nodes are serialized through
/// `crate::serde::serialize_flow_node`.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant and field names; the code emitting them is not in this file.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum Log {
/// An incoming flow reached `node`; `incoming_index` identifies which of
/// the node's incoming flows it was.
FlowNodeIncoming {
#[serde(serialize_with = "crate::serde::serialize_flow_node")]
node: Box<dyn FlowNodeType>,
incoming_index: flow_node::IncomingIndex,
},
/// `node` has completed.
FlowNodeCompleted {
#[serde(serialize_with = "crate::serde::serialize_flow_node")]
node: Box<dyn FlowNodeType>,
},
/// Token count reported for `node`. This variant only exists in test builds.
#[cfg(test)]
FlowNodeTokens {
#[serde(serialize_with = "crate::serde::serialize_flow_node")]
node: Box<dyn FlowNodeType>,
count: usize,
},
/// No default path was available for `node`.
NoDefaultPath {
#[serde(serialize_with = "crate::serde::serialize_flow_node")]
node: Box<dyn FlowNodeType>,
},
/// An expression failed; `error` holds the error text.
ExpressionError { error: String },
/// A script failed; `error` holds the error text.
ScriptError { error: String },
/// Presumably signals that the process has finished — TODO confirm against the scheduler.
Done,
}
impl Process {
    /// Builds a process from its BPMN `element` and the `model` handle it is
    /// associated with.
    ///
    /// The element is placed behind an [`Arc`] so it can later be shared with
    /// every [`Handle`] without copying.
    pub fn new(element: Element, model: model::Handle) -> Self {
        let element = Arc::new(element);
        Self { element, model }
    }

    /// Spawns the scheduler task for this process and returns a [`Handle`] to it.
    ///
    /// Three channels are created: a request channel (capacity 1) whose
    /// receiving end is given to the scheduler, and log and event broadcast
    /// channels (capacity 128 each) whose sending ends live in the handle.
    pub async fn spawn(self) -> Handle {
        let (request_tx, request_rx) = mpsc::channel(1);
        // The initial broadcast receivers are dropped; consumers subscribe later
        // through the handle.
        let (log_broadcast, _) = broadcast::channel(128);
        let (event_broadcast, _) = broadcast::channel(128);

        let handle = Handle {
            model: self.model.clone(),
            element: Arc::clone(&self.element),
            sender: request_tx.clone(),
            log_broadcast,
            event_broadcast,
        };

        let scheduler = Scheduler::new(request_rx, handle.clone());
        let scheduler_task = task::spawn(async move { scheduler.run().await });

        // Pass the task's own join handle to the scheduler over the request
        // channel. Delivery is best-effort: a send failure is ignored.
        let _ = request_tx
            .send(Request::JoinHandle(scheduler_task))
            .await;

        handle
    }
}
impl Handle {
    /// Requests termination of the process and waits for it to wind down.
    ///
    /// A `Terminate` request is sent to the scheduler. If the reply carries a
    /// join handle, that task is awaited. Send failures, a missing reply, and
    /// the awaited task's result are all ignored.
    pub async fn terminate(self) {
        let (reply_tx, reply_rx) = oneshot::channel();
        let _ = self.sender.send(Request::Terminate(reply_tx)).await;
        if let Some(join_handle) = reply_rx.await.ok().flatten() {
            let _ = join_handle.await;
        }
    }

    /// Asks the scheduler to start the process.
    ///
    /// # Errors
    ///
    /// Returns whatever error the scheduler replies with, or
    /// [`StartError::NotReceived`] if no reply arrives on the reply channel.
    pub async fn start(&self) -> Result<(), StartError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        // A failed send drops `reply_tx`, which surfaces below as `NotReceived`.
        let _ = self.sender.send(Request::Start(reply_tx)).await;
        reply_rx
            .await
            .unwrap_or_else(|_| Err(StartError::NotReceived))
    }

    /// Returns a clone of the model handle.
    pub fn model(&self) -> model::Handle {
        self.model.clone()
    }

    /// Subscribes a new receiver to the log broadcast.
    pub fn log_receiver(&self) -> broadcast::Receiver<Log> {
        self.log_broadcast.subscribe()
    }

    /// Returns a clone of the log broadcast sender.
    pub fn log_broadcast(&self) -> broadcast::Sender<Log> {
        self.log_broadcast.clone()
    }

    /// Returns a shared reference to the BPMN process element.
    pub fn element(&self) -> Arc<Element> {
        Arc::clone(&self.element)
    }

    /// Subscribes a new receiver to the event broadcast.
    pub fn event_receiver(&self) -> broadcast::Receiver<Event> {
        self.event_broadcast.subscribe()
    }

    /// Returns a clone of the event broadcast sender.
    pub fn event_broadcast(&self) -> broadcast::Sender<Event> {
        self.event_broadcast.clone()
    }

    /// Asks the scheduler for the data object identified by `id`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the scheduler replies with, or
    /// [`DataObjectError::NotReceived`] if no reply arrives on the reply channel.
    pub async fn data_object(&self, id: &str) -> Result<DataObjectContainer, DataObjectError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = Request::DataObject(String::from(id), reply_tx);
        // A failed send drops the reply sender, which surfaces below as `NotReceived`.
        let _ = self.sender.send(request).await;
        reply_rx
            .await
            .unwrap_or_else(|_| Err(DataObjectError::NotReceived))
    }
}
// Tests for process start-up, logging and data object access, all driven
// through the public handle API of a spawned model.
#[cfg(test)]
mod tests {
use super::{Log, StartError};
use crate::bpmn::parse;
use crate::bpmn::schema::*;
use crate::model;
use crate::test::*;
use bpxe_internal_macros as bpxe_im;
// Starting a process that has no start event must fail with
// `StartError::NoStartEvent`.
#[bpxe_im::test]
async fn no_start_event() {
let definitions = Definitions {
root_elements: vec![Process {
id: Some("proc1".into()),
..Default::default()
}
.into()],
..Default::default()
};
let model = model::Model::new(definitions).spawn().await;
let handle = model.processes().await.unwrap().pop().unwrap();
assert_eq!(handle.start().await, Err::<(), _>(StartError::NoStartEvent));
model.terminate().await;
}
// Starting a process with a single start event succeeds and logs the
// completion of that start event.
#[bpxe_im::test]
async fn single_start_event() {
let definitions = Definitions {
root_elements: vec![Process {
id: Some("proc1".into()),
flow_elements: vec![StartEvent {
id: Some("start".into()),
..Default::default()
}
.into()],
..Default::default()
}
.into()],
..Default::default()
};
let model = model::Model::new(definitions).spawn().await;
let handle = model.processes().await.unwrap().pop().unwrap();
// Subscribe before starting so that no log entry is missed.
let mut mailbox = Mailbox::new(handle.log_receiver());
assert!(handle.start().await.is_ok());
// Expect a completion log for the start event with id "start".
assert!(
mailbox
.receive(|e| if let Log::FlowNodeCompleted { node } = e {
matches!(node.downcast_ref::<StartEvent>(),
Some(start_event) if start_event.id().as_ref().unwrap() == "start")
} else {
false
})
.await
);
model.terminate().await;
}
// Starting a process with two start events succeeds and logs the completion
// of both of them.
#[bpxe_im::test]
async fn multiple_start_events() {
let definitions = Definitions {
root_elements: vec![Process {
id: Some("proc1".into()),
flow_elements: vec![
StartEvent {
id: Some("start1".into()),
..Default::default()
}
.into(),
StartEvent {
id: Some("start2".into()),
..Default::default()
}
.into(),
],
..Default::default()
}
.into()],
..Default::default()
};
let model = model::Model::new(definitions).spawn().await;
let handle = model.processes().await.unwrap().pop().unwrap();
// Subscribe before starting so that no log entry is missed.
let mut mailbox = Mailbox::new(handle.log_receiver());
assert!(handle.start().await.is_ok());
// Expect a completion log for "start1"...
assert!(
mailbox
.receive(|e| if let Log::FlowNodeCompleted { node } = e {
matches!(node.downcast_ref::<StartEvent>(),
Some(start_event) if start_event.id().as_ref().unwrap() == "start1")
} else {
false
})
.await
);
// ...and one for "start2".
// NOTE(review): whether this depends on the order in which the two logs
// arrive is determined by `Mailbox::receive`, which is not visible here.
assert!(
mailbox
.receive(|e| if let Log::FlowNodeCompleted { node } = e {
matches!(node.downcast_ref::<StartEvent>(),
Some(start_event) if start_event.id().as_ref().unwrap() == "start2")
} else {
false
})
.await
);
model.terminate().await;
}
// With a sequence flow from the start event to the end event, starting the
// process logs an incoming flow (index 0) on the end event.
#[bpxe_im::test]
async fn incoming_log() {
let mut definitions = Definitions {
root_elements: vec![Process {
id: Some("proc1".into()),
flow_elements: vec![
StartEvent {
id: Some("start".into()),
..Default::default()
}
.into(),
EndEvent {
id: Some("end".into()),
..Default::default()
}
.into(),
],
..Default::default()
}
.into()],
..Default::default()
};
// Connect "start" to "end" with an unconditional sequence flow "s1".
definitions
.find_by_id_mut("proc1")
.unwrap()
.downcast_mut::<Process>()
.unwrap()
.establish_sequence_flow("start", "end", "s1", None::<FormalExpression>)
.unwrap();
let model = model::Model::new(definitions).spawn().await;
let handle = model.processes().await.unwrap().pop().unwrap();
// Subscribe before starting so that no log entry is missed.
let mut mailbox = Mailbox::new(handle.log_receiver());
assert!(handle.start().await.is_ok());
assert!(
mailbox
.receive(|e| {
if let Log::FlowNodeIncoming {
node,
incoming_index: 0,
} = e
{
matches!(node.downcast_ref::<EndEvent>(),
Some(end_event) if end_event.id().as_ref().unwrap() == "end")
} else {
false
}
})
.await
);
model.terminate().await;
}
// A data object starts out empty, can be overwritten through its container,
// and the new value is visible on subsequent lookups.
#[bpxe_im::test]
async fn data_object() {
use crate::data_object;
use serde_json::json;
let definitions = parse(include_str!("process/test_models/data_object.bpmn")).unwrap();
let model = model::Model::new(definitions).spawn().await;
let handle = model.processes().await.unwrap().pop().unwrap();
let data_object = handle.data_object("data_object").await.unwrap();
// Initially the container holds `data_object::Empty`.
let read = data_object.read().await;
assert!(read.downcast_ref::<data_object::Empty>().is_some());
// Release the read guard before taking the write lock.
drop(read);
let mut write = data_object.write().await;
*write = Box::new(json!({"test": "passed"}));
drop(write);
// Looking the object up again yields the value written above.
let data_object = handle.data_object("data_object").await.unwrap();
let read = data_object.read().await;
assert_eq!(
read.downcast_ref::<serde_json::Value>().unwrap(),
&json!({"test": "passed"})
);
// The id "DataObject" yields the same value.
// NOTE(review): presumably a second id in the model file that resolves to
// the same container — confirm against data_object.bpmn.
let data_object = handle.data_object("DataObject").await.unwrap();
let read = data_object.read().await;
assert_eq!(
read.downcast_ref::<serde_json::Value>().unwrap(),
&json!({"test": "passed"})
);
model.terminate().await;
}
}