use async_trait::async_trait;
use super::node_context::{LeafContext, NodeContext};
use super::{FlowNode, LeafNode};
use crate::error::GraphError;
use crate::event::{BarrierDecision, BarrierId};
use crate::state::workflow_state::WorkflowState;
use crate::state::{State, StateMutation};
/// Fallback decision configured on a [`BarrierNode`].
///
/// The code that consumes this value is not part of this file.
/// NOTE(review): presumably it is applied when a barrier is left without an
/// explicit decision (for example once its timeout elapses) — confirm against
/// the executor.
///
/// The enum is comparable (`PartialEq`/`Eq`) so configuration can be checked
/// with `==` and `assert_eq!`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BarrierDefaultAction {
    /// Fall back to rejecting. This is the [`Default`] variant.
    #[default]
    Reject,
    /// Fall back to approving.
    Approve,
    /// Fall back to skipping the barrier.
    Skip,
}
/// A workflow node that pauses execution at a named barrier.
///
/// `S` is the workflow state type the node operates on; it defaults to
/// [`State`].
#[derive(Debug, Clone)]
pub struct BarrierNode<S: WorkflowState = State> {
    /// Name of the barrier; used to build its `BarrierId` and to tag log
    /// events.
    pub name: String,
    /// Optional timeout handed to the context when the barrier pauses.
    pub timeout: Option<std::time::Duration>,
    /// Configured fallback action. It is stored here but not read anywhere in
    /// this file — NOTE(review): presumably consumed by the executor.
    pub default_action: BarrierDefaultAction,
    /// State key that receives the rejection reason on a reject decision and
    /// is deleted on an approve decision.
    pub reject_key: String,
    /// State key that is set to `true` on an approve decision and deleted on
    /// a reject decision.
    pub approve_key: String,
    /// Zero-sized marker tying the node to its state type `S`.
    _phantom: std::marker::PhantomData<S>,
}
impl<S: WorkflowState<Mutation = StateMutation>> BarrierNode<S> {
    /// Creates a barrier called `name` with no timeout and the default
    /// [`BarrierDefaultAction`].
    ///
    /// The state keys are derived from the name: `"<name>.reject_reason"` for
    /// the rejection reason and `"<name>.approved"` for the approval flag.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        // Build both keys while `name` is still borrowed, then move it into
        // the struct.
        let reject_key = format!("{name}.reject_reason");
        let approve_key = format!("{name}.approved");
        Self {
            name,
            timeout: None,
            default_action: BarrierDefaultAction::default(),
            reject_key,
            approve_key,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Sets the timeout passed to the context when the barrier pauses.
    pub fn timeout(self, timeout: std::time::Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Replaces the configured default action.
    pub fn default_action(self, action: BarrierDefaultAction) -> Self {
        Self {
            default_action: action,
            ..self
        }
    }

    /// Overrides the state key that stores the rejection reason.
    pub fn reject_key(self, key: impl Into<String>) -> Self {
        Self {
            reject_key: key.into(),
            ..self
        }
    }

    /// Overrides the state key that stores the approval flag.
    pub fn approve_key(self, key: impl Into<String>) -> Self {
        Self {
            approve_key: key.into(),
            ..self
        }
    }

    /// Translates a [`BarrierDecision`] into effects on `ctx`.
    ///
    /// * `Approve` — logs at info level, records a `Put` of `true` under
    ///   `approve_key`, then a `Delete` of `reject_key`.
    /// * `Reject { reason }` — logs at warn level, records a `Put` of the
    ///   reason (as JSON) under `reject_key`, then a `Delete` of
    ///   `approve_key`.
    /// * `Modify { key, value }` — logs at info level and records a `Put` of
    ///   `value` under `key`.
    /// * `Reroute { target }` — logs at info level and calls `ctx.goto` with
    ///   the target.
    pub fn apply_decision_to_ctx(&self, ctx: &mut NodeContext<'_, S>, decision: BarrierDecision) {
        match decision {
            BarrierDecision::Approve => {
                tracing::info!(barrier = %self.name, "approved");
                let set_approved =
                    StateMutation::Put(self.approve_key.clone(), serde_json::Value::Bool(true));
                let clear_rejection = StateMutation::Delete(self.reject_key.clone());
                // The flag is written first, then the opposite key is cleared.
                ctx.record(set_approved);
                ctx.record(clear_rejection);
            }
            BarrierDecision::Reject { reason } => {
                tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
                let set_reason =
                    StateMutation::Put(self.reject_key.clone(), serde_json::json!(reason));
                let clear_approval = StateMutation::Delete(self.approve_key.clone());
                // The reason is written first, then the opposite key is cleared.
                ctx.record(set_reason);
                ctx.record(clear_approval);
            }
            BarrierDecision::Modify { key, value } => {
                tracing::info!(barrier = %self.name, key = %key, "state modified");
                let edit = StateMutation::Put(key, value);
                ctx.record(edit);
            }
            BarrierDecision::Reroute { target } => {
                tracing::info!(barrier = %self.name, target = %target, "rerouted");
                ctx.goto(&target);
            }
        }
    }
}
#[async_trait]
impl<S: WorkflowState> LeafNode<S> for BarrierNode<S> {
    /// Asks the leaf context to pause at this barrier.
    ///
    /// Builds a `BarrierId` from the node name and the fixed value `0`, passes
    /// it to `ctx.pause` together with the configured timeout, then calls
    /// `ctx.set_has_side_effects()`. Always returns `Ok(())`.
    ///
    /// NOTE(review): the meaning of the literal `0` is defined by
    /// `BarrierId::new`, which is not visible here — confirm.
    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError> {
        ctx.pause(BarrierId::new(&self.name, 0), self.timeout);
        ctx.set_has_side_effects();
        Ok(())
    }
}
#[async_trait]
impl<S: WorkflowState> FlowNode<S> for BarrierNode<S> {
    /// Asks the node context to pause at this barrier.
    ///
    /// Builds a `BarrierId` from the node name and the fixed value `0`, passes
    /// it to `ctx.pause` together with the configured timeout, then calls
    /// `ctx.set_has_side_effects()`. Always returns `Ok(())`.
    ///
    /// NOTE(review): the meaning of the literal `0` is defined by
    /// `BarrierId::new`, which is not visible here — confirm.
    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
        ctx.pause(BarrierId::new(&self.name, 0), self.timeout);
        ctx.set_has_side_effects();
        Ok(())
    }
}