lellm_graph/
barrier_node.rs1use async_trait::async_trait;
6
7use crate::error::GraphError;
8use crate::event::{BarrierDecision, BarrierId};
9use crate::node::FlowNode;
10use crate::node_context::NodeContext;
11use crate::state::{State, StateEffect};
12use crate::workflow_state::WorkflowState;
13
14#[derive(Debug, Clone, Default)]
16pub enum BarrierDefaultAction {
17 #[default]
18 Reject,
19 Approve,
20 Skip,
21}
22
23#[derive(Debug, Clone)]
25pub struct BarrierNode<S: WorkflowState = State> {
26 pub name: String,
27 pub timeout: Option<std::time::Duration>,
28 pub default_action: BarrierDefaultAction,
29 pub reject_key: String,
30 pub approve_key: String,
31 _phantom: std::marker::PhantomData<S>,
33}
34
35impl<S: WorkflowState<Effect = StateEffect>> BarrierNode<S> {
36 pub fn new(name: impl Into<String>) -> Self {
37 let name = name.into();
38 Self {
39 name: name.clone(),
40 timeout: None,
41 default_action: BarrierDefaultAction::default(),
42 reject_key: format!("{name}.reject_reason"),
43 approve_key: format!("{name}.approved"),
44 _phantom: std::marker::PhantomData,
45 }
46 }
47
48 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
49 self.timeout = Some(timeout);
50 self
51 }
52
53 pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
54 self.default_action = action;
55 self
56 }
57
58 pub fn reject_key(mut self, key: impl Into<String>) -> Self {
59 self.reject_key = key.into();
60 self
61 }
62
63 pub fn approve_key(mut self, key: impl Into<String>) -> Self {
64 self.approve_key = key.into();
65 self
66 }
67
68 pub fn apply_decision_to_ctx(&self, ctx: &mut NodeContext<'_, S>, decision: BarrierDecision) {
69 match decision {
70 BarrierDecision::Approve => {
71 tracing::info!(barrier = %self.name, "approved");
72 ctx.emit_effect(StateEffect::Put(
73 self.approve_key.clone(),
74 serde_json::json!(true),
75 ));
76 ctx.emit_effect(StateEffect::Delete(self.reject_key.clone()));
77 }
78 BarrierDecision::Reject { reason } => {
79 tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
80 ctx.emit_effect(StateEffect::Put(
81 self.reject_key.clone(),
82 serde_json::json!(reason),
83 ));
84 ctx.emit_effect(StateEffect::Delete(self.approve_key.clone()));
85 }
86 BarrierDecision::Modify { key, value } => {
87 tracing::info!(barrier = %self.name, key = %key, "state modified");
88 ctx.emit_effect(StateEffect::Put(key, value));
89 }
90 BarrierDecision::Reroute { target } => {
91 tracing::info!(barrier = %self.name, target = %target, "rerouted");
92 ctx.goto(&target);
93 }
94 }
95 }
96}
97
98#[async_trait]
99impl<S: WorkflowState> FlowNode<S> for BarrierNode<S> {
100 async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
101 let barrier_id = BarrierId::new(&self.name, 0);
102 ctx.pause(barrier_id, self.timeout);
103 ctx.set_has_side_effects();
104 Ok(())
105 }
106}