use async_trait::async_trait;
use crate::error::{GraphError, TerminalError};
use crate::event::{BarrierDecision, BarrierId, GraphEvent, SpanId};
use crate::node::{GraphNode, NextStep, StreamNodeResult};
use crate::state::State;
/// Fallback action configured on a [`BarrierNode`].
///
/// The value is handed to the executor inside
/// `StreamNodeResult::BarrierPaused` together with the node's timeout.
/// NOTE(review): presumably the executor applies it when no human decision
/// arrives before the timeout elapses — confirm against the executor.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BarrierDefaultAction {
    /// Fall back to rejecting. This is the `Default` variant.
    #[default]
    Reject,
    /// Fall back to approving.
    Approve,
    /// Fall back to skipping; the exact meaning is defined by whoever
    /// consumes the pause result, not by this file.
    Skip,
}
/// Graph node that pauses a streamed run at a named barrier so that an
/// external decision can be applied through `apply_decision`.
///
/// Derives `Debug` and `Clone` so configured barriers can be logged and
/// duplicated; every field already supports both.
#[derive(Debug, Clone)]
pub struct BarrierNode {
    /// Node name. Used in log output, in the pause result, and as the prefix
    /// of the default state keys built by `new`.
    pub name: String,
    /// Optional timeout forwarded in the pause result; `None` until set.
    pub timeout: Option<std::time::Duration>,
    /// Fallback action forwarded in the pause result.
    pub default_action: BarrierDefaultAction,
    /// State key under which a rejection reason is stored
    /// (`"<name>.reject_reason"` unless overridden).
    pub reject_key: String,
    /// State key set to `true` on approval
    /// (`"<name>.approved"` unless overridden).
    pub approve_key: String,
}
impl BarrierNode {
    /// Creates a barrier called `name` with no timeout, the default
    /// [`BarrierDefaultAction`], and state keys derived from the name:
    /// `"<name>.reject_reason"` and `"<name>.approved"`.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        // Build both keys while `name` can still be borrowed, then move it
        // into the struct.
        let reject_key = format!("{name}.reject_reason");
        let approve_key = format!("{name}.approved");
        Self {
            name,
            timeout: None,
            default_action: BarrierDefaultAction::default(),
            reject_key,
            approve_key,
        }
    }

    /// Builder step: sets the timeout carried in the pause result.
    pub fn timeout(self, timeout: std::time::Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Builder step: replaces the fallback action.
    pub fn default_action(self, action: BarrierDefaultAction) -> Self {
        Self {
            default_action: action,
            ..self
        }
    }

    /// Builder step: overrides the state key that receives a rejection reason.
    pub fn reject_key(self, key: impl Into<String>) -> Self {
        Self {
            reject_key: key.into(),
            ..self
        }
    }

    /// Builder step: overrides the state key that is set to `true` on approval.
    pub fn approve_key(self, key: impl Into<String>) -> Self {
        Self {
            approve_key: key.into(),
            ..self
        }
    }

    /// Applies a barrier `decision` to `state` and reports where the graph
    /// should continue.
    ///
    /// * `Approve` — stores `true` under `approve_key`, then removes `reject_key`.
    /// * `Reject { reason }` — stores the reason under `reject_key`, then
    ///   removes `approve_key`.
    /// * `Modify { key, value }` — writes `value` under `key`.
    /// * `Reroute { target }` — leaves `state` untouched and jumps to `target`.
    ///
    /// Every variant except `Reroute` yields `NextStep::GoToNext`. As written,
    /// the function always returns `Ok`.
    pub fn apply_decision(
        &self,
        decision: BarrierDecision,
        state: &mut State,
    ) -> Result<NextStep, GraphError> {
        let barrier = self.name.as_str();
        let next = match decision {
            BarrierDecision::Reroute { target } => {
                tracing::info!(barrier = %barrier, target = %target, "rerouted");
                NextStep::Goto(target)
            }
            BarrierDecision::Modify { key, value } => {
                tracing::info!(barrier = %barrier, key = %key, "state modified");
                state.insert(key, value);
                NextStep::GoToNext
            }
            BarrierDecision::Reject { reason } => {
                tracing::warn!(barrier = %barrier, reason = %reason, "rejected");
                // Insert first, remove second: the order matters if both keys
                // were configured to the same string.
                state.insert(self.reject_key.clone(), serde_json::json!(reason));
                state.remove(&self.approve_key);
                NextStep::GoToNext
            }
            BarrierDecision::Approve => {
                tracing::info!(barrier = %barrier, "approved");
                state.insert(self.approve_key.clone(), serde_json::Value::Bool(true));
                state.remove(&self.reject_key);
                NextStep::GoToNext
            }
        };
        Ok(next)
    }
}
#[async_trait]
impl GraphNode for BarrierNode {
    /// Non-streaming execution is not supported for barriers: this always
    /// returns a terminal `InvalidGraph` error that names the node and points
    /// the caller at stream mode.
    async fn execute(&self, _state: &mut State) -> Result<NextStep, GraphError> {
        let message = format!(
            "BarrierNode '{}' requires stream mode. Use GraphExecutor::execute_stream() for human-in-the-loop.",
            self.name
        );
        Err(GraphError::Terminal(TerminalError::InvalidGraph(message)))
    }

    /// Streaming execution: touches neither the state nor the event sink and
    /// immediately returns `StreamNodeResult::BarrierPaused`, carrying this
    /// node's name, the caller's `span_id`, and the configured timeout and
    /// fallback action.
    async fn execute_stream(
        &self,
        _state: &mut State,
        _sink: &tokio::sync::mpsc::Sender<GraphEvent>,
        span_id: SpanId,
    ) -> Result<StreamNodeResult, GraphError> {
        // The second argument is always 0 here; its meaning is defined by
        // `BarrierId::new`, which is outside this file.
        let barrier_id = BarrierId::new(&self.name, 0);
        let paused = StreamNodeResult::BarrierPaused {
            barrier_id,
            node_name: self.name.clone(),
            span_id,
            timeout: self.timeout,
            default_action: self.default_action.clone(),
        };
        Ok(paused)
    }
}