pub struct CompiledGraph<S: State, I: IntoState<S> = S, O: FromState<S> = S> { /* private fields */ }Expand description
Compiled and validated graph ready for execution
This is the output of [StateGraph::compile] and contains all information
needed for graph execution by the Pregel engine.
§Examples
use juncture_core::{StateGraph, State};
let mut graph = StateGraph::<MyState>::new();
// ... add nodes and edges ...
let compiled = graph.compile()?;
let output = compiled.invoke(initial_state, &config)?;Implementations§
Source§impl<S: State, I: IntoState<S>, O: FromState<S>> CompiledGraph<S, I, O>
impl<S: State, I: IntoState<S>, O: FromState<S>> CompiledGraph<S, I, O>
Sourcepub fn invoke(
&self,
input: I,
config: &RunnableConfig,
) -> Result<GraphOutput<S, O>, JunctureError>
pub fn invoke( &self, input: I, config: &RunnableConfig, ) -> Result<GraphOutput<S, O>, JunctureError>
Invoke the graph synchronously
Executes the graph from the given input state and returns the final output.
§Errors
Returns JunctureError if execution fails.
§Examples
let output = compiled.invoke(initial_state, &config)?;
let final_state = output.value;Sourcepub async fn invoke_async(
&self,
input: I,
config: &RunnableConfig,
) -> Result<GraphOutput<S, O>, JunctureError>
pub async fn invoke_async( &self, input: I, config: &RunnableConfig, ) -> Result<GraphOutput<S, O>, JunctureError>
Invoke the graph asynchronously
Async version of invoke for use in async contexts.
§Errors
Returns JunctureError if execution fails.
§Examples
let output = compiled.invoke_async(initial_state, &config).await?;
let final_state = output.value;Sourcepub async fn stream(
&self,
input: I,
config: &RunnableConfig,
mode: StreamMode,
) -> Result<StreamHandle<S>, JunctureError>
pub async fn stream( &self, input: I, config: &RunnableConfig, mode: StreamMode, ) -> Result<StreamHandle<S>, JunctureError>
Stream graph execution as a sequence of events.
Executes the graph and emits StreamEvents
as each superstep completes, enabling real-time monitoring of execution progress.
This is a convenience wrapper around stream_with_config
that uses a default [StreamConfig] with no output key filtering.
§Arguments
input- Initial state for executionconfig- Execution configurationmode- Stream mode controlling what events are emitted
§Returns
A StreamHandle containing the run_id and a pinned stream of results,
where each result is either a StreamEvent or a JunctureError.
§Errors
Returns JunctureError if the graph cannot be initialized.
Runtime errors during execution are sent through the stream.
§Examples
use juncture_core::{StateGraph, State, StreamMode};
use futures::StreamExt;
let handle = compiled.stream(initial_state, &config, StreamMode::Values).await?;
println!("run_id = {}", handle.run_id());
let mut stream = handle.stream;
while let Some(result) = stream.next().await {
match result? {
StreamEvent::Values { state, step } => {
println!("Step {}: {:?}", step, state);
}
StreamEvent::End { output } => {
println!("Final state: {:?}", output);
}
_ => {}
}
}Sourcepub async fn stream_with_config(
&self,
input: I,
config: &RunnableConfig,
stream_config: StreamConfig,
) -> Result<StreamHandle<S>, JunctureError>
pub async fn stream_with_config( &self, input: I, config: &RunnableConfig, stream_config: StreamConfig, ) -> Result<StreamHandle<S>, JunctureError>
Stream graph execution with full [StreamConfig] control.
Like stream but accepts a [StreamConfig] instead
of a bare StreamMode, enabling output key filtering, subgraph
inclusion, and message batch tuning.
When [StreamConfig::output_keys] is set, StreamEvent::Values
events are replaced by StreamEvent::FilteredValues containing only
the requested fields as a JSON object. Similarly, StreamEvent::Updates
events become StreamEvent::FilteredUpdates.
§Arguments
input- Initial state for executionconfig- Execution configurationstream_config- Full streaming configuration (mode, output keys, etc.)
§Returns
A StreamHandle containing the run_id and a pinned stream of
results, where each result is either a StreamEvent or a JunctureError.
§Errors
Returns JunctureError if the graph cannot be initialized.
Runtime errors during execution are sent through the stream.
§Examples
use juncture_core::{StateGraph, State, StreamMode, stream::StreamConfig};
use futures::StreamExt;
let cfg = StreamConfig::new(StreamMode::Values)
.with_output_keys(vec!["messages".to_string()]);
let handle = compiled.stream_with_config(initial_state, &config, cfg).await?;
println!("run_id = {}", handle.run_id());
let mut stream = handle.stream;
while let Some(result) = stream.next().await {
match result? {
StreamEvent::FilteredValues { data, step } => {
println!("Step {}: {}", step, data);
}
StreamEvent::Values { state, step } => {
println!("Step {}: {:?}", step, state);
}
StreamEvent::End { output } => {
println!("Final state: {:?}", output);
}
_ => {}
}
}Sourcepub async fn execute_with_emitter(
&self,
input: S,
config: &RunnableConfig,
emitter: EventEmitter<S>,
) -> Result<S, JunctureError>
pub async fn execute_with_emitter( &self, input: S, config: &RunnableConfig, emitter: EventEmitter<S>, ) -> Result<S, JunctureError>
Execute the graph with an externally-provided event emitter
Unlike stream which creates internal channels,
this method accepts a pre-configured EventEmitter for subgraph
execution and custom streaming pipelines. The caller retains control
over the receiver end of the channel.
§Arguments
input- Initial state for executionconfig- Execution configurationemitter- Pre-configured event emitter for streaming events
§Returns
The final state S after graph execution completes.
§Errors
Returns JunctureError if the graph cannot be initialized
or if execution fails during a superstep.
§Examples
use juncture_core::{StateGraph, State, StreamMode, stream::EventEmitter};
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(256);
let emitter = EventEmitter::new(tx, StreamMode::Values);
// Spawn a task to consume events
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
println!("{event:?}");
}
});
let final_state = compiled.execute_with_emitter(input, &config, emitter).await?;Sourcepub async fn resume(
&self,
config: &RunnableConfig,
resume_value: ResumeValue,
) -> Result<GraphOutput<S, O>, JunctureError>
pub async fn resume( &self, config: &RunnableConfig, resume_value: ResumeValue, ) -> Result<GraphOutput<S, O>, JunctureError>
Resume execution from an interrupt point
Continues graph execution from where it was interrupted by a human-in-the-loop interaction, using the provided resume value(s).
§Arguments
config- Configuration withthread_idandcheckpoint_idsetresume_value- Resume value(s) to pass to interrupted node(s). Supports single value, ID-based resume, and namespace-based resume.
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured
or if the checkpoint cannot be found.
§Notes
This method requires S: DeserializeOwned to deserialize the state
from the checkpoint. This is a requirement of checkpoint-based recovery.
§Examples
use juncture_core::{StateGraph, State, interrupt::ResumeValue};
use serde_json::json;
// Single value resume
let output = compiled.resume(
&config,
ResumeValue::Single(json!("approved"))
).await?;
// ID-based resume for named interrupts
let mut by_id = std::collections::HashMap::new();
by_id.insert("interrupt_123".to_string(), json!("yes"));
let output = compiled.resume(&config, ResumeValue::ById(by_id)).await?;
// Namespace-based resume for multiple interrupts
let mut by_ns = std::collections::HashMap::new();
by_ns.insert("node1:0".to_string(), json!("value1"));
by_ns.insert("node2:0".to_string(), json!("value2"));
let output = compiled.resume(&config, ResumeValue::ByNamespace(by_ns)).await?;Sourcepub async fn resume_single(
&self,
config: &RunnableConfig,
value: Value,
) -> Result<GraphOutput<S, O>, JunctureError>
pub async fn resume_single( &self, config: &RunnableConfig, value: Value, ) -> Result<GraphOutput<S, O>, JunctureError>
Resume execution from an interrupt point with a single value
Convenience method for resuming with a single value. Equivalent to
calling resume() with ResumeValue::Single(value).
§Arguments
config- Configuration withthread_idandcheckpoint_idsetvalue- Single value to pass to the interrupted node
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured
or if the checkpoint cannot be found.
§Examples
use juncture_core::{StateGraph, State};
use serde_json::json;
// Simple single-value resume
let output = compiled.resume_single(&config, json!("approved")).await?;Sourcepub async fn resume_stream(
&self,
config: &RunnableConfig,
resume_value: ResumeValue,
mode: StreamMode,
) -> Result<StreamHandle<S>, JunctureError>
pub async fn resume_stream( &self, config: &RunnableConfig, resume_value: ResumeValue, mode: StreamMode, ) -> Result<StreamHandle<S>, JunctureError>
Resume execution from an interrupt checkpoint with streaming events.
Like resume but returns a stream of events
for monitoring execution progress in real time. Loads the checkpoint
identified by config.thread_id / config.checkpoint_id, validates
that it originated from an interrupt, deserializes the saved state,
and then runs the Pregel engine with the same streaming infrastructure
used by stream.
§Arguments
config- Configuration withthread_idandcheckpoint_idsetresume_value- Resume value(s) to pass to interrupted node(s). Supports single value, ID-based resume, and namespace-based resume.mode- Stream mode controlling what events are emitted
§Returns
A StreamHandle containing the run_id and a pinned stream of
results, where each result is either a
StreamEvent or a JunctureError.
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured,
no checkpoint is found, the checkpoint is not from an interrupt state,
or the state cannot be deserialized.
§Examples
use juncture_core::{StateGraph, State, StreamMode, interrupt::ResumeValue};
use futures::StreamExt;
use serde_json::json;
let handle = compiled.resume_stream(
&config,
ResumeValue::Single(json!("approved")),
StreamMode::Values,
).await?;
println!("run_id = {}", handle.run_id());
let mut stream = handle.stream;
while let Some(result) = stream.next().await {
match result? {
StreamEvent::Values { state, step } => {
println!("Step {}: {:?}", step, state);
}
StreamEvent::End { output } => {
println!("Final state: {:?}", output);
}
_ => {}
}
}Sourcepub async fn get_state(
&self,
config: &RunnableConfig,
) -> Result<Option<StateSnapshot<S>>, JunctureError>where
S: DeserializeOwned,
pub async fn get_state(
&self,
config: &RunnableConfig,
) -> Result<Option<StateSnapshot<S>>, JunctureError>where
S: DeserializeOwned,
Get the current state snapshot for a thread
Returns the state at the latest checkpoint for the given configuration.
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured,
the checkpoint cannot be retrieved, or the state cannot be deserialized.
Sourcepub async fn get_state_history(
&self,
_config: &RunnableConfig,
filter: Option<CheckpointFilter>,
) -> Result<Vec<StateSnapshot<S>>, JunctureError>
pub async fn get_state_history( &self, _config: &RunnableConfig, filter: Option<CheckpointFilter>, ) -> Result<Vec<StateSnapshot<S>>, JunctureError>
Get the full state history for a thread
Returns all checkpointed state snapshots for the given configuration, optionally filtered by the provided filter.
§Arguments
config- Configuration withthread_idsetfilter- Optional filter for narrowing history results
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured
or if the history cannot be retrieved.
Sourcepub async fn update_state(
&self,
config: &RunnableConfig,
update: StateUpdate<S>,
) -> Result<RunnableConfig, JunctureError>where
S: DeserializeOwned + Serialize,
pub async fn update_state(
&self,
config: &RunnableConfig,
update: StateUpdate<S>,
) -> Result<RunnableConfig, JunctureError>where
S: DeserializeOwned + Serialize,
Manually update the state at a checkpoint
Applies the provided state update to the current checkpoint state.
Used for administrative state modifications outside of normal execution.
The updated checkpoint is saved with CheckpointSource::Update and an
incremented step counter.
§Arguments
config- Configuration withthread_idandcheckpoint_idsetupdate- State update to apply (carriesupdate,label, andas_node)
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured,
the checkpoint cannot be found, state deserialization/serialization fails,
or the checkpoint cannot be saved.
§Notes
This method requires S: DeserializeOwned + Serialize to deserialize
the state from the checkpoint and re-serialize after applying the update.
Sourcepub async fn bulk_update_state(
&self,
_config: &RunnableConfig,
updates: Vec<StateUpdate<S>>,
) -> Result<Vec<RunnableConfig>, JunctureError>
pub async fn bulk_update_state( &self, _config: &RunnableConfig, updates: Vec<StateUpdate<S>>, ) -> Result<Vec<RunnableConfig>, JunctureError>
Bulk update state across multiple checkpoints
Applies multiple state updates atomically. If any update fails, none of the updates are applied.
§Arguments
config- Configuration withthread_idsetupdates- List of state updates to apply
§Errors
Returns [JunctureError::Checkpoint] if no checkpointer is configured
or if any update cannot be applied.
Sourcepub fn get_graph(&self, xray: Option<usize>) -> DrawableGraph
pub fn get_graph(&self, xray: Option<usize>) -> DrawableGraph
Get a drawable graph representation
Returns the graph structure for visualization, optionally including nested subgraph detail up to the specified depth.
§Arguments
xray- Optional depth for subgraph x-ray visualization.Nonerenders only the top-level graph;Some(n)expands subgraphs up tonlevels deep.
Sourcepub fn get_subgraphs(&self) -> Vec<SubgraphInfo>
pub fn get_subgraphs(&self) -> Vec<SubgraphInfo>
Get information about subgraphs in this compiled graph
Returns metadata about each mounted subgraph, including its name and persistence configuration.
Sourcepub fn trigger_table(&self) -> &TriggerTable<S>
pub fn trigger_table(&self) -> &TriggerTable<S>
Get the trigger table
Sourcepub fn checkpointer(&self) -> Option<&Arc<dyn CheckpointSaver>>
pub fn checkpointer(&self) -> Option<&Arc<dyn CheckpointSaver>>
Get the checkpointer (if configured)
Sourcepub fn builder_metadata(&self) -> &IndexMap<String, NodeMetadata>
pub fn builder_metadata(&self) -> &IndexMap<String, NodeMetadata>
Get the builder metadata for nodes
Sourcepub fn to_mermaid(&self) -> String
pub fn to_mermaid(&self) -> String
Trait Implementations§
Source§impl<S: Clone + State, I: Clone + IntoState<S>, O: Clone + FromState<S>> Clone for CompiledGraph<S, I, O>
impl<S: Clone + State, I: Clone + IntoState<S>, O: Clone + FromState<S>> Clone for CompiledGraph<S, I, O>
Source§fn clone(&self) -> CompiledGraph<S, I, O>
fn clone(&self) -> CompiledGraph<S, I, O>
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more