Skip to main content

rust_langgraph/
types.rs

1//! Core types for LangGraph execution.
2//!
3//! This module defines the fundamental types used throughout LangGraph,
4//! including streaming modes, commands, and control flow primitives.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
9/// A command to send work to a specific node with custom input.
10///
11/// `Send` enables dynamic routing where the graph can decide at runtime
12/// to invoke specific nodes with specific inputs, enabling map-reduce patterns.
13///
14/// # Example
15///
16/// ```rust
17/// use rust_langgraph::types::Send;
18/// use serde_json::json;
19///
20/// let send = Send::new("process_item", json!({"item_id": 42}));
21/// ```
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct Send {
24    /// The name of the node to send to
25    pub node: String,
26    /// The input to send to the node
27    pub arg: serde_json::Value,
28}
29
30impl Send {
31    /// Create a new Send command
32    pub fn new(node: impl Into<String>, arg: serde_json::Value) -> Self {
33        Self {
34            node: node.into(),
35            arg,
36        }
37    }
38}
39
40/// A command that can be returned from a node to control execution flow.
41///
42/// Commands provide a way for nodes to influence graph execution beyond
43/// just updating state. They can update state, change routing, or control
44/// interrupts.
45///
46/// # Example
47///
48/// ```rust
49/// use rust_langgraph::types::Command;
50/// use serde_json::json;
51///
52/// // Update state and go to specific nodes
53/// let cmd = Command::new()
54///     .with_update("key", json!("value"))
55///     .with_goto(vec!["node1", "node2"]);
56/// ```
57#[derive(Debug, Clone, Default, Serialize, Deserialize)]
58pub struct Command {
59    /// State updates to apply
60    pub update: Option<HashMap<String, serde_json::Value>>,
61    /// Specific nodes to route to (overrides normal routing)
62    pub goto: Option<Vec<String>>,
63    /// Value to resume with after interrupt
64    pub resume: Option<serde_json::Value>,
65}
66
67impl Command {
68    /// Create a new empty command
69    pub fn new() -> Self {
70        Self::default()
71    }
72
73    /// Add a state update
74    pub fn with_update(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
75        self.update
76            .get_or_insert_with(HashMap::new)
77            .insert(key.into(), value);
78        self
79    }
80
81    /// Set the goto nodes
82    pub fn with_goto(mut self, nodes: Vec<impl Into<String>>) -> Self {
83        self.goto = Some(nodes.into_iter().map(|n| n.into()).collect());
84        self
85    }
86
87    /// Set the resume value
88    pub fn with_resume(mut self, value: serde_json::Value) -> Self {
89        self.resume = Some(value);
90        self
91    }
92}
93
94/// Streaming mode for graph execution.
95///
96/// Different streaming modes provide different levels of granularity
97/// for observing graph execution.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
99#[serde(rename_all = "lowercase")]
100pub enum StreamMode {
101    /// Stream the full state after each step
102    Values,
103    /// Stream only the updates (deltas) to state
104    Updates,
105    /// Stream checkpoint information after each step
106    Checkpoints,
107    /// Stream task execution details
108    Tasks,
109    /// Stream detailed debug information
110    Debug,
111    /// Stream messages (useful for chat applications)
112    Messages,
113    /// Custom streaming mode
114    Custom,
115}
116
117impl Default for StreamMode {
118    fn default() -> Self {
119        StreamMode::Values
120    }
121}
122
123/// An event emitted during graph streaming execution.
124///
125/// Stream events provide visibility into graph execution, allowing
126/// applications to react to intermediate states, debug issues, or
127/// provide real-time feedback.
128#[derive(Debug, Clone, Serialize, Deserialize)]
129#[serde(tag = "event", content = "data")]
130pub enum StreamEvent {
131    /// The full state values at a point in execution
132    Values {
133        /// Namespace (for subgraphs)
134        ns: Vec<String>,
135        /// The state data
136        data: serde_json::Value,
137        /// Any interrupts that occurred
138        interrupts: Vec<Interrupt>,
139    },
140
141    /// State updates (deltas) from a step
142    Updates {
143        /// Namespace
144        ns: Vec<String>,
145        /// The update data
146        data: serde_json::Value,
147        /// Node that produced the update
148        node: String,
149    },
150
151    /// Checkpoint saved
152    Checkpoint {
153        /// Namespace
154        ns: Vec<String>,
155        /// Checkpoint ID
156        checkpoint_id: String,
157        /// Step number
158        step: usize,
159    },
160
161    /// Task execution started
162    TaskStart {
163        /// Task ID
164        task_id: String,
165        /// Node name
166        node: String,
167    },
168
169    /// Task execution completed
170    TaskEnd {
171        /// Task ID
172        task_id: String,
173        /// Node name
174        node: String,
175        /// Result data
176        result: serde_json::Value,
177    },
178
179    /// Debug information
180    Debug {
181        /// Debug message
182        message: String,
183        /// Additional context
184        context: HashMap<String, serde_json::Value>,
185    },
186
187    /// A message (for chat applications)
188    Message {
189        /// The message content
190        content: String,
191        /// Message metadata
192        metadata: HashMap<String, serde_json::Value>,
193    },
194
195    /// Custom event
196    Custom {
197        /// Event type
198        event_type: String,
199        /// Event data
200        data: serde_json::Value,
201    },
202}
203
204/// An interrupt that occurred during execution.
205///
206/// Interrupts pause graph execution and can be used for human-in-the-loop
207/// patterns or to handle errors gracefully.
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct Interrupt {
210    /// The value that triggered the interrupt
211    pub value: serde_json::Value,
212    /// When the interrupt occurred
213    pub when: InterruptType,
214    /// The node where the interrupt occurred
215    pub node: Option<String>,
216}
217
218/// Type of interrupt
219#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
220#[serde(rename_all = "lowercase")]
221pub enum InterruptType {
222    /// Interrupt before node execution
223    Before,
224    /// Interrupt during node execution
225    During,
226    /// Interrupt after node execution
227    After,
228}
229
230/// Retry policy for node execution.
231///
232/// Defines how and when to retry failed node executions.
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct RetryPolicy {
235    /// Maximum number of retry attempts
236    pub max_attempts: usize,
237    /// Initial delay between retries (in milliseconds)
238    pub initial_delay_ms: u64,
239    /// Maximum delay between retries (in milliseconds)
240    pub max_delay_ms: u64,
241    /// Multiplier for exponential backoff
242    pub backoff_multiplier: f64,
243}
244
245impl Default for RetryPolicy {
246    fn default() -> Self {
247        Self {
248            max_attempts: 3,
249            initial_delay_ms: 100,
250            max_delay_ms: 10_000,
251            backoff_multiplier: 2.0,
252        }
253    }
254}
255
256impl RetryPolicy {
257    /// Create a new retry policy with the given max attempts
258    pub fn new(max_attempts: usize) -> Self {
259        Self {
260            max_attempts,
261            ..Default::default()
262        }
263    }
264
265    /// Calculate delay for a given attempt (0-indexed)
266    pub fn delay_for_attempt(&self, attempt: usize) -> u64 {
267        let delay = self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32);
268        delay.min(self.max_delay_ms as f64) as u64
269    }
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275
276    #[test]
277    fn test_send_creation() {
278        let send = Send::new("test_node", serde_json::json!({"key": "value"}));
279        assert_eq!(send.node, "test_node");
280        assert!(send.arg.is_object());
281    }
282
283    #[test]
284    fn test_command_builder() {
285        let cmd = Command::new()
286            .with_update("key1", serde_json::json!("val1"))
287            .with_update("key2", serde_json::json!(42))
288            .with_goto(vec!["node1", "node2"]);
289
290        assert_eq!(cmd.update.as_ref().unwrap().len(), 2);
291        assert_eq!(cmd.goto.as_ref().unwrap().len(), 2);
292    }
293
294    #[test]
295    fn test_retry_policy_delay() {
296        let policy = RetryPolicy::default();
297        
298        assert_eq!(policy.delay_for_attempt(0), 100);
299        assert_eq!(policy.delay_for_attempt(1), 200);
300        assert_eq!(policy.delay_for_attempt(2), 400);
301        
302        // Should cap at max_delay_ms
303        assert_eq!(policy.delay_for_attempt(10), 10_000);
304    }
305
306    #[test]
307    fn test_stream_mode_serialization() {
308        let mode = StreamMode::Values;
309        let json = serde_json::to_string(&mode).unwrap();
310        assert_eq!(json, "\"values\"");
311        
312        let deserialized: StreamMode = serde_json::from_str(&json).unwrap();
313        assert_eq!(deserialized, StreamMode::Values);
314    }
315}