lellm_graph/
barrier_node.rs1use async_trait::async_trait;
13
14use crate::error::GraphError;
15use crate::event::{BarrierDecision, BarrierId};
16use crate::node::{FlowNode, LeafNode};
17use crate::node_context::{LeafContext, NodeContext};
18use crate::state::{State, StateMutation};
19use crate::workflow_state::WorkflowState;
20
21#[derive(Debug, Clone, Default)]
23pub enum BarrierDefaultAction {
24 #[default]
25 Reject,
26 Approve,
27 Skip,
28}
29
30#[derive(Debug, Clone)]
32pub struct BarrierNode<S: WorkflowState = State> {
33 pub name: String,
34 pub timeout: Option<std::time::Duration>,
35 pub default_action: BarrierDefaultAction,
36 pub reject_key: String,
37 pub approve_key: String,
38 _phantom: std::marker::PhantomData<S>,
40}
41
42impl<S: WorkflowState<Mutation = StateMutation>> BarrierNode<S> {
43 pub fn new(name: impl Into<String>) -> Self {
44 let name = name.into();
45 Self {
46 name: name.clone(),
47 timeout: None,
48 default_action: BarrierDefaultAction::default(),
49 reject_key: format!("{name}.reject_reason"),
50 approve_key: format!("{name}.approved"),
51 _phantom: std::marker::PhantomData,
52 }
53 }
54
55 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
56 self.timeout = Some(timeout);
57 self
58 }
59
60 pub fn default_action(mut self, action: BarrierDefaultAction) -> Self {
61 self.default_action = action;
62 self
63 }
64
65 pub fn reject_key(mut self, key: impl Into<String>) -> Self {
66 self.reject_key = key.into();
67 self
68 }
69
70 pub fn approve_key(mut self, key: impl Into<String>) -> Self {
71 self.approve_key = key.into();
72 self
73 }
74
75 pub fn apply_decision_to_ctx(&self, ctx: &mut NodeContext<'_, S>, decision: BarrierDecision) {
76 match decision {
77 BarrierDecision::Approve => {
78 tracing::info!(barrier = %self.name, "approved");
79 ctx.record(StateMutation::Put(
80 self.approve_key.clone(),
81 serde_json::json!(true),
82 ));
83 ctx.record(StateMutation::Delete(self.reject_key.clone()));
84 }
85 BarrierDecision::Reject { reason } => {
86 tracing::warn!(barrier = %self.name, reason = %reason, "rejected");
87 ctx.record(StateMutation::Put(
88 self.reject_key.clone(),
89 serde_json::json!(reason),
90 ));
91 ctx.record(StateMutation::Delete(self.approve_key.clone()));
92 }
93 BarrierDecision::Modify { key, value } => {
94 tracing::info!(barrier = %self.name, key = %key, "state modified");
95 ctx.record(StateMutation::Put(key, value));
96 }
97 BarrierDecision::Reroute { target } => {
98 tracing::info!(barrier = %self.name, target = %target, "rerouted");
99 ctx.goto(&target);
100 }
101 }
102 }
103}
104
105#[async_trait]
107impl<S: WorkflowState> LeafNode<S> for BarrierNode<S> {
108 async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError> {
109 let barrier_id = BarrierId::new(&self.name, 0);
110 ctx.pause(barrier_id, self.timeout);
111 ctx.set_has_side_effects();
112 Ok(())
113 }
114}
115
116#[async_trait]
117impl<S: WorkflowState> FlowNode<S> for BarrierNode<S> {
118 async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
119 let barrier_id = BarrierId::new(&self.name, 0);
120 ctx.pause(barrier_id, self.timeout);
121 ctx.set_has_side_effects();
122 Ok(())
123 }
124}