Skip to main content

mofa_kernel/workflow/
command.rs

1//! Command Pattern for Workflow Control
2//!
3//! Provides a unified way to update state and control workflow execution flow
4//! from within node functions. Inspired by LangGraph's Command pattern.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9use super::StateUpdate;
10
11/// Control flow directive for workflow execution
12///
13/// Determines what happens after a node completes execution.
14#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
15pub enum ControlFlow {
16    /// Jump to a specific node by ID
17    Goto(String),
18
19    /// Continue to the next node(s) based on graph edges
20    Continue,
21
22    /// End workflow execution and return current state
23    Return,
24
25    /// Dynamically create parallel execution branches (MapReduce pattern)
26    Send(Vec<SendCommand>),
27}
28
29impl Default for ControlFlow {
30    fn default() -> Self {
31        Self::Continue
32    }
33}
34
35/// Command returned by node functions
36///
37/// A Command encapsulates both state updates and control flow decisions.
38/// This allows nodes to update state AND determine where to go next in a
39/// single return value.
40///
41/// # Example
42///
43/// ```rust,ignore
44/// // Update state and continue to next node
45/// let cmd = Command::new()
46///     .update("result", json!("processed"))
47///     .continue_();
48///
49/// // Update state and jump to specific node
50/// let cmd = Command::new()
51///     .update("classification", json!("type_a"))
52/// .goto("handle_a");
53///
54/// // End execution with final state
55/// let cmd = Command::new()
56///     .update("final_result", json!("done"))
57///     .return_();
58///
59/// // Create parallel branches for MapReduce
60/// let cmd = Command::send(vec![
61///     SendCommand::new("process", json!({"item": 1})),
62///     SendCommand::new("process", json!({"item": 2})),
63/// ]);
64/// ```
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct Command {
67    /// State updates to apply
68    pub updates: Vec<StateUpdate>,
69    /// Control flow directive
70    pub control: ControlFlow,
71}
72
73impl Command {
74    /// Create a new empty command
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Add a state update
80    pub fn update(mut self, key: impl Into<String>, value: Value) -> Self {
81        self.updates.push(StateUpdate::new(key, value));
82        self
83    }
84
85    /// Add multiple state updates
86    pub fn updates(mut self, updates: Vec<StateUpdate>) -> Self {
87        self.updates.extend(updates);
88        self
89    }
90
91    /// Set control flow to continue to next node
92    pub fn continue_(mut self) -> Self {
93        self.control = ControlFlow::Continue;
94        self
95    }
96
97    /// Set control flow to jump to a specific node
98    pub fn goto(mut self, node: impl Into<String>) -> Self {
99        self.control = ControlFlow::Goto(node.into());
100        self
101    }
102
103    /// Set control flow to end execution
104    pub fn return_(mut self) -> Self {
105        self.control = ControlFlow::Return;
106        self
107    }
108
109    /// Set control flow to create parallel branches (MapReduce)
110    pub fn send(targets: Vec<SendCommand>) -> Self {
111        Self {
112            updates: Vec::new(),
113            control: ControlFlow::Send(targets),
114        }
115    }
116
117    /// Create a command that just updates state (continues by default)
118    pub fn just_update(key: impl Into<String>, value: Value) -> Self {
119        Self::new().update(key, value)
120    }
121
122    /// Create a command that just controls flow (no state update)
123    pub fn just_goto(node: impl Into<String>) -> Self {
124        Self::new().goto(node)
125    }
126
127    /// Create a command that ends execution
128    pub fn just_return() -> Self {
129        Self::new().return_()
130    }
131
132    /// Check if this command ends execution
133    pub fn is_return(&self) -> bool {
134        matches!(self.control, ControlFlow::Return)
135    }
136
137    /// Check if this command creates parallel branches
138    pub fn is_send(&self) -> bool {
139        matches!(self.control, ControlFlow::Send(_))
140    }
141
142    /// Get the target node if this is a goto command
143    pub fn goto_target(&self) -> Option<&str> {
144        match &self.control {
145            ControlFlow::Goto(target) => Some(target),
146            _ => None,
147        }
148    }
149}
150
151/// Send command for MapReduce pattern
152///
153/// Represents a dynamic edge creation - sending execution to a target node
154/// with specific input state.
155#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
156pub struct SendCommand {
157    /// Target node ID
158    pub target: String,
159    /// Input state for this branch
160    pub input: Value,
161    /// Optional branch identifier
162    pub branch_id: Option<String>,
163}
164
165impl SendCommand {
166    /// Create a new send command
167    pub fn new(target: impl Into<String>, input: Value) -> Self {
168        Self {
169            target: target.into(),
170            input,
171            branch_id: None,
172        }
173    }
174
175    /// Create a send command with a branch ID
176    pub fn with_branch(target: impl Into<String>, input: Value, branch_id: impl Into<String>) -> Self {
177        Self {
178            target: target.into(),
179            input,
180            branch_id: Some(branch_id.into()),
181        }
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::*;
188    use serde_json::json;
189
190    #[test]
191    fn test_command_builder() {
192        let cmd = Command::new()
193            .update("key1", json!("value1"))
194            .update("key2", json!(42))
195            .goto("next_node");
196
197        assert_eq!(cmd.updates.len(), 2);
198        assert_eq!(cmd.updates[0].key, "key1");
199        assert_eq!(cmd.goto_target(), Some("next_node"));
200    }
201
202    #[test]
203    fn test_command_continue() {
204        let cmd = Command::new()
205            .update("result", json!("done"))
206            .continue_();
207
208        assert_eq!(cmd.control, ControlFlow::Continue);
209        assert!(!cmd.is_return());
210    }
211
212    #[test]
213    fn test_command_return() {
214        let cmd = Command::new()
215            .update("final", json!("result"))
216            .return_();
217
218        assert!(cmd.is_return());
219    }
220
221    #[test]
222    fn test_command_send() {
223        let cmd = Command::send(vec![
224            SendCommand::new("worker", json!({"task": 1})),
225            SendCommand::new("worker", json!({"task": 2})),
226        ]);
227
228        assert!(cmd.is_send());
229        if let ControlFlow::Send(targets) = &cmd.control {
230            assert_eq!(targets.len(), 2);
231        } else {
232            panic!("Expected Send control flow");
233        }
234    }
235
236    #[test]
237    fn test_send_command() {
238        let send = SendCommand::new("process", json!({"data": "test"}));
239        assert_eq!(send.target, "process");
240        assert!(send.branch_id.is_none());
241
242        let send_with_branch = SendCommand::with_branch(
243            "process",
244            json!({"data": "test"}),
245            "branch-1",
246        );
247        assert_eq!(send_with_branch.branch_id, Some("branch-1".to_string()));
248    }
249
250    #[test]
251    fn test_just_helpers() {
252        let cmd = Command::just_update("key", json!("value"));
253        assert_eq!(cmd.updates.len(), 1);
254        assert_eq!(cmd.control, ControlFlow::Continue);
255
256        let cmd = Command::just_goto("target");
257        assert!(cmd.updates.is_empty());
258        assert_eq!(cmd.goto_target(), Some("target"));
259
260        let cmd = Command::just_return();
261        assert!(cmd.is_return());
262    }
263}