Skip to main content

langgraph_core_rs/pregel/
mod.rs

1pub mod algo;
2pub mod read;
3pub mod write;
4pub mod runner;
5pub mod io;
6
7pub use algo::{prepare_next_tasks, apply_writes};
8pub use read::PregelNode;
9pub use write::{ChannelWrite, ChannelWriteEntry};
10pub use runner::PregelRunner;
11pub use io::{map_input, map_command, read_channels, NULL_TASK_ID};
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use serde_json::Value as JsonValue;
16use langgraph_checkpoint::config::RunnableConfig;
17use crate::channels::Channel;
18use crate::runnable::Runnable;
19
20/// A task ready for execution in the current super-step.
21#[derive(Debug, Clone)]
22pub struct PregelTask {
23    /// Name of the node to execute.
24    pub name: String,
25    /// Input value for this task.
26    pub input: JsonValue,
27    /// Channels this task reads from.
28    pub input_channels: Vec<String>,
29    /// Channels that triggered this task.
30    pub triggers: Vec<String>,
31    /// Unique task ID (deterministic from checkpoint + step + name).
32    pub id: String,
33}
34
35/// An executable task with its runnable and write buffer.
36pub struct PregelExecutableTask {
37    /// Name of the node.
38    pub name: String,
39    /// Input value.
40    pub input: JsonValue,
41    /// The runnable to execute (node logic + writers).
42    pub proc: Arc<dyn Runnable>,
43    /// Write buffer: (channel, value) pairs collected during execution.
44    pub writes: Vec<(String, JsonValue)>,
45    /// Task configuration (with CONFIG_KEY_SEND, CONFIG_KEY_READ, etc.).
46    pub config: RunnableConfig,
47    /// Channels that triggered this task.
48    pub triggers: Vec<String>,
49    /// Unique task ID.
50    pub id: String,
51}
52
53/// Status of the Pregel loop.
54#[derive(Debug, Clone, PartialEq)]
55pub enum LoopStatus {
56    /// Processing initial input.
57    Input,
58    /// Tasks pending execution.
59    Pending,
60    /// All done — no more tasks.
61    Done,
62    /// Interrupted before node execution.
63    InterruptBefore,
64    /// Interrupted after node execution.
65    InterruptAfter,
66    /// Step limit reached.
67    OutOfSteps,
68}
69
70/// Channel version tracking.
71pub type ChannelVersions = HashMap<String, JsonValue>;
72
73/// Reverse index from channel name to nodes triggered by that channel.
74pub type TriggerToNodes = HashMap<String, Vec<String>>;
75
76/// Build the reverse index: channel_name -> [node_names that are triggered by it].
77pub fn build_trigger_to_nodes(nodes: &HashMap<String, PregelNode>) -> TriggerToNodes {
78    let mut map: TriggerToNodes = HashMap::new();
79    for (name, node) in nodes {
80        for trigger in &node.triggers {
81            map.entry(trigger.clone()).or_default().push(name.clone());
82        }
83    }
84    map
85}
86
87/// Reconstruct live channels from a checkpoint.
88pub fn channels_from_checkpoint(
89    specs: &HashMap<String, Box<dyn Channel>>,
90    checkpoint_channels: &HashMap<String, Option<JsonValue>>,
91) -> HashMap<String, Box<dyn Channel>> {
92    let mut channels = HashMap::new();
93    for (key, spec) in specs {
94        let cp = checkpoint_channels.get(key).and_then(|v| v.as_ref());
95        channels.insert(key.clone(), spec.from_checkpoint(cp));
96    }
97    channels
98}