// juncture_core/command.rs

1use crate::interrupt::ResumeValue;
2use crate::state::State;
3
4/// Command: node return value combining state update and routing
5///
6/// Nodes can return `S::Update` (simple case) or `Command<S>` (for routing control).
7#[derive(Clone, Debug)]
8pub struct Command<S: State> {
9    /// State update to apply
10    pub update: Option<S::Update>,
11
12    /// Routing instruction
13    pub goto: Goto,
14
15    /// Target graph (current or parent)
16    pub graph: GraphTarget,
17
18    /// Resume value for HITL interrupt resumption
19    ///
20    /// When provided, this value is used to resume from a previously triggered
21    /// interrupt. Supports single values, ID-based mapping, and namespace-based
22    /// mapping via [`ResumeValue`].
23    pub resume: Option<ResumeValue>,
24
25    /// Custom streaming data to emit during execution
26    ///
27    /// Nodes can attach arbitrary JSON values here that will be emitted as
28    /// [`StreamEvent::Custom`] events during graph execution. Each entry in
29    /// the vector produces one custom stream event tagged with the emitting
30    /// node name. Use [`Command::with_stream_data`] to append items.
31    pub stream_data: Vec<serde_json::Value>,
32}
33
34/// Routing instruction from node
35#[derive(Clone, Debug)]
36pub enum Goto {
37    /// No routing (use external edges)
38    None,
39
40    /// Route to single node
41    Next(String),
42
43    /// Route to multiple nodes (parallel)
44    Multiple(Vec<String>),
45
46    /// Dynamic fan-out to multiple targets
47    Send(Vec<SendTarget>),
48
49    /// Terminate this path
50    End,
51}
52
53/// Send target for dynamic fan-out
54#[derive(Clone, Debug)]
55pub struct SendTarget {
56    /// Target node name
57    pub node: String,
58
59    /// State to use for this task (overrides current state)
60    pub state: serde_json::Value,
61
62    /// Optional per-task timeout override
63    pub timeout: Option<std::time::Duration>,
64}
65
/// Graph that a routing instruction applies to.
///
/// `GraphTarget::default()` is [`GraphTarget::Current`], matching the value
/// every non-parent `Command` constructor uses.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum GraphTarget {
    /// The graph the node is running in (the default).
    #[default]
    Current,

    /// The parent graph, used for navigating out of a subgraph.
    Parent,
}
75
/// Pairs the value handed back to the caller with the state to persist.
///
/// Used in entrypoint functions to keep what is returned to the caller
/// separate from what is saved to the checkpoint.
///
/// `Final` is `Clone` whenever both `V` and `S` are, in line with the other
/// public types in this module.
#[derive(Clone, Debug)]
pub struct Final<V, S> {
    /// Value returned to the caller.
    pub value: V,

    /// State update saved to the checkpoint.
    pub save: S,
}
88
89/// Routing command from node to parent or child graph
90///
91/// This type is used in subgraph communication to control execution flow
92/// between parent and child graphs.
93#[derive(Clone, Debug)]
94pub enum CommandGoto {
95    /// Route to a single node
96    One(String),
97
98    /// Route to multiple nodes (parallel execution)
99    Many(Vec<String>),
100
101    /// Route to parent graph
102    Parent(String),
103
104    /// Dynamic fan-out to multiple targets with state overrides
105    Send(Vec<SendTarget>),
106}
107
108/// Command wrapper for subgraph-to-parent communication
109///
110/// This wrapper is used as an exception mechanism for subgraph nodes
111/// to send commands to their parent graph, along with metadata about
112/// the source node and namespace.
113///
114/// # Type Parameters
115///
116/// * `S` - The parent graph's state type
117///
118/// # Fields
119///
120/// * `command` - The command to send to the parent graph
121/// * `source_node` - The name of the subgraph node sending this command (for debugging and logging)
122/// * `namespace` - The subgraph namespace for routing purposes
123#[derive(Clone)]
124pub struct ParentCommand<S: State> {
125    /// The command to send to the parent graph
126    pub command: Command<S>,
127
128    /// Source node information (for debugging and logging)
129    pub source_node: String,
130
131    /// Subgraph namespace (for routing)
132    pub namespace: String,
133}
134
135impl<S: State> ParentCommand<S> {
136    /// Create a new `ParentCommand` from a subgraph
137    ///
138    /// # Arguments
139    ///
140    /// * `command` - The command to send to the parent graph
141    /// * `source_node` - The name of the subgraph node sending this command
142    /// * `namespace` - The subgraph namespace for routing
143    #[must_use]
144    pub fn from_subgraph(command: Command<S>, source_node: &str, namespace: &str) -> Self {
145        Self {
146            command,
147            source_node: source_node.to_string(),
148            namespace: namespace.to_string(),
149        }
150    }
151}
152
153impl<S: State> std::fmt::Debug for ParentCommand<S> {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("ParentCommand")
156            .field("command", &"<command>")
157            .field("source_node", &self.source_node)
158            .field("namespace", &self.namespace)
159            .finish()
160    }
161}
162
163impl<S: State> Command<S> {
164    /// Create command with only state update
165    #[must_use]
166    pub const fn update(update: S::Update) -> Self {
167        Self {
168            update: Some(update),
169            goto: Goto::None,
170            graph: GraphTarget::Current,
171            resume: None,
172            stream_data: Vec::new(),
173        }
174    }
175
176    /// Create command with only routing
177    #[must_use]
178    pub fn goto(target: impl Into<String>) -> Self {
179        Self {
180            update: None,
181            goto: Goto::Next(target.into()),
182            graph: GraphTarget::Current,
183            resume: None,
184            stream_data: Vec::new(),
185        }
186    }
187
188    /// Create command with update and routing
189    #[must_use]
190    pub fn update_and_goto(update: S::Update, target: impl Into<String>) -> Self {
191        Self {
192            update: Some(update),
193            goto: Goto::Next(target.into()),
194            graph: GraphTarget::Current,
195            resume: None,
196            stream_data: Vec::new(),
197        }
198    }
199
200    /// Create command with dynamic fan-out
201    #[must_use]
202    pub const fn send(targets: Vec<SendTarget>) -> Self {
203        Self {
204            update: None,
205            goto: Goto::Send(targets),
206            graph: GraphTarget::Current,
207            resume: None,
208            stream_data: Vec::new(),
209        }
210    }
211
212    /// Create command with update and fan-out
213    #[must_use]
214    pub const fn update_and_send(update: S::Update, targets: Vec<SendTarget>) -> Self {
215        Self {
216            update: Some(update),
217            goto: Goto::Send(targets),
218            graph: GraphTarget::Current,
219            resume: None,
220            stream_data: Vec::new(),
221        }
222    }
223
224    /// Create command that terminates current path
225    #[must_use]
226    pub const fn end() -> Self {
227        Self {
228            update: None,
229            goto: Goto::End,
230            graph: GraphTarget::Current,
231            resume: None,
232            stream_data: Vec::new(),
233        }
234    }
235
236    /// Create command that routes to parent graph
237    pub fn goto_parent(target: impl Into<String>) -> Self {
238        Self {
239            update: None,
240            goto: Goto::Next(target.into()),
241            graph: GraphTarget::Parent,
242            resume: None,
243            stream_data: Vec::new(),
244        }
245    }
246
247    /// Attach a resume value to this command
248    #[must_use]
249    pub fn with_resume(mut self, value: ResumeValue) -> Self {
250        self.resume = Some(value);
251        self
252    }
253
254    /// Attach custom streaming data to this command
255    ///
256    /// The given value is appended to the command's streaming data list.
257    /// During graph execution, each entry is emitted as a
258    /// [`StreamEvent::Custom`] event, allowing nodes to push custom JSON
259    /// payloads to the stream consumer alongside state updates and routing.
260    ///
261    /// # Examples
262    ///
263    /// ```ignore
264    /// use juncture_core::Command;
265    /// use serde_json::json;
266    ///
267    /// // In a node returning Command:
268    /// Command::end().with_stream_data(json!({"progress": 0.75}));
269    /// ```
270    #[must_use]
271    pub fn with_stream_data(mut self, data: serde_json::Value) -> Self {
272        self.stream_data.push(data);
273        self
274    }
275}
276
277impl<S: State> Default for Command<S> {
278    fn default() -> Self {
279        Self {
280            update: None,
281            goto: Goto::None,
282            graph: GraphTarget::Current,
283            resume: None,
284            stream_data: Vec::new(),
285        }
286    }
287}
288
// Rust guideline compliant 2026-05-22
290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::FieldVersions;
    use serde_json::json;

    /// Unit state whose `apply` reports no changed fields.
    #[derive(Clone, Debug, Default)]
    struct TestState;

    /// Unit update paired with [`TestState`].
    #[derive(Clone, Debug, Default, serde::Serialize)]
    struct TestUpdate;

    impl State for TestState {
        type Update = TestUpdate;
        type FieldVersions = FieldVersions;

        fn apply(&mut self, _: Self::Update) -> crate::FieldsChanged {
            crate::FieldsChanged(0)
        }

        fn reset_ephemeral(&mut self) {}
    }

    /// Shorthand for the command type under test.
    type TestCommand = Command<TestState>;

    // --- constructors start with no stream data ---

    #[test]
    fn command_default_has_empty_stream_data() {
        assert!(TestCommand::default().stream_data.is_empty());
    }

    #[test]
    fn command_update_has_empty_stream_data() {
        assert!(TestCommand::update(TestUpdate).stream_data.is_empty());
    }

    #[test]
    fn command_goto_has_empty_stream_data() {
        assert!(TestCommand::goto("target").stream_data.is_empty());
    }

    #[test]
    fn command_end_has_empty_stream_data() {
        assert!(TestCommand::end().stream_data.is_empty());
    }

    // --- with_stream_data ---

    #[test]
    fn command_with_stream_data_appends_single_item() {
        let command = TestCommand::end().with_stream_data(json!({"key": "value"}));
        assert_eq!(command.stream_data, vec![json!({"key": "value"})]);
    }

    #[test]
    fn command_with_stream_data_appends_multiple_items() {
        let command = TestCommand::end()
            .with_stream_data(json!({"step": 1}))
            .with_stream_data(json!({"step": 2}))
            .with_stream_data(json!({"step": 3}));
        let expected = vec![json!({"step": 1}), json!({"step": 2}), json!({"step": 3})];
        assert_eq!(command.stream_data, expected);
    }

    #[test]
    fn command_with_stream_data_preserves_other_fields() {
        let command = TestCommand::update(TestUpdate)
            .with_stream_data(json!("progress"))
            .with_resume(ResumeValue::Single(json!("resumed")));
        assert!(command.update.is_some());
        assert_eq!(command.stream_data.len(), 1);
        assert!(command.resume.is_some());
    }

    #[test]
    fn command_with_stream_data_works_with_goto() {
        let command = TestCommand::goto("next_node").with_stream_data(json!("data"));
        let routed_to_next = matches!(&command.goto, Goto::Next(target) if target == "next_node");
        assert!(routed_to_next);
        assert_eq!(command.stream_data.len(), 1);
    }

    // --- remaining constructors ---

    #[test]
    fn command_send_has_empty_stream_data() {
        assert!(TestCommand::send(Vec::new()).stream_data.is_empty());
    }

    #[test]
    fn command_goto_parent_has_empty_stream_data() {
        assert!(TestCommand::goto_parent("parent").stream_data.is_empty());
    }
}