Skip to main content

rust_langgraph/channels/
mod.rs

1//! Channels for state communication in graphs.
2//!
3//! Channels are the core mechanism for state management in LangGraph.
4//! Unlike typical graph systems where nodes pass data directly, in LangGraph
5//! nodes write to channels and read from channels. This enables powerful
6//! patterns like automatic state reduction and checkpoint/resume.
7//!
8//! # Channel Types
9//!
10//! - **LastValue**: Stores only the last written value
11//! - **Topic**: Accumulates all written values as a sequence
12//! - **BinaryOperatorAggregate**: Reduces multiple writes with a custom function
13//! - **EphemeralValue**: Cleared after each superstep
14
15use crate::errors::Result;
16use serde::{Deserialize, Serialize};
17use std::fmt::Debug;
18
19pub mod last_value;
20pub mod topic;
21pub mod binop;
22pub mod ephemeral;
23
24pub use last_value::LastValue;
25pub use topic::Topic;
26pub use binop::BinaryOperatorAggregate;
27pub use ephemeral::EphemeralValue;
28
29/// The base trait for all channels.
30///
31/// Channels manage how state flows through the graph. Each channel
32/// has its own semantics for how it handles multiple writes in a
33/// single superstep.
34pub trait BaseChannel: Send + Sync + Debug {
35    /// Get the current value as JSON
36    fn get(&self) -> Result<Option<serde_json::Value>>;
37
38    /// Update the channel with new values
39    ///
40    /// If multiple values are provided, the channel applies its
41    /// reduction logic (e.g., last-write-wins, sum, append).
42    fn update(&mut self, values: Vec<serde_json::Value>) -> Result<()>;
43
44    /// Serialize the channel state for checkpointing
45    fn checkpoint(&self) -> Result<serde_json::Value>;
46
47    /// Restore the channel state from a checkpoint
48    fn from_checkpoint(data: serde_json::Value) -> Result<Box<dyn BaseChannel>>
49    where
50        Self: Sized;
51
52    /// Get the channel's type name for debugging
53    fn type_name(&self) -> &'static str;
54
55    /// Check if the channel is empty
56    fn is_empty(&self) -> bool {
57        self.get().ok().flatten().is_none()
58    }
59}
60
61/// A wrapper for type-erased channels
62pub type ChannelBox = Box<dyn BaseChannel>;
63
64/// Helper to create a LastValue channel
65pub fn last_value<T>() -> LastValue<T>
66where
67    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
68{
69    LastValue::new()
70}
71
72/// Helper to create a Topic channel
73pub fn topic<T>() -> Topic<T>
74where
75    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
76{
77    Topic::new()
78}
79
80/// Helper to create a BinaryOperatorAggregate channel
81pub fn binop<T, F>(initial: T, reducer: F) -> BinaryOperatorAggregate<T, F>
82where
83    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
84    F: Fn(T, T) -> T + Send + Sync + 'static,
85{
86    BinaryOperatorAggregate::new(initial, reducer)
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Smoke test for the `last_value` and `topic` constructor helpers.
    #[test]
    fn test_channel_helpers() {
        // LastValue: a single write can be read back as the same JSON value.
        let mut single = last_value::<i32>();
        single.update(vec![json!(42)]).unwrap();
        assert_eq!(single.get().unwrap(), Some(json!(42)));

        // Topic: two separate updates are both visible afterwards.
        let mut stream = topic::<String>();
        for word in ["hello", "world"] {
            stream.update(vec![json!(word)]).unwrap();
        }
        let snapshot = stream.get().unwrap().unwrap();
        let collected: Vec<String> = serde_json::from_value(snapshot).unwrap();
        assert_eq!(collected.len(), 2);
    }
}