1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
//! # Flow Node
use crate::activity;
use crate::bpmn::schema::{
    ActivityType, DocumentElement, Element, EndEvent, EventBasedGateway, ExclusiveGateway,
    FlowNodeType, InclusiveGateway, IntermediateCatchEvent, IntermediateThrowEvent,
    ParallelGateway, ScriptTask, SequenceFlow, StartEvent,
};
use crate::event::{end_event, intermediate_catch_event, intermediate_throw_event, start_event};
use crate::gateway;
use crate::process::{self};
use factory::ParameterizedFactory;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::marker::PhantomData;

use thiserror::Error;

/// Flow node state
///
/// ## Notes
///
/// All flow nodes' state is combined into one "big" enum so that [`FlowNode`] doesn't need to have
/// any associated types (which would make the final type be sized differently, and this makes it
/// problematic for runtime dispatching).
///
/// Each variant wraps the state type declared by the corresponding flow node module.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum State {
    StartEvent(start_event::State),
    EndEvent(end_event::State),
    IntermediateThrowEvent(intermediate_throw_event::State),
    IntermediateCatchEvent(intermediate_catch_event::State),
    ParallelGateway(gateway::parallel::State),
    ExclusiveGateway(gateway::exclusive::State),
    InclusiveGateway(gateway::inclusive::State),
    EventBasedGateway(gateway::event_based::State),
    ScriptTask(activity::script_task::State),
    ActivityState(activity::State),
}

/// State handling errors
///
/// Returned by [`FlowNode::set_state`] when the supplied [`State`] cannot be applied.
#[derive(Error, Debug)]
pub enum StateError {
    /// Invalid state variant. A different variant was expected.
    #[error("invalid state variant")]
    InvalidVariant,
}

/// Index identifying one of a flow node's incoming sequence flows (see [`FlowNode::incoming`])
pub type IncomingIndex = usize;
/// Index identifying one of a flow node's outgoing sequence flows (see [`FlowNode::sequence_flow`])
pub type OutgoingIndex = usize;

/// Hard-coded inline capacity for [`smallvec::SmallVec`]`<[IncomingIndex; _]>`
///
/// This is the default expectation for how many incomings a flow node has: arrays of incomings
/// up to this size stay small, and grow into a heap allocation only when they exceed it.
///
/// It's chosen as a somewhat arbitrary guess for what can constitute a "normal" flow count.
pub const SMALL_INCOMING: usize = 8;

/// Hard-coded inline capacity for [`smallvec::SmallVec`]`<[OutgoingIndex; _]>`
///
/// This is the default expectation for how many outgoings a flow node has: arrays of outgoings
/// up to this size stay small, and grow into a heap allocation only when they exceed it.
///
/// It's chosen as a somewhat arbitrary guess for what can constitute a "normal" flow count.
pub const SMALL_OUTGOING: usize = 8;

/// Determination of next action by flow nodes
///
/// This is the item type of the stream every [`FlowNode`] implements.
#[derive(Debug)]
pub enum Action {
    /// Check whether given outgoings will flow
    ///
    /// This is useful if the flow node needs to know whether certain outgoings
    /// *will* flow.
    ProbeOutgoingSequenceFlows(SmallVec<[OutgoingIndex; SMALL_OUTGOING]>),
    /// Enact flow through given outgoings
    ///
    /// This action will still check whether given outgoings *can* flow.
    Flow(SmallVec<[OutgoingIndex; SMALL_OUTGOING]>),
    /// Mark flow node as complete, no further action necessary.
    Complete,
}

/// Flow node
///
/// Every flow node is also a [`futures::stream::Stream`] with `Item` set to [`Action`]; this is
/// required by the supertrait bounds, together with `Send` and `Unpin`.
pub trait FlowNode: Stream<Item = Action> + Send + Unpin {
    /// Sets durable state
    ///
    /// If the state variant is incorrect, a [`StateError`] should be returned.
    fn set_state(&mut self, state: State) -> Result<(), StateError>;

    /// Gets durable state
    ///
    /// The reason why it's mutable is that in some cases some activities might want to change
    /// their data or simply get mutable access to it during state retrieval.
    fn get_state(&mut self) -> State;

    /// Sets process handle
    ///
    /// This allows flow nodes to access the process they are running in.
    ///
    /// Default implementation does nothing.
    // The parameter is intentionally unused by the no-op default body.
    #[allow(unused_variables)]
    fn set_process(&mut self, process: process::Handle) {}

    /// Reports outgoing sequence flow processing
    ///
    /// If the condition was not present or it returned a truthful result, `condition_result`
    /// will be set to `true`, otherwise it will be `false`.
    ///
    /// This allows flow nodes to make further decisions after each sequence flow processing.
    /// Flow node will be polled after this.
    ///
    /// Default implementation does nothing.
    // The parameters are intentionally unused by the no-op default body.
    #[allow(unused_variables)]
    fn sequence_flow(
        &mut self,
        outgoing: OutgoingIndex,
        sequence_flow: &SequenceFlow,
        condition_result: bool,
    ) {
    }

    /// Maps outgoing node's action to a new action (or inaction)
    ///
    /// This is useful for nodes with more complex processing (for example, event-based gateway)
    /// that need to handle the result action of the outgoing node.
    ///
    /// Returning `None` will mean that the action has to be dropped, returning `Some(action)` will
    /// replace the original action with the returned one in the flow.
    ///
    /// Default implementation does nothing (returns the same action, wrapped in `Some`).
    // `index` is intentionally unused by the pass-through default body.
    #[allow(unused_variables)]
    fn handle_outgoing_action(
        &mut self,
        index: OutgoingIndex,
        action: Option<Action>,
    ) -> Option<Option<Action>> {
        Some(action)
    }

    /// Reports incoming sequence flow
    ///
    /// Default implementation does nothing.
    // The parameter is intentionally unused by the no-op default body.
    #[allow(unused_variables)]
    fn incoming(&mut self, index: IncomingIndex) {}

    /// Reports token count at ingress.
    ///
    /// Useful for complex flow node behaviours where it needs to know how many outstanding tokens
    /// there are.
    ///
    /// Default implementation does nothing.
    // The parameter is intentionally unused by the no-op default body.
    #[allow(unused_variables)]
    fn tokens(&mut self, count: usize) {}

    /// Returns a flow element
    fn element(&self) -> Box<dyn FlowNodeType>;
}

/// Instantiates the flow node implementation matching a document element
///
/// Dispatches on the element kind reported by the element itself. Events and gateways are built
/// through `make`; script tasks are built through `make_activity`.
///
/// Returns `None` when the element kind has no flow node implementation listed here, or when the
/// boxed element cannot be downcast to the schema type its kind implies.
pub(crate) fn new(element: Box<dyn DocumentElement>) -> Option<Box<dyn FlowNode>> {
    let kind = element.element();
    match kind {
        // Events
        Element::StartEvent => make::<StartEvent, start_event::StartEvent>(element),
        Element::EndEvent => make::<EndEvent, end_event::EndEvent>(element),
        Element::IntermediateCatchEvent => make::<
            IntermediateCatchEvent,
            intermediate_catch_event::IntermediateCatchEvent,
        >(element),
        Element::IntermediateThrowEvent => make::<
            IntermediateThrowEvent,
            intermediate_throw_event::IntermediateThrowEvent,
        >(element),

        // Gateways
        Element::ExclusiveGateway => make::<ExclusiveGateway, gateway::exclusive::Gateway>(element),
        Element::InclusiveGateway => make::<InclusiveGateway, gateway::inclusive::Gateway>(element),
        Element::ParallelGateway => make::<ParallelGateway, gateway::parallel::Gateway>(element),
        Element::EventBasedGateway => {
            make::<EventBasedGateway, gateway::event_based::Gateway>(element)
        }

        // Activities
        Element::ScriptTask => make_activity::<ScriptTask, activity::script_task::Task>(element),

        // Every other element kind has no flow node counterpart
        _ => None,
    }
}

/// Builds a boxed flow node of type `F` from a type-erased document element
///
/// The element is downcast to the concrete schema type `E` and converted into `F` through
/// `F: From<E>`.
///
/// Returns `None` if `element` is not actually an `E`.
fn make<E, F>(element: Box<dyn DocumentElement>) -> Option<Box<dyn FlowNode>>
where
    E: DocumentElement + FlowNodeType + Clone + Default,
    F: 'static + From<E> + FlowNode,
{
    let concrete = element.downcast::<E>().ok()?;
    // The downcast hands back an owned `Box<E>`, so the element is moved out of the box
    // directly; cloning it first would only duplicate a value that is dropped right after.
    Some(Box::new(F::from(*concrete)) as Box<dyn FlowNode>)
}

/// Factory producing activities of type `F` from document elements of type `E`
///
/// Holds no data: the only field is a [`PhantomData`] that ties the factory to its two type
/// parameters.
struct ActivityFactory<F, E>(PhantomData<(F, E)>)
where
    E: DocumentElement + ActivityType + Clone + Default + Unpin,
    F: 'static + From<E> + activity::Activity;

// Hand-written rather than derived: `#[derive(Clone)]` would additionally demand `F: Clone`,
// although the factory never stores an `F`.
impl<F, E> Clone for ActivityFactory<F, E>
where
    F: 'static + From<E> + activity::Activity,
    E: DocumentElement + ActivityType + Clone + Default + Unpin,
{
    /// Returns a fresh factory; there is no data to copy.
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<F, E> ParameterizedFactory for ActivityFactory<F, E>
where
    F: 'static + From<E> + activity::Activity,
    E: DocumentElement + ActivityType + Clone + Default + Unpin,
{
    type Parameter = E;
    type Item = F;

    /// Creates an activity by converting the given element through `F: From<E>`.
    fn create(&self, param: E) -> F {
        param.into()
    }
}

/// Builds a boxed flow node for an activity from a type-erased document element
///
/// The element is downcast to the concrete schema type `E` and handed to
/// `activity::ActivityContainer::new` together with an `ActivityFactory` that creates `F` from
/// `E`. The resulting container is the flow node that gets returned.
///
/// Returns `None` if `element` is not actually an `E`.
fn make_activity<E, F>(element: Box<dyn DocumentElement>) -> Option<Box<dyn FlowNode>>
where
    E: DocumentElement + ActivityType + Clone + Default + Unpin,
    F: 'static + From<E> + activity::Activity,
{
    let concrete = element.downcast::<E>().ok()?;
    // The downcast hands back an owned `Box<E>`, so the element is moved out of the box
    // directly instead of being cloned and then dropped.
    let container =
        activity::ActivityContainer::new(*concrete, ActivityFactory::<F, E>(PhantomData));
    Some(Box::new(container) as Box<dyn FlowNode>)
}