// rust_langgraph/channels/mod.rs
use crate::errors::Result;
16use serde::{Deserialize, Serialize};
17use std::fmt::Debug;
18
19pub mod last_value;
20pub mod topic;
21pub mod binop;
22pub mod ephemeral;
23
24pub use last_value::LastValue;
25pub use topic::Topic;
26pub use binop::BinaryOperatorAggregate;
27pub use ephemeral::EphemeralValue;
28
29pub trait BaseChannel: Send + Sync + Debug {
35 fn get(&self) -> Result<Option<serde_json::Value>>;
37
38 fn update(&mut self, values: Vec<serde_json::Value>) -> Result<()>;
43
44 fn checkpoint(&self) -> Result<serde_json::Value>;
46
47 fn from_checkpoint(data: serde_json::Value) -> Result<Box<dyn BaseChannel>>
49 where
50 Self: Sized;
51
52 fn type_name(&self) -> &'static str;
54
55 fn is_empty(&self) -> bool {
57 self.get().ok().flatten().is_none()
58 }
59}
60
61pub type ChannelBox = Box<dyn BaseChannel>;
63
64pub fn last_value<T>() -> LastValue<T>
66where
67 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
68{
69 LastValue::new()
70}
71
72pub fn topic<T>() -> Topic<T>
74where
75 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
76{
77 Topic::new()
78}
79
80pub fn binop<T, F>(initial: T, reducer: F) -> BinaryOperatorAggregate<T, F>
82where
83 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Debug + 'static,
84 F: Fn(T, T) -> T + Send + Sync + 'static,
85{
86 BinaryOperatorAggregate::new(initial, reducer)
87}
88
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for the `last_value` and `topic` constructor helpers.
    #[test]
    fn test_channel_helpers() {
        // A last-value channel hands back the value it was updated with.
        let mut latest = last_value::<i32>();
        latest.update(vec![serde_json::json!(42)]).unwrap();
        let current = latest.get().unwrap();
        assert_eq!(current, Some(serde_json::json!(42)));

        // After two separate single-value updates, the topic channel's value
        // deserializes into a list of two strings.
        let mut stream = topic::<String>();
        for word in ["hello", "world"] {
            stream.update(vec![serde_json::json!(word)]).unwrap();
        }
        let snapshot = stream.get().unwrap().unwrap();
        let values: Vec<String> = serde_json::from_value(snapshot).unwrap();
        assert_eq!(values.len(), 2);
    }
}