lellm_graph/
barrier_node.rs1use async_trait::async_trait;
7
8use crate::error::{GraphError, TerminalError};
9use crate::event::{BarrierDecision, BarrierId, GraphEvent, SpanId};
10use crate::node::{GraphNode, NextStep, StreamNodeResult};
11use crate::state::State;
12
/// Fallback action stored on a [`BarrierNode`] and forwarded to the executor
/// inside `StreamNodeResult::BarrierPaused`.
///
/// NOTE(review): presumably this is the action applied when the barrier's
/// timeout elapses without a decision — the code that consumes it is not in
/// this file, so confirm against the executor.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum BarrierDefaultAction {
    /// The variant returned by `Default::default()`.
    #[default]
    Reject,
    Approve,
    Skip,
}
24
25pub struct BarrierNode {
34 pub name: String,
35 pub timeout: Option<std::time::Duration>,
37 pub default_action: BarrierDefaultAction,
39 pub reject_key: String,
41 pub approve_key: String,
43}
44
45impl BarrierNode {
46 pub fn new(name: impl Into<String>) -> Self {
47 let name = name.into();
48 Self {
49 name: name.clone(),
50 timeout: None,
51 default_action: BarrierDefaultAction::default(),
52 reject_key: format!("{name}.reject_reason"),
53 approve_key: format!("{name}.approved"),
54 }
55 }
56
57 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
59 self.timeout = Some(timeout);
60 self
61 }
62
63 pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
65 self.default_action = action;
66 self
67 }
68
69 pub fn reject_key(mut self, key: impl Into<String>) -> Self {
71 self.reject_key = key.into();
72 self
73 }
74
75 pub fn approve_key(mut self, key: impl Into<String>) -> Self {
77 self.approve_key = key.into();
78 self
79 }
80
81 pub fn apply_decision(
85 &self,
86 decision: BarrierDecision,
87 state: &mut State,
88 ) -> Result<NextStep, GraphError> {
89 match decision {
90 BarrierDecision::Approve => {
91 tracing::info!(barrier = %self.name, "approved");
92 state.insert(self.approve_key.clone(), serde_json::json!(true));
93 state.remove(&self.reject_key);
94 Ok(NextStep::GoToNext)
95 }
96 BarrierDecision::Reject { reason } => {
97 tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
98 state.insert(self.reject_key.clone(), serde_json::json!(reason));
99 state.remove(&self.approve_key);
100 Ok(NextStep::GoToNext)
101 }
102 BarrierDecision::Modify { key, value } => {
103 tracing::info!(barrier = %self.name, key = %key, "state modified");
104 state.insert(key, value);
105 Ok(NextStep::GoToNext)
106 }
107 BarrierDecision::Reroute { target } => {
108 tracing::info!(barrier = %self.name, target = %target, "rerouted");
109 Ok(NextStep::Goto(target))
110 }
111 }
112 }
113}
114
115#[async_trait]
116impl GraphNode for BarrierNode {
117 async fn execute(&self, _state: &mut State) -> Result<NextStep, GraphError> {
119 Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
120 "BarrierNode '{}' requires stream mode. Use GraphExecutor::execute_stream() for human-in-the-loop.",
121 self.name
122 ))))
123 }
124
125 async fn execute_stream(
127 &self,
128 _state: &mut State,
129 _sink: &tokio::sync::mpsc::Sender<GraphEvent>,
130 span_id: SpanId,
131 ) -> Result<StreamNodeResult, GraphError> {
132 let node_name = self.name.clone();
133
134 let barrier_id = BarrierId::new(&node_name, 0);
137
138 Ok(StreamNodeResult::BarrierPaused {
140 barrier_id,
141 node_name,
142 span_id,
143 timeout: self.timeout,
144 default_action: self.default_action.clone(),
145 })
146 }
147}