// langgraph_core_rs/pregel/write.rs
use async_trait::async_trait;
use langgraph_checkpoint::config::RunnableConfig;
use serde_json::Value as JsonValue;

use crate::runnable::{Runnable, RunnableError};

/// Sentinel string naming a pass-through write.
///
/// NOTE(review): nothing in the visible part of this file reads this
/// constant; pass-through entries are represented by `value: None` on
/// `ChannelWriteEntry` instead. Confirm whether it is still needed.
const PASSTHROUGH: &str = "__passthrough__";

9#[derive(Debug, Clone)]
11pub struct ChannelWriteEntry {
12 pub channel: String,
14 pub value: Option<JsonValue>,
16 pub skip_none: bool,
18}
19
20impl ChannelWriteEntry {
21 pub fn new(channel: impl Into<String>, value: Option<JsonValue>) -> Self {
22 Self {
23 channel: channel.into(),
24 value,
25 skip_none: false,
26 }
27 }
28
29 pub fn passthrough(channel: impl Into<String>) -> Self {
30 Self {
31 channel: channel.into(),
32 value: None,
33 skip_none: false,
34 }
35 }
36}
37
38pub struct ChannelWrite {
43 entries: Vec<ChannelWriteEntry>,
44}
45
46impl ChannelWrite {
47 pub fn new(entries: Vec<ChannelWriteEntry>) -> Self {
48 Self { entries }
49 }
50
51 fn assemble_writes(&self, input: &JsonValue) -> Vec<(String, JsonValue)> {
53 let mut writes = Vec::new();
54 for entry in &self.entries {
55 let value = match &entry.value {
56 Some(v) => v.clone(),
57 None => input.clone(), };
59
60 if entry.skip_none && value.is_null() {
61 continue;
62 }
63
64 writes.push((entry.channel.clone(), value));
65 }
66 writes
67 }
68}
69
70#[async_trait]
71impl Runnable for ChannelWrite {
72 fn invoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
73 let _writes = self.assemble_writes(input);
74
75 if let Some(configurable) = config.get("configurable") {
78 if let Some(_send_fn) = configurable.get(crate::constants::CONFIG_KEY_SEND) {
79 }
83 }
84
85 Ok(input.clone())
87 }
88
89 async fn ainvoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
90 self.invoke(input, config)
91 }
92
93 fn name(&self) -> &str {
94 "ChannelWrite"
95 }
96}
97
98pub fn write_to_targets(targets: &[String]) -> ChannelWrite {
101 let entries: Vec<ChannelWriteEntry> = targets
102 .iter()
103 .map(|t| {
104 ChannelWriteEntry::new(
105 format!("branch:to:{}", t),
106 Some(JsonValue::String(t.clone())),
107 )
108 })
109 .collect();
110 ChannelWrite::new(entries)
111}