// synaptic_graph/command.rs

use std::fmt;

use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::send::Send;
use crate::State;

7/// A command returned from a node to control graph flow.
8///
9/// Commands allow nodes to override normal edge-based routing,
10/// update state, fan out to multiple nodes, or signal interrupts.
11///
12/// # Example
13///
14/// ```ignore
15/// use synaptic_graph::{Command, NodeOutput, MessageState};
16///
17/// async fn my_node(state: MessageState) -> Result<NodeOutput<MessageState>, SynapticError> {
18///     Ok(NodeOutput::Command(Command::goto("summary")))
19/// }
20/// ```
21pub struct Command<S: State> {
22    /// State update to merge before routing.
23    pub(crate) update: Option<S>,
24    /// Routing override.
25    pub(crate) goto: Option<CommandGoto>,
26    /// Interrupt: pause graph and return this value to the caller.
27    pub(crate) interrupt_value: Option<Value>,
28    /// Resume value passed from the caller to continue from interrupt.
29    pub(crate) resume_value: Option<Value>,
30}
31
32impl<S: State> std::fmt::Debug for Command<S> {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("Command")
35            .field("has_update", &self.update.is_some())
36            .field("goto", &self.goto)
37            .field("interrupt_value", &self.interrupt_value)
38            .field("resume_value", &self.resume_value)
39            .finish()
40    }
41}
42
43impl<S: State> Clone for Command<S> {
44    fn clone(&self) -> Self {
45        Self {
46            update: self.update.clone(),
47            goto: self.goto.clone(),
48            interrupt_value: self.interrupt_value.clone(),
49            resume_value: self.resume_value.clone(),
50        }
51    }
52}
53
54impl<S: State> Command<S> {
55    /// Create a command that routes to a specific node.
56    pub fn goto(node: impl Into<String>) -> Self {
57        Self {
58            update: None,
59            goto: Some(CommandGoto::One(node.into())),
60            interrupt_value: None,
61            resume_value: None,
62        }
63    }
64
65    /// Create a command that routes to a specific node with a state update.
66    pub fn goto_with_update(node: impl Into<String>, update: S) -> Self {
67        Self {
68            update: Some(update),
69            goto: Some(CommandGoto::One(node.into())),
70            interrupt_value: None,
71            resume_value: None,
72        }
73    }
74
75    /// Create a command that fans out to multiple nodes (map-reduce).
76    pub fn send(targets: Vec<Send>) -> Self {
77        Self {
78            update: None,
79            goto: Some(CommandGoto::Many(targets)),
80            interrupt_value: None,
81            resume_value: None,
82        }
83    }
84
85    /// Create a command that only updates state (no routing override).
86    pub fn update(state: S) -> Self {
87        Self {
88            update: Some(state),
89            goto: None,
90            interrupt_value: None,
91            resume_value: None,
92        }
93    }
94
95    /// Create a resume command for continuing from an interrupt.
96    ///
97    /// Pass this as input to `graph.invoke()` to resume a previously
98    /// interrupted graph execution.
99    pub fn resume(value: Value) -> Self {
100        Self {
101            update: None,
102            goto: None,
103            interrupt_value: None,
104            resume_value: Some(value),
105        }
106    }
107
108    /// Create a command that ends the graph immediately.
109    pub fn end() -> Self {
110        Self {
111            update: None,
112            goto: Some(CommandGoto::One(crate::END.to_string())),
113            interrupt_value: None,
114            resume_value: None,
115        }
116    }
117}
118
119/// Routing target for a Command.
120#[derive(Debug, Clone)]
121pub enum CommandGoto {
122    /// Route to a single node.
123    One(String),
124    /// Fan-out to multiple nodes (map-reduce).
125    Many(Vec<Send>),
126}
127
128/// What a node can return from its `process()` method.
129///
130/// Nodes can either return a simple state update (existing behavior)
131/// or a `Command` for dynamic control flow.
132#[derive(Clone)]
133pub enum NodeOutput<S: State> {
134    /// Regular state update (existing behavior).
135    State(S),
136    /// A command controlling flow + state.
137    Command(Command<S>),
138}
139
140impl<S: State + std::fmt::Debug> std::fmt::Debug for NodeOutput<S> {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        match self {
143            NodeOutput::State(s) => f.debug_tuple("NodeOutput::State").field(s).finish(),
144            NodeOutput::Command(c) => f.debug_tuple("NodeOutput::Command").field(c).finish(),
145        }
146    }
147}
148
149/// Blanket conversion: returning `S` from a node is shorthand for `NodeOutput::State(S)`.
150impl<S: State> From<S> for NodeOutput<S> {
151    fn from(state: S) -> Self {
152        NodeOutput::State(state)
153    }
154}
155
156/// The result of a graph invocation.
157#[derive(Debug, Clone)]
158pub enum GraphResult<S> {
159    /// Graph completed successfully.
160    Complete(S),
161    /// Graph was interrupted and is waiting for input.
162    Interrupted {
163        state: S,
164        /// The value passed to `interrupt()` — can be inspected by the caller.
165        interrupt_value: Value,
166    },
167}
168
169impl<S> GraphResult<S> {
170    /// Get the state, regardless of whether the graph completed or was interrupted.
171    pub fn state(&self) -> &S {
172        match self {
173            GraphResult::Complete(s) => s,
174            GraphResult::Interrupted { state, .. } => state,
175        }
176    }
177
178    /// Consume and return the state.
179    pub fn into_state(self) -> S {
180        match self {
181            GraphResult::Complete(s) => s,
182            GraphResult::Interrupted { state, .. } => state,
183        }
184    }
185
186    /// Returns true if the graph completed normally.
187    pub fn is_complete(&self) -> bool {
188        matches!(self, GraphResult::Complete(_))
189    }
190
191    /// Returns true if the graph was interrupted.
192    pub fn is_interrupted(&self) -> bool {
193        matches!(self, GraphResult::Interrupted { .. })
194    }
195
196    /// Returns the interrupt value if the graph was interrupted.
197    pub fn interrupt_value(&self) -> Option<&Value> {
198        match self {
199            GraphResult::Interrupted {
200                interrupt_value, ..
201            } => Some(interrupt_value),
202            _ => None,
203        }
204    }
205}
206
207/// Interrupt graph execution and request human input.
208///
209/// This struct is stored in checkpoints for interrupted graphs.
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct Interrupt {
212    pub value: Value,
213}
214
215/// Create an interrupt command that pauses graph execution.
216///
217/// The interrupt value is saved with the checkpoint and returned
218/// to the caller as `GraphResult::Interrupted`. To resume, pass
219/// `Command::resume(value)` as input to `graph.invoke()`.
220///
221/// # Example
222///
223/// ```ignore
224/// use synaptic_graph::{interrupt, NodeOutput, MessageState};
225///
226/// async fn approval_node(state: MessageState) -> Result<NodeOutput<MessageState>, SynapticError> {
227///     Ok(interrupt(serde_json::json!({"question": "Approve this action?"})))
228/// }
229/// ```
230pub fn interrupt<S: State>(value: Value) -> NodeOutput<S> {
231    NodeOutput::Command(Command {
232        update: None,
233        goto: None,
234        interrupt_value: Some(value),
235        resume_value: None,
236    })
237}