lellm_graph/
barrier_node.rs1use async_trait::async_trait;
7
8use crate::delta::StateDelta;
9use crate::error::{GraphError, TerminalError};
10use crate::event::{BarrierDecision, BarrierId, GraphEvent};
11use crate::ids::SpanId;
12use crate::node::{FlowNode, NextStep, NodeMetadata, NodeOutput, StreamNodeResult};
13use crate::state::State;
14
/// Fallback action carried by a paused barrier.
///
/// `BarrierNode::execute_stream` forwards this value, next to the optional timeout, inside the
/// `StreamNodeResult::Pause` it returns.
///
/// NOTE(review): presumably the executor applies this action when the timeout elapses without a
/// human decision — confirm against the executor implementation.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare actions with `==`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BarrierDefaultAction {
    /// Fall back to rejecting. This is the variant returned by `Default::default()`.
    #[default]
    Reject,
    /// Fall back to approving.
    Approve,
    /// Fall back to skipping (exact executor semantics are not visible in this file).
    Skip,
}
26
27#[derive(Debug, Clone)]
36pub struct BarrierNode {
37 pub name: String,
38 pub timeout: Option<std::time::Duration>,
40 pub default_action: BarrierDefaultAction,
42 pub reject_key: String,
44 pub approve_key: String,
46}
47
48impl BarrierNode {
49 pub fn new(name: impl Into<String>) -> Self {
50 let name = name.into();
51 Self {
52 name: name.clone(),
53 timeout: None,
54 default_action: BarrierDefaultAction::default(),
55 reject_key: format!("{name}.reject_reason"),
56 approve_key: format!("{name}.approved"),
57 }
58 }
59
60 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
62 self.timeout = Some(timeout);
63 self
64 }
65
66 pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
68 self.default_action = action;
69 self
70 }
71
72 pub fn reject_key(mut self, key: impl Into<String>) -> Self {
74 self.reject_key = key.into();
75 self
76 }
77
78 pub fn approve_key(mut self, key: impl Into<String>) -> Self {
80 self.approve_key = key.into();
81 self
82 }
83
84 pub fn apply_decision(&self, decision: BarrierDecision) -> (NextStep, Vec<StateDelta>) {
88 match decision {
89 BarrierDecision::Approve => {
90 tracing::info!(barrier = %self.name, "approved");
91 let deltas = vec![
92 StateDelta::put(&self.approve_key, serde_json::json!(true)),
93 StateDelta::delete(&self.reject_key),
94 ];
95 (NextStep::GoToNext, deltas)
96 }
97 BarrierDecision::Reject { reason } => {
98 tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
99 let deltas = vec![
100 StateDelta::put(&self.reject_key, serde_json::json!(reason)),
101 StateDelta::delete(&self.approve_key),
102 ];
103 (NextStep::GoToNext, deltas)
104 }
105 BarrierDecision::Modify { key, value } => {
106 tracing::info!(barrier = %self.name, key = %key, "state modified");
107 let deltas = vec![StateDelta::put(key, value)];
108 (NextStep::GoToNext, deltas)
109 }
110 BarrierDecision::Reroute { target } => {
111 tracing::info!(barrier = %self.name, target = %target, "rerouted");
112 (NextStep::Goto(target), vec![])
113 }
114 }
115 }
116}
117
118#[async_trait]
119impl FlowNode for BarrierNode {
120 async fn execute(&self, _state: &State) -> Result<NodeOutput, GraphError> {
122 Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
123 "BarrierNode '{}' requires stream mode. Use GraphExecutor::execute_stream() for human-in-the-loop.",
124 self.name
125 ))))
126 }
127
128 async fn execute_stream(
130 &self,
131 _state: &State,
132 _sink: &tokio::sync::mpsc::Sender<GraphEvent>,
133 span_id: SpanId,
134 ) -> Result<StreamNodeResult, GraphError> {
135 let node_name = self.name.clone();
136
137 let barrier_id = BarrierId::new(&node_name, 0);
140
141 Ok(StreamNodeResult::Pause {
143 deltas: vec![],
144 barrier_id,
145 node_name,
146 span_id,
147 timeout: self.timeout,
148 default_action: self.default_action.clone(),
149 })
150 }
151
152 fn metadata_hint(&self) -> NodeMetadata {
153 NodeMetadata {
155 token_cost: 0.0,
156 has_side_effects: true, }
158 }
159}