rust_langgraph/types.rs
//! Core types for LangGraph execution.
//!
//! This module defines the fundamental types used throughout LangGraph:
//! routing commands (`Send`, `Command`), streaming modes and stream events,
//! interrupts, and the retry policy for node execution.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// A command to send work to a specific node with custom input.
///
/// `Send` enables dynamic routing where the graph can decide at runtime
/// to invoke specific nodes with specific inputs, enabling map-reduce patterns.
///
/// The type derives `Serialize`/`Deserialize` without any renaming, so it is
/// (de)serialized as a plain object with the keys `node` and `arg`.
///
/// NOTE: the name `Send` shadows the `std::marker::Send` marker trait from the
/// prelude wherever this struct is in scope; spell the trait out as
/// `std::marker::Send` in such places.
///
/// # Example
///
/// ```rust
/// use rust_langgraph::types::Send;
/// use serde_json::json;
///
/// let send = Send::new("process_item", json!({"item_id": 42}));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Send {
    /// The name of the node to send to.
    pub node: String,
    /// The input to send to the node, as an arbitrary JSON value.
    pub arg: serde_json::Value,
}
29
30impl Send {
31 /// Create a new Send command
32 pub fn new(node: impl Into<String>, arg: serde_json::Value) -> Self {
33 Self {
34 node: node.into(),
35 arg,
36 }
37 }
38}
39
/// A command that can be returned from a node to control execution flow.
///
/// Commands provide a way for nodes to influence graph execution beyond
/// just updating state. They can update state, change routing, or control
/// interrupts.
///
/// Every field is optional; the derived `Default` (used by `Command::new`)
/// leaves all of them as `None`.
///
/// # Example
///
/// ```rust
/// use rust_langgraph::types::Command;
/// use serde_json::json;
///
/// // Update state and go to specific nodes
/// let cmd = Command::new()
///     .with_update("key", json!("value"))
///     .with_goto(vec!["node1", "node2"]);
/// ```
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Command {
    /// State updates to apply, keyed by name; `None` when there are none.
    pub update: Option<HashMap<String, serde_json::Value>>,
    /// Specific nodes to route to (overrides normal routing); `None` when
    /// no routing override was requested.
    pub goto: Option<Vec<String>>,
    /// Value to resume with after interrupt; `None` when not resuming.
    pub resume: Option<serde_json::Value>,
}
66
67impl Command {
68 /// Create a new empty command
69 pub fn new() -> Self {
70 Self::default()
71 }
72
73 /// Add a state update
74 pub fn with_update(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
75 self.update
76 .get_or_insert_with(HashMap::new)
77 .insert(key.into(), value);
78 self
79 }
80
81 /// Set the goto nodes
82 pub fn with_goto(mut self, nodes: Vec<impl Into<String>>) -> Self {
83 self.goto = Some(nodes.into_iter().map(|n| n.into()).collect());
84 self
85 }
86
87 /// Set the resume value
88 pub fn with_resume(mut self, value: serde_json::Value) -> Self {
89 self.resume = Some(value);
90 self
91 }
92}
93
/// Streaming mode for graph execution.
///
/// Different streaming modes provide different levels of granularity
/// for observing graph execution.
///
/// Because of `rename_all = "lowercase"`, each variant is (de)serialized as
/// its lowercase name, e.g. `StreamMode::Values` as `"values"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamMode {
    /// Stream the full state after each step
    Values,
    /// Stream only the updates (deltas) to state
    Updates,
    /// Stream checkpoint information after each step
    Checkpoints,
    /// Stream task execution details
    Tasks,
    /// Stream detailed debug information
    Debug,
    /// Stream messages (useful for chat applications)
    Messages,
    /// Custom streaming mode
    Custom,
}
116
117impl Default for StreamMode {
118 fn default() -> Self {
119 StreamMode::Values
120 }
121}
122
/// An event emitted during graph streaming execution.
///
/// Stream events provide visibility into graph execution, allowing
/// applications to react to intermediate states, debug issues, or
/// provide real-time feedback.
///
/// # Serialized form
///
/// `tag = "event", content = "data"` selects serde's adjacently tagged
/// representation: the variant name is written under the `"event"` key and
/// the variant's fields under the `"data"` key. Variant names are not
/// renamed, so the tags are e.g. `"Values"` or `"TaskStart"`. Variants that
/// have their own `data` field serialize it nested inside that outer
/// `"data"` object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", content = "data")]
pub enum StreamEvent {
    /// The full state values at a point in execution
    Values {
        /// Namespace (for subgraphs)
        ns: Vec<String>,
        /// The state data
        data: serde_json::Value,
        /// Any interrupts that occurred
        interrupts: Vec<Interrupt>,
    },

    /// State updates (deltas) from a step
    Updates {
        /// Namespace
        ns: Vec<String>,
        /// The update data
        data: serde_json::Value,
        /// Node that produced the update
        node: String,
    },

    /// Checkpoint saved
    Checkpoint {
        /// Namespace
        ns: Vec<String>,
        /// Checkpoint ID
        checkpoint_id: String,
        /// Step number
        step: usize,
    },

    /// Task execution started
    TaskStart {
        /// Task ID
        task_id: String,
        /// Node name
        node: String,
    },

    /// Task execution completed
    TaskEnd {
        /// Task ID
        task_id: String,
        /// Node name
        node: String,
        /// Result data
        result: serde_json::Value,
    },

    /// Debug information
    Debug {
        /// Debug message
        message: String,
        /// Additional context, as arbitrary JSON values keyed by name
        context: HashMap<String, serde_json::Value>,
    },

    /// A message (for chat applications)
    Message {
        /// The message content
        content: String,
        /// Message metadata, as arbitrary JSON values keyed by name
        metadata: HashMap<String, serde_json::Value>,
    },

    /// Custom event
    Custom {
        /// Event type
        event_type: String,
        /// Event data
        data: serde_json::Value,
    },
}
203
/// An interrupt that occurred during execution.
///
/// Interrupts pause graph execution and can be used for human-in-the-loop
/// patterns or to handle errors gracefully.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Interrupt {
    /// The value that triggered the interrupt, as an arbitrary JSON value
    pub value: serde_json::Value,
    /// When the interrupt occurred relative to node execution
    pub when: InterruptType,
    /// The node where the interrupt occurred.
    /// NOTE(review): `None` presumably means no node is associated with the
    /// interrupt; confirm against the code that produces interrupts.
    pub node: Option<String>,
}
217
/// Type of interrupt, i.e. the point relative to node execution at which
/// the interrupt happened.
///
/// Because of `rename_all = "lowercase"`, variants are (de)serialized as
/// `"before"`, `"during"` and `"after"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum InterruptType {
    /// Interrupt before node execution
    Before,
    /// Interrupt during node execution
    During,
    /// Interrupt after node execution
    After,
}
229
/// Retry policy for node execution.
///
/// Defines how and when to retry failed node executions. The delay before a
/// given attempt is computed by `delay_for_attempt` from `initial_delay_ms`,
/// `backoff_multiplier` and `max_delay_ms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retry attempts.
    /// NOTE(review): whether this counts the initial try is decided by the
    /// executor, which is not part of this module; confirm there.
    pub max_attempts: usize,
    /// Initial delay between retries (in milliseconds)
    pub initial_delay_ms: u64,
    /// Maximum delay between retries (in milliseconds); computed delays are
    /// capped at this value
    pub max_delay_ms: u64,
    /// Multiplier for exponential backoff, raised to the attempt index
    pub backoff_multiplier: f64,
}
244
245impl Default for RetryPolicy {
246 fn default() -> Self {
247 Self {
248 max_attempts: 3,
249 initial_delay_ms: 100,
250 max_delay_ms: 10_000,
251 backoff_multiplier: 2.0,
252 }
253 }
254}
255
256impl RetryPolicy {
257 /// Create a new retry policy with the given max attempts
258 pub fn new(max_attempts: usize) -> Self {
259 Self {
260 max_attempts,
261 ..Default::default()
262 }
263 }
264
265 /// Calculate delay for a given attempt (0-indexed)
266 pub fn delay_for_attempt(&self, attempt: usize) -> u64 {
267 let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
268 delay.min(self.max_delay_ms as f64) as u64
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// `Send::new` stores the node name and keeps the JSON payload.
    #[test]
    fn test_send_creation() {
        let payload = serde_json::json!({"key": "value"});
        let msg = Send::new("test_node", payload);

        assert_eq!(msg.node, "test_node");
        assert!(msg.arg.is_object());
    }

    /// Chained builder calls accumulate both updates and both goto targets.
    #[test]
    fn test_command_builder() {
        let built = Command::new()
            .with_update("key1", serde_json::json!("val1"))
            .with_update("key2", serde_json::json!(42))
            .with_goto(vec!["node1", "node2"]);

        let updates = built.update.as_ref().unwrap();
        assert_eq!(updates.len(), 2);

        let targets = built.goto.as_ref().unwrap();
        assert_eq!(targets.len(), 2);
    }

    /// The default policy doubles the delay per attempt and caps it at
    /// `max_delay_ms`.
    #[test]
    fn test_retry_policy_delay() {
        let policy = RetryPolicy::default();

        // (attempt, expected delay in ms); the last entry hits the cap.
        let expectations: &[(usize, u64)] = &[(0, 100), (1, 200), (2, 400), (10, 10_000)];
        for &(attempt, expected_ms) in expectations {
            assert_eq!(policy.delay_for_attempt(attempt), expected_ms);
        }
    }

    /// `StreamMode` round-trips through its lowercase JSON string form.
    #[test]
    fn test_stream_mode_serialization() {
        let encoded = serde_json::to_string(&StreamMode::Values).unwrap();
        assert_eq!(encoded, "\"values\"");

        let decoded: StreamMode = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, StreamMode::Values);
    }
}
315}