use crate::bpmn::schema::{FlowNodeType, ParallelGateway as Element};
use crate::flow_node::{self, Action, FlowNode, IncomingIndex};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
/// Parallel gateway flow node.
///
/// Tracks which incoming flows have arrived and, when polled as a stream,
/// emits a single flow action once every incoming flow has been seen.
pub struct Gateway {
/// Shared definition of the gateway element this node was built from.
element: Arc<Element>,
/// Current progress of the gateway (waiting for incomings, or completed).
state: State,
/// Waker of the task that last polled the gateway while it was not ready;
/// consumed by `incoming` to request a re-poll.
waker: Option<Waker>,
}
impl Gateway {
    /// Builds a gateway node for `element`.
    ///
    /// The node starts in [`State::Ready`] with one `false` flag per incoming
    /// flow declared on the element, and with no waker registered.
    pub fn new(element: Element) -> Self {
        // Size the arrival flags before the element is moved into the `Arc`.
        let incoming_completed = smallvec![false; element.incomings.len()];
        Self {
            element: Arc::new(element),
            state: State::Ready { incoming_completed },
            waker: None,
        }
    }
}
/// Serializable state of a parallel [`Gateway`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum State {
/// The gateway is collecting incoming flows.
Ready {
/// One flag per incoming flow; a flag is set to `true` once the
/// corresponding incoming index has been reported to the gateway.
incoming_completed: SmallVec<[bool; flow_node::SMALL_INCOMING]>,
},
/// All incoming flows were seen and the outgoing flow action has been
/// emitted; the next poll resets the gateway back to `Ready`.
Complete,
}
impl FlowNode for Gateway {
fn set_state(&mut self, state: flow_node::State) -> Result<(), flow_node::StateError> {
match state {
flow_node::State::ParallelGateway(state) => {
self.state = state;
Ok(())
}
_ => Err(flow_node::StateError::InvalidVariant),
}
}
fn get_state(&mut self) -> flow_node::State {
flow_node::State::ParallelGateway(self.state.clone())
}
fn element(&self) -> Box<dyn FlowNodeType> {
Box::new(self.element.as_ref().clone())
}
fn incoming(&mut self, index: IncomingIndex) {
if let State::Ready {
ref mut incoming_completed,
} = self.state
{
incoming_completed[index] = true;
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}
}
impl From<Element> for Gateway {
fn from(element: Element) -> Self {
Self::new(element)
}
}
impl Stream for Gateway {
    type Item = Action;

    /// Drives the gateway.
    ///
    /// * `Ready` with every incoming flag set: switches to `Complete` and
    ///   yields an [`Action::Flow`] listing every outgoing index
    ///   (`0..outgoings.len()`).
    /// * `Ready` with flags still missing: stores the task waker and returns
    ///   `Poll::Pending`; `incoming` wakes the task on the next arrival.
    /// * `Complete`: resets to `Ready` with all flags cleared, stores the
    ///   task waker and returns `Poll::Pending`.
    ///
    /// This stream never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.state {
            State::Ready {
                ref incoming_completed,
            } => {
                if incoming_completed.iter().all(|c| *c) {
                    self.state = State::Complete;
                    Poll::Ready(Some(Action::Flow(
                        (0..self.element().outgoings().len()).collect(),
                    )))
                } else {
                    self.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
            State::Complete => {
                let number_of_incomings = self.element().incomings().len();
                self.state = State::Ready {
                    incoming_completed: smallvec![false; number_of_incomings],
                };
                // Returning `Pending` obliges us to arrange a wake-up. The
                // previously stored waker may already have been consumed by
                // `incoming`, so register the current one; otherwise arrivals
                // after the reset would never cause this stream to be polled.
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::bpmn::parse;
    use crate::event::ProcessEvent;
    use crate::model;
    use crate::process::Log;
    use crate::test::*;
    use bpxe_internal_macros as bpxe_im;

    /// Starting the fork model must produce two `NoneEvent`s.
    #[bpxe_im::test]
    async fn fork() {
        let parsed = parse(include_str!("test_models/parallel_fork.bpmn")).unwrap();
        let engine = model::Model::new(parsed).spawn().await;
        let process = engine.processes().await.unwrap().pop().unwrap();
        let mut events = Mailbox::new(process.event_receiver());

        let started = process.start().await;
        assert!(started.is_ok());

        for _ in 0..2u8 {
            let seen = events
                .receive(|event| matches!(event, ProcessEvent::NoneEvent))
                .await;
            assert!(seen);
        }

        engine.terminate().await;
    }

    /// Starting the join model must produce three `NoneEvent`s.
    #[bpxe_im::test]
    async fn join() {
        let parsed = parse(include_str!("test_models/parallel_join.bpmn")).unwrap();
        let engine = model::Model::new(parsed).spawn().await;
        let process = engine.processes().await.unwrap().pop().unwrap();
        let mut events = Mailbox::new(process.event_receiver());

        let started = process.start().await;
        assert!(started.is_ok());

        for _ in 0..3u8 {
            let seen = events
                .receive(|event| matches!(event, ProcessEvent::NoneEvent))
                .await;
            assert!(seen);
        }

        engine.terminate().await;
    }

    /// When not every incoming flow reaches the join, only two `NoneEvent`s
    /// are observed, the `dead_end` node logs an incoming flow, and waiting
    /// for a further `NoneEvent` times out.
    #[bpxe_im::test]
    async fn not_enough_to_join() {
        let parsed =
            parse(include_str!("test_models/parallel_not_enough_to_join.bpmn")).unwrap();
        let engine = model::Model::new(parsed).spawn().await;
        let process = engine.processes().await.unwrap().pop().unwrap();
        let mut events = Mailbox::new(process.event_receiver());
        let mut logs = Mailbox::new(process.log_receiver());

        let started = process.start().await;
        assert!(started.is_ok());

        for _ in 0..2u8 {
            let seen = events
                .receive(|event| matches!(event, ProcessEvent::NoneEvent))
                .await;
            assert!(seen);
        }

        let reached_dead_end = logs
            .receive(|log| matches!(log, Log::FlowNodeIncoming { node, .. }
                if node.id().as_ref().unwrap() == "dead_end"))
            .await;
        assert!(reached_dead_end);

        // No further event may show up: the wait is expected to time out.
        let timed_out =
            expects_timeout(events.receive(|event| matches!(event, ProcessEvent::NoneEvent)))
                .await;
        assert!(timed_out.is_ok());

        engine.terminate().await;
    }
}