Skip to main content

lellm_graph/
barrier_node.rs

//! Human-in-the-loop approval node.
//!
//! v0.4+: made generic as `BarrierNode<S: WorkflowState>`.
4
5use async_trait::async_trait;
6
7use crate::error::GraphError;
8use crate::event::{BarrierDecision, BarrierId};
9use crate::node::FlowNode;
10use crate::node_context::NodeContext;
11use crate::state::{State, StateEffect};
12use crate::workflow_state::WorkflowState;
13
/// Default action applied once a barrier has timed out.
///
/// The enum carries no data, so it is cheap to copy and can be compared
/// directly (`action == BarrierDefaultAction::Reject`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum BarrierDefaultAction {
    /// Reject; this is the variant returned by `Default::default()`.
    #[default]
    Reject,
    /// Approve.
    Approve,
    /// Skip.
    Skip,
}
22
23/// Human-in-the-loop 审批节点。
24#[derive(Debug, Clone)]
25pub struct BarrierNode<S: WorkflowState = State> {
26    pub name: String,
27    pub timeout: Option<std::time::Duration>,
28    pub default_action: BarrierDefaultAction,
29    pub reject_key: String,
30    pub approve_key: String,
31    /// Phantom 用于标记泛型类型
32    _phantom: std::marker::PhantomData<S>,
33}
34
35impl<S: WorkflowState<Effect = StateEffect>> BarrierNode<S> {
36    pub fn new(name: impl Into<String>) -> Self {
37        let name = name.into();
38        Self {
39            name: name.clone(),
40            timeout: None,
41            default_action: BarrierDefaultAction::default(),
42            reject_key: format!("{name}.reject_reason"),
43            approve_key: format!("{name}.approved"),
44            _phantom: std::marker::PhantomData,
45        }
46    }
47
48    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
49        self.timeout = Some(timeout);
50        self
51    }
52
53    pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
54        self.default_action = action;
55        self
56    }
57
58    pub fn reject_key(mut self, key: impl Into<String>) -> Self {
59        self.reject_key = key.into();
60        self
61    }
62
63    pub fn approve_key(mut self, key: impl Into<String>) -> Self {
64        self.approve_key = key.into();
65        self
66    }
67
68    pub fn apply_decision_to_ctx(&self, ctx: &mut NodeContext<'_, S>, decision: BarrierDecision) {
69        match decision {
70            BarrierDecision::Approve => {
71                tracing::info!(barrier = %self.name, "approved");
72                ctx.emit_effect(StateEffect::Put(
73                    self.approve_key.clone(),
74                    serde_json::json!(true),
75                ));
76                ctx.emit_effect(StateEffect::Delete(self.reject_key.clone()));
77            }
78            BarrierDecision::Reject { reason } => {
79                tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
80                ctx.emit_effect(StateEffect::Put(
81                    self.reject_key.clone(),
82                    serde_json::json!(reason),
83                ));
84                ctx.emit_effect(StateEffect::Delete(self.approve_key.clone()));
85            }
86            BarrierDecision::Modify { key, value } => {
87                tracing::info!(barrier = %self.name, key = %key, "state modified");
88                ctx.emit_effect(StateEffect::Put(key, value));
89            }
90            BarrierDecision::Reroute { target } => {
91                tracing::info!(barrier = %self.name, target = %target, "rerouted");
92                ctx.goto(&target);
93            }
94        }
95    }
96}
97
98#[async_trait]
99impl<S: WorkflowState> FlowNode<S> for BarrierNode<S> {
100    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
101        let barrier_id = BarrierId::new(&self.name, 0);
102        ctx.pause(barrier_id, self.timeout);
103        ctx.set_has_side_effects();
104        Ok(())
105    }
106}