Skip to main content

lellm_graph/node/
barrier_node.rs

1//! Human-in-the-loop 审批节点。
2//!
3//! v0.4+: 泛型化 `BarrierNode<S: WorkflowState>`。
4//!
5//! # 限制
6//!
7//! `BarrierNode` 要求 `S: WorkflowState<Mutation = StateMutation>`,
8//! 因为 `apply_decision_to_ctx()` 使用 key/value 操作(`Put`/`Delete`)。
9//! 对于 `AgentState` 等类型化状态,请实现自定义 Barrier 节点
10//! 或使用 `BarrierDecision` 枚举配合自定义逻辑。
11
12use async_trait::async_trait;
13
14use super::node_context::{LeafContext, NodeContext};
15use super::{FlowNode, LeafNode};
16use crate::error::GraphError;
17use crate::event::{BarrierDecision, BarrierId};
18use crate::state::workflow_state::WorkflowState;
19use crate::state::{State, StateMutation};
20
21/// Barrier 超时后的默认行为。
22#[derive(Debug, Clone, Default)]
23pub enum BarrierDefaultAction {
24    #[default]
25    Reject,
26    Approve,
27    Skip,
28}
29
30/// Human-in-the-loop 审批节点。
31#[derive(Debug, Clone)]
32pub struct BarrierNode<S: WorkflowState = State> {
33    pub name: String,
34    pub timeout: Option<std::time::Duration>,
35    pub default_action: BarrierDefaultAction,
36    pub reject_key: String,
37    pub approve_key: String,
38    /// Phantom 用于标记泛型类型
39    _phantom: std::marker::PhantomData<S>,
40}
41
42impl<S: WorkflowState<Mutation = StateMutation>> BarrierNode<S> {
43    pub fn new(name: impl Into<String>) -> Self {
44        let name = name.into();
45        Self {
46            name: name.clone(),
47            timeout: None,
48            default_action: BarrierDefaultAction::default(),
49            reject_key: format!("{name}.reject_reason"),
50            approve_key: format!("{name}.approved"),
51            _phantom: std::marker::PhantomData,
52        }
53    }
54
55    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
56        self.timeout = Some(timeout);
57        self
58    }
59
60    pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
61        self.default_action = action;
62        self
63    }
64
65    pub fn reject_key(mut self, key: impl Into<String>) -> Self {
66        self.reject_key = key.into();
67        self
68    }
69
70    pub fn approve_key(mut self, key: impl Into<String>) -> Self {
71        self.approve_key = key.into();
72        self
73    }
74
75    pub fn apply_decision_to_ctx(&self, ctx: &mut NodeContext<'_, S>, decision: BarrierDecision) {
76        match decision {
77            BarrierDecision::Approve => {
78                tracing::info!(barrier = %self.name, "approved");
79                ctx.record(StateMutation::Put(
80                    self.approve_key.clone(),
81                    serde_json::json!(true),
82                ));
83                ctx.record(StateMutation::Delete(self.reject_key.clone()));
84            }
85            BarrierDecision::Reject { reason } => {
86                tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
87                ctx.record(StateMutation::Put(
88                    self.reject_key.clone(),
89                    serde_json::json!(reason),
90                ));
91                ctx.record(StateMutation::Delete(self.approve_key.clone()));
92            }
93            BarrierDecision::Modify { key, value } => {
94                tracing::info!(barrier = %self.name, key = %key, "state modified");
95                ctx.record(StateMutation::Put(key, value));
96            }
97            BarrierDecision::Reroute { target } => {
98                tracing::info!(barrier = %self.name, target = %target, "rerouted");
99                ctx.goto(&target);
100            }
101        }
102    }
103}
104
105/// BarrierNode 实现 LeafNode(推荐路径 — 只读 state + pause)。
106#[async_trait]
107impl<S: WorkflowState> LeafNode<S> for BarrierNode<S> {
108    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError> {
109        let barrier_id = BarrierId::new(&self.name, 0);
110        ctx.pause(barrier_id, self.timeout);
111        ctx.set_has_side_effects();
112        Ok(())
113    }
114}
115
116#[async_trait]
117impl<S: WorkflowState> FlowNode<S> for BarrierNode<S> {
118    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
119        let barrier_id = BarrierId::new(&self.name, 0);
120        ctx.pause(barrier_id, self.timeout);
121        ctx.set_has_side_effects();
122        Ok(())
123    }
124}