use async_trait::async_trait;
use crate::delta::StateDelta;
use crate::error::{GraphError, TerminalError};
use crate::event::{BarrierDecision, BarrierId, GraphEvent};
use crate::ids::SpanId;
use crate::node::{FlowNode, NextStep, NodeMetadata, NodeOutput, StreamNodeResult};
use crate::state::State;
#[derive(Debug, Clone, Default)]
pub enum BarrierDefaultAction {
#[default]
Reject,
Approve,
Skip,
}
#[derive(Debug, Clone)]
pub struct BarrierNode {
pub name: String,
pub timeout: Option<std::time::Duration>,
pub default_action: BarrierDefaultAction,
pub reject_key: String,
pub approve_key: String,
}
impl BarrierNode {
pub fn new(name: impl Into<String>) -> Self {
let name = name.into();
Self {
name: name.clone(),
timeout: None,
default_action: BarrierDefaultAction::default(),
reject_key: format!("{name}.reject_reason"),
approve_key: format!("{name}.approved"),
}
}
pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
self.default_action = action;
self
}
pub fn reject_key(mut self, key: impl Into<String>) -> Self {
self.reject_key = key.into();
self
}
pub fn approve_key(mut self, key: impl Into<String>) -> Self {
self.approve_key = key.into();
self
}
pub fn apply_decision(&self, decision: BarrierDecision) -> (NextStep, Vec<StateDelta>) {
match decision {
BarrierDecision::Approve => {
tracing::info!(barrier = %self.name, "approved");
let deltas = vec![
StateDelta::put(&self.approve_key, serde_json::json!(true)),
StateDelta::delete(&self.reject_key),
];
(NextStep::GoToNext, deltas)
}
BarrierDecision::Reject { reason } => {
tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
let deltas = vec![
StateDelta::put(&self.reject_key, serde_json::json!(reason)),
StateDelta::delete(&self.approve_key),
];
(NextStep::GoToNext, deltas)
}
BarrierDecision::Modify { key, value } => {
tracing::info!(barrier = %self.name, key = %key, "state modified");
let deltas = vec![StateDelta::put(key, value)];
(NextStep::GoToNext, deltas)
}
BarrierDecision::Reroute { target } => {
tracing::info!(barrier = %self.name, target = %target, "rerouted");
(NextStep::Goto(target), vec![])
}
}
}
}
#[async_trait]
impl FlowNode for BarrierNode {
async fn execute(&self, _state: &State) -> Result<NodeOutput, GraphError> {
Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
"BarrierNode '{}' requires stream mode. Use GraphExecutor::execute_stream() for human-in-the-loop.",
self.name
))))
}
async fn execute_stream(
&self,
_state: &State,
_sink: &tokio::sync::mpsc::Sender<GraphEvent>,
span_id: SpanId,
) -> Result<StreamNodeResult, GraphError> {
let node_name = self.name.clone();
let barrier_id = BarrierId::new(&node_name, 0);
Ok(StreamNodeResult::Pause {
deltas: vec![],
barrier_id,
node_name,
span_id,
timeout: self.timeout,
default_action: self.default_action.clone(),
})
}
fn metadata_hint(&self) -> NodeMetadata {
NodeMetadata {
token_cost: 0.0,
has_side_effects: true, }
}
}