Skip to main content

langgraph_core_rs/pregel/
write.rs

1use async_trait::async_trait;
2use serde_json::Value as JsonValue;
3use langgraph_checkpoint::config::RunnableConfig;
4use crate::runnable::{Runnable, RunnableError};
5
/// Sentinel string meaning "pass the input through unchanged".
///
/// NOTE(review): nothing in this file references this constant; passthrough
/// entries are represented by `ChannelWriteEntry::value` being `None` instead.
/// Confirm whether the constant is still needed.
const PASSTHROUGH: &str = "__passthrough__";
8
/// A single channel write entry: one target channel plus the value destined
/// for it.
///
/// Entries are resolved by `ChannelWrite::assemble_writes`, which substitutes
/// the invocation input for entries that carry no explicit value.
#[derive(Debug, Clone)]
pub struct ChannelWriteEntry {
    /// Name of the channel this entry writes to.
    pub channel: String,
    /// Explicit value to write. `None` means passthrough: the input handed to
    /// the enclosing `ChannelWrite` is written instead.
    pub value: Option<JsonValue>,
    /// When `true`, the entry is dropped if its resolved value is JSON null.
    /// Both constructors on this type initialise it to `false`.
    pub skip_none: bool,
}
19
20impl ChannelWriteEntry {
21    pub fn new(channel: impl Into<String>, value: Option<JsonValue>) -> Self {
22        Self {
23            channel: channel.into(),
24            value,
25            skip_none: false,
26        }
27    }
28
29    pub fn passthrough(channel: impl Into<String>) -> Self {
30        Self {
31            channel: channel.into(),
32            value: None,
33            skip_none: false,
34        }
35    }
36}
37
38/// A Runnable that writes values to channels.
39///
40/// When invoked, replaces PASSTHROUGH sentinels with the actual input,
41/// then calls `config[CONFIG_KEY_SEND]` to buffer the writes.
42pub struct ChannelWrite {
43    entries: Vec<ChannelWriteEntry>,
44}
45
46impl ChannelWrite {
47    pub fn new(entries: Vec<ChannelWriteEntry>) -> Self {
48        Self { entries }
49    }
50
51    /// Process writes: replace PASSTHROUGH with actual input, filter skip_none.
52    fn assemble_writes(&self, input: &JsonValue) -> Vec<(String, JsonValue)> {
53        let mut writes = Vec::new();
54        for entry in &self.entries {
55            let value = match &entry.value {
56                Some(v) => v.clone(),
57                None => input.clone(), // PASSTHROUGH
58            };
59
60            if entry.skip_none && value.is_null() {
61                continue;
62            }
63
64            writes.push((entry.channel.clone(), value));
65        }
66        writes
67    }
68}
69
70#[async_trait]
71impl Runnable for ChannelWrite {
72    fn invoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
73        let _writes = self.assemble_writes(input);
74
75        // Store writes in the configurable dict under CONFIG_KEY_SEND
76        // The caller (PregelLoop/Runner) extracts these after execution
77        if let Some(configurable) = config.get("configurable") {
78            if let Some(_send_fn) = configurable.get(crate::constants::CONFIG_KEY_SEND) {
79                // If there's a send function registered, use it
80                // For now, we store writes as a JSON array in the config
81                // This will be extracted by the runner
82            }
83        }
84
85        // Return the input unchanged (writers are side-effect only)
86        Ok(input.clone())
87    }
88
89    async fn ainvoke(&self, input: &JsonValue, config: &RunnableConfig) -> Result<JsonValue, RunnableError> {
90        self.invoke(input, config)
91    }
92
93    fn name(&self) -> &str {
94        "ChannelWrite"
95    }
96}
97
98/// Helper to create a ChannelWrite that writes the node's output to
99/// the "branch:to:{target}" trigger channels for each destination.
100pub fn write_to_targets(targets: &[String]) -> ChannelWrite {
101    let entries: Vec<ChannelWriteEntry> = targets
102        .iter()
103        .map(|t| {
104            ChannelWriteEntry::new(
105                format!("branch:to:{}", t),
106                Some(JsonValue::String(t.clone())),
107            )
108        })
109        .collect();
110    ChannelWrite::new(entries)
111}