Struct CompiledGraph 

pub struct CompiledGraph<S: State, I: IntoState<S> = S, O: FromState<S> = S> { /* private fields */ }

A compiled and validated graph, ready for execution.

This is the output of StateGraph::compile and contains all the information the Pregel engine needs to execute the graph.

§Examples

use juncture_core::{StateGraph, State};

// `MyState`, `initial_state`, and `config` are assumed to be defined by the caller.
let mut graph = StateGraph::<MyState>::new();
// ... add nodes and edges ...

let compiled = graph.compile()?;
let output = compiled.invoke(initial_state, &config)?;

Implementations§

impl<S: State, I: IntoState<S>, O: FromState<S>> CompiledGraph<S, I, O>

pub fn invoke( &self, input: I, config: &RunnableConfig, ) -> Result<GraphOutput<S, O>, JunctureError>

Invoke the graph synchronously

Executes the graph from the given input state and returns the final output.

§Errors

Returns JunctureError if execution fails.

§Examples
let output = compiled.invoke(initial_state, &config)?;
let final_state = output.value;

pub async fn invoke_async( &self, input: I, config: &RunnableConfig, ) -> Result<GraphOutput<S, O>, JunctureError>

Invoke the graph asynchronously

Asynchronous counterpart of invoke, for use in async contexts.

§Errors

Returns JunctureError if execution fails.

§Examples
let output = compiled.invoke_async(initial_state, &config).await?;
let final_state = output.value;

pub async fn stream( &self, input: I, config: &RunnableConfig, mode: StreamMode, ) -> Result<StreamHandle<S>, JunctureError>
where S: Clone + Send + DeserializeOwned + Serialize + 'static, S::Update: Serialize,

Stream graph execution as a sequence of events.

Executes the graph and emits StreamEvents as each superstep completes, enabling real-time monitoring of execution progress.

This is a convenience wrapper around stream_with_config that uses a default StreamConfig with no output key filtering.

§Arguments
  • input - Initial state for execution
  • config - Execution configuration
  • mode - Stream mode controlling what events are emitted
§Returns

A StreamHandle containing the run_id and a pinned stream of results, where each result is either a StreamEvent or a JunctureError.

§Errors

Returns JunctureError if the graph cannot be initialized. Runtime errors during execution are sent through the stream.
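
The error contract above (initialization failures returned directly, runtime failures delivered as stream items) can be sketched without the library. The following is a minimal model that assumes a plain `std::sync::mpsc` channel and a worker thread in place of the async stream; `Event`, `RunError`, `run`, and `drain` are hypothetical names, not part of juncture_core.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a stream event; not the real `StreamEvent`.
#[derive(Debug, PartialEq)]
enum Event {
    Values { step: usize, state: i64 },
    End { output: i64 },
}

/// Hypothetical runtime error carried inside the stream.
#[derive(Debug, PartialEq)]
struct RunError(String);

/// Runs `steps` supersteps on a worker thread and sends one item per step.
/// A failure at `fail_at` is sent as an `Err` item, after which the stream ends.
fn run(steps: usize, fail_at: Option<usize>) -> mpsc::Receiver<Result<Event, RunError>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state = 0i64;
        for step in 0..steps {
            if fail_at == Some(step) {
                let _ = tx.send(Err(RunError(format!("step {step} failed"))));
                return;
            }
            state += 1;
            let _ = tx.send(Ok(Event::Values { step, state }));
        }
        let _ = tx.send(Ok(Event::End { output: state }));
    });
    rx
}

/// Collects events until the stream ends, stopping at the first `Err` item,
/// much as `result?` does inside a consumer loop.
fn drain(rx: mpsc::Receiver<Result<Event, RunError>>) -> Result<Vec<Event>, RunError> {
    rx.into_iter().collect()
}
```

In this model the consumer sees every event emitted before the failure, then the error itself; nothing is lost by starting the run successfully.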

§Examples
use juncture_core::{StateGraph, State, StreamMode};
use futures::StreamExt;

let handle = compiled.stream(initial_state, &config, StreamMode::Values).await?;
println!("run_id = {}", handle.run_id());

let mut stream = handle.stream;
while let Some(result) = stream.next().await {
    match result? {
        StreamEvent::Values { state, step } => {
            println!("Step {}: {:?}", step, state);
        }
        StreamEvent::End { output } => {
            println!("Final state: {:?}", output);
        }
        _ => {}
    }
}

pub async fn stream_with_config( &self, input: I, config: &RunnableConfig, stream_config: StreamConfig, ) -> Result<StreamHandle<S>, JunctureError>
where S: Clone + Send + DeserializeOwned + Serialize + 'static, S::Update: Serialize,

Stream graph execution with full StreamConfig control.

Like stream, but accepts a StreamConfig instead of a bare StreamMode, enabling output key filtering, subgraph inclusion, and message batch tuning.

When StreamConfig::output_keys is set, StreamEvent::Values events are replaced by StreamEvent::FilteredValues events containing only the requested fields as a JSON object. Similarly, StreamEvent::Updates events become StreamEvent::FilteredUpdates.
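
To illustrate what output key filtering amounts to, here is a small sketch that projects a state onto a set of requested keys. It assumes a `BTreeMap<String, String>` as a stand-in for the JSON object, and it assumes that requested keys missing from the state are silently skipped; `filter_keys` is a hypothetical helper, and the library's actual behavior for missing keys is not documented here.

```rust
use std::collections::BTreeMap;

/// Keeps only the entries of `state` whose key appears in `output_keys`.
/// Requested keys that are absent from `state` are skipped (an assumption
/// of this sketch, not documented library behavior).
fn filter_keys(
    state: &BTreeMap<String, String>,
    output_keys: &[String],
) -> BTreeMap<String, String> {
    output_keys
        .iter()
        .filter_map(|key| state.get(key).map(|value| (key.clone(), value.clone())))
        .collect()
}
```

With a state holding `messages` and `scratch` and a request for `messages` only, the result contains the single `messages` entry.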

§Arguments
  • input - Initial state for execution
  • config - Execution configuration
  • stream_config - Full streaming configuration (mode, output keys, etc.)
§Returns

A StreamHandle containing the run_id and a pinned stream of results, where each result is either a StreamEvent or a JunctureError.

§Errors

Returns JunctureError if the graph cannot be initialized. Runtime errors during execution are sent through the stream.

§Examples
use juncture_core::{StateGraph, State, StreamMode, stream::StreamConfig};
use futures::StreamExt;

let cfg = StreamConfig::new(StreamMode::Values)
    .with_output_keys(vec!["messages".to_string()]);

let handle = compiled.stream_with_config(initial_state, &config, cfg).await?;
println!("run_id = {}", handle.run_id());

let mut stream = handle.stream;
while let Some(result) = stream.next().await {
    match result? {
        StreamEvent::FilteredValues { data, step } => {
            println!("Step {}: {}", step, data);
        }
        StreamEvent::Values { state, step } => {
            println!("Step {}: {:?}", step, state);
        }
        StreamEvent::End { output } => {
            println!("Final state: {:?}", output);
        }
        _ => {}
    }
}

pub async fn execute_with_emitter( &self, input: S, config: &RunnableConfig, emitter: EventEmitter<S>, ) -> Result<S, JunctureError>
where S: Clone + Send + DeserializeOwned + Serialize + 'static, S::Update: Serialize,

Execute the graph with an externally-provided event emitter

Unlike stream which creates internal channels, this method accepts a pre-configured EventEmitter for subgraph execution and custom streaming pipelines. The caller retains control over the receiver end of the channel.

§Arguments
  • input - Initial state for execution
  • config - Execution configuration
  • emitter - Pre-configured event emitter for streaming events
§Returns

The final state S after graph execution completes.

§Errors

Returns JunctureError if the graph cannot be initialized or if execution fails during a superstep.

§Examples
use juncture_core::{StateGraph, State, StreamMode, stream::EventEmitter};
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(256);
let emitter = EventEmitter::new(tx, StreamMode::Values);

// Spawn a task to consume events
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        println!("{event:?}");
    }
});

let final_state = compiled.execute_with_emitter(input, &config, emitter).await?;

pub async fn resume( &self, config: &RunnableConfig, resume_value: ResumeValue, ) -> Result<GraphOutput<S, O>, JunctureError>
where S: for<'de> Deserialize<'de> + Serialize, S::Update: Serialize, O: FromState<S>,

Resume execution from an interrupt point

Continues graph execution from where it was interrupted by a human-in-the-loop interaction, using the provided resume value(s).

§Arguments
  • config - Configuration with thread_id and checkpoint_id set
  • resume_value - Resume value(s) to pass to interrupted node(s). Supports single value, ID-based resume, and namespace-based resume.
§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured or if the checkpoint cannot be found.

§Notes

This method requires S: DeserializeOwned (written as for<'de> Deserialize<'de> in the signature) so that the state can be deserialized from the checkpoint. Checkpoint-based recovery depends on this bound.

§Examples
use juncture_core::{StateGraph, State, interrupt::ResumeValue};
use serde_json::json;

// Single value resume
let output = compiled.resume(
    &config,
    ResumeValue::Single(json!("approved"))
).await?;

// ID-based resume for named interrupts
let mut by_id = std::collections::HashMap::new();
by_id.insert("interrupt_123".to_string(), json!("yes"));
let output = compiled.resume(&config, ResumeValue::ById(by_id)).await?;

// Namespace-based resume for multiple interrupts
let mut by_ns = std::collections::HashMap::new();
by_ns.insert("node1:0".to_string(), json!("value1"));
by_ns.insert("node2:0".to_string(), json!("value2"));
let output = compiled.resume(&config, ResumeValue::ByNamespace(by_ns)).await?;

pub async fn resume_single( &self, config: &RunnableConfig, value: Value, ) -> Result<GraphOutput<S, O>, JunctureError>
where S: for<'de> Deserialize<'de> + Serialize, S::Update: Serialize, O: FromState<S>,

Resume execution from an interrupt point with a single value

Convenience method for resuming with a single value. Equivalent to calling resume() with ResumeValue::Single(value).

§Arguments
  • config - Configuration with thread_id and checkpoint_id set
  • value - Single value to pass to the interrupted node
§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured or if the checkpoint cannot be found.

§Examples
use juncture_core::{StateGraph, State};
use serde_json::json;

// Simple single-value resume
let output = compiled.resume_single(&config, json!("approved")).await?;

pub async fn resume_stream( &self, config: &RunnableConfig, resume_value: ResumeValue, mode: StreamMode, ) -> Result<StreamHandle<S>, JunctureError>
where S: Clone + Send + for<'de> Deserialize<'de> + Serialize + 'static, S::Update: Serialize,

Resume execution from an interrupt checkpoint with streaming events.

Like resume but returns a stream of events for monitoring execution progress in real time. Loads the checkpoint identified by config.thread_id / config.checkpoint_id, validates that it originated from an interrupt, deserializes the saved state, and then runs the Pregel engine with the same streaming infrastructure used by stream.

§Arguments
  • config - Configuration with thread_id and checkpoint_id set
  • resume_value - Resume value(s) to pass to interrupted node(s). Supports single value, ID-based resume, and namespace-based resume.
  • mode - Stream mode controlling what events are emitted
§Returns

A StreamHandle containing the run_id and a pinned stream of results, where each result is either a StreamEvent or a JunctureError.

§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured, no checkpoint is found, the checkpoint is not from an interrupt state, or the state cannot be deserialized.

§Examples
use juncture_core::{StateGraph, State, StreamMode, interrupt::ResumeValue};
use futures::StreamExt;
use serde_json::json;

let handle = compiled.resume_stream(
    &config,
    ResumeValue::Single(json!("approved")),
    StreamMode::Values,
).await?;
println!("run_id = {}", handle.run_id());

let mut stream = handle.stream;
while let Some(result) = stream.next().await {
    match result? {
        StreamEvent::Values { state, step } => {
            println!("Step {}: {:?}", step, state);
        }
        StreamEvent::End { output } => {
            println!("Final state: {:?}", output);
        }
        _ => {}
    }
}

pub async fn get_state( &self, config: &RunnableConfig, ) -> Result<Option<StateSnapshot<S>>, JunctureError>

Get the current state snapshot for a thread

Returns the state at the latest checkpoint for the given configuration.

§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured, the checkpoint cannot be retrieved, or the state cannot be deserialized.
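
The Result<Option<...>> return type separates three outcomes. The sketch below models them with an in-memory store; it assumes that a thread with no checkpoints yields `Ok(None)`, which the text above does not state explicitly. `Graph`, `Snapshot`, and `StoreError` are hypothetical stand-ins, not the library's types.

```rust
use std::collections::HashMap;

/// Hypothetical snapshot of a checkpointed state.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    step: u64,
    value: String,
}

/// Hypothetical error for the "no checkpointer configured" case.
#[derive(Debug, PartialEq)]
enum StoreError {
    NoCheckpointer,
}

/// Hypothetical graph whose optional store maps thread IDs to checkpoint history.
struct Graph {
    store: Option<HashMap<String, Vec<Snapshot>>>,
}

impl Graph {
    /// Returns the latest snapshot for `thread_id`:
    /// `Err` without a store, `Ok(None)` without checkpoints, `Ok(Some(..))` otherwise.
    fn get_state(&self, thread_id: &str) -> Result<Option<Snapshot>, StoreError> {
        let store = self.store.as_ref().ok_or(StoreError::NoCheckpointer)?;
        Ok(store.get(thread_id).and_then(|history| history.last().cloned()))
    }
}
```

Callers would typically propagate the error with `?` and then match on the `Option` to handle a thread that has not run yet.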

pub async fn get_state_history( &self, _config: &RunnableConfig, filter: Option<CheckpointFilter>, ) -> Result<Vec<StateSnapshot<S>>, JunctureError>

Get the full state history for a thread

Returns all checkpointed state snapshots for the given configuration, optionally filtered by the provided filter.

§Arguments
  • config - Configuration with thread_id set
  • filter - Optional filter for narrowing history results
§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured or if the history cannot be retrieved.

pub async fn update_state( &self, config: &RunnableConfig, update: StateUpdate<S>, ) -> Result<RunnableConfig, JunctureError>

Manually update the state at a checkpoint

Applies the provided state update to the current checkpoint state. Used for administrative state modifications outside of normal execution. The updated checkpoint is saved with CheckpointSource::Update and an incremented step counter.

§Arguments
  • config - Configuration with thread_id and checkpoint_id set
  • update - State update to apply (carries update, label, and as_node)
§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured, the checkpoint cannot be found, state deserialization/serialization fails, or the checkpoint cannot be saved.

§Notes

This method requires S: DeserializeOwned + Serialize to deserialize the state from the checkpoint and re-serialize after applying the update.
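
The bookkeeping described for update_state (a new checkpoint tagged as an update, with the step counter incremented) can be sketched as follows. This is an illustrative model only: it assumes the previous checkpoint is kept in the history, uses an integer as the state, and the `Source`, `Checkpoint`, and `update_state` names are hypothetical stand-ins rather than the library's types.

```rust
/// Hypothetical stand-in for `CheckpointSource`; only two variants are modeled.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Source {
    Loop,
    Update,
}

/// Hypothetical checkpoint record with an integer state.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    step: u64,
    source: Source,
    state: i64,
}

/// Applies `delta` to the latest checkpoint and appends the result as a new
/// checkpoint. Returns `None` when there is no checkpoint to update.
fn update_state(history: &mut Vec<Checkpoint>, delta: i64) -> Option<Checkpoint> {
    let latest = history.last()?.clone();
    let updated = Checkpoint {
        step: latest.step + 1,
        source: Source::Update,
        state: latest.state + delta,
    };
    history.push(updated.clone());
    Some(updated)
}
```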

pub async fn bulk_update_state( &self, _config: &RunnableConfig, updates: Vec<StateUpdate<S>>, ) -> Result<Vec<RunnableConfig>, JunctureError>

Bulk update state across multiple checkpoints

Applies multiple state updates atomically. If any update fails, none of the updates are applied.

§Arguments
  • config - Configuration with thread_id set
  • updates - List of state updates to apply
§Errors

Returns JunctureError::Checkpoint if no checkpointer is configured or if any update cannot be applied.
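
One common way to obtain the all-or-nothing behavior described above is to apply the updates to a working copy and commit only when every update succeeds. The sketch below shows that pattern on an integer state; it is not taken from the library's implementation, and `apply_one` and `bulk_update` are hypothetical helpers.

```rust
/// Hypothetical single update: adds `delta`, rejecting negative results.
fn apply_one(state: i64, delta: i64) -> Result<i64, String> {
    let next = state + delta;
    if next < 0 {
        Err(format!("update {delta} would make the state negative"))
    } else {
        Ok(next)
    }
}

/// Applies every update to a working copy and commits only if all succeed.
/// On the first failure the original `state` is left untouched.
fn bulk_update(state: &mut i64, updates: &[i64]) -> Result<(), String> {
    let mut working = *state;
    for &delta in updates {
        working = apply_one(working, delta)?;
    }
    *state = working; // commit point: reached only when no update failed
    Ok(())
}
```

Starting from 8, the batch `[1, -20, 3]` fails on its second update, and the state remains 8 even though the first update would have succeeded on its own.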

pub fn get_graph(&self, xray: Option<usize>) -> DrawableGraph

Get a drawable graph representation

Returns the graph structure for visualization, optionally including nested subgraph detail up to the specified depth.

§Arguments
  • xray - Optional depth for subgraph x-ray visualization. None renders only the top-level graph; Some(n) expands subgraphs up to n levels deep.
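
The depth semantics of xray can be illustrated with a small recursive walk. This sketch assumes a toy tree in which any node may carry a nested subgraph; `Node`, `leaf`, `sub`, and `render` are hypothetical, and the colon-separated labels are a choice made for this example rather than the library's naming scheme.

```rust
/// Hypothetical graph node: a name plus an optional nested subgraph.
struct Node {
    name: String,
    subgraph: Vec<Node>,
}

/// Builds a node without a subgraph.
fn leaf(name: &str) -> Node {
    Node { name: name.to_string(), subgraph: Vec::new() }
}

/// Builds a node that mounts `children` as its subgraph.
fn sub(name: &str, children: Vec<Node>) -> Node {
    Node { name: name.to_string(), subgraph: children }
}

/// Collects node labels, expanding subgraphs while depth remains.
/// `None` is treated like a depth of zero: only top-level nodes are listed.
fn render(nodes: &[Node], xray: Option<usize>, prefix: &str, out: &mut Vec<String>) {
    let depth = xray.unwrap_or(0);
    for node in nodes {
        let label = format!("{prefix}{}", node.name);
        out.push(label.clone());
        if depth > 0 && !node.subgraph.is_empty() {
            render(&node.subgraph, Some(depth - 1), &format!("{label}:"), out);
        }
    }
}
```

For a graph with nodes `a` and `b`, where `b` mounts `c` and `d` and `d` mounts `e`, `None` lists `a` and `b`, `Some(1)` adds `b:c` and `b:d`, and `Some(2)` also adds `b:d:e`.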

pub fn get_subgraphs(&self) -> Vec<SubgraphInfo>

Get information about subgraphs in this compiled graph

Returns metadata about each mounted subgraph, including its name and persistence configuration.

pub fn nodes(&self) -> &IndexMap<String, Arc<dyn Node<S>>>

Get the nodes in this graph

pub fn trigger_table(&self) -> &TriggerTable<S>

Get the trigger table

pub fn checkpointer(&self) -> Option<&Arc<dyn CheckpointSaver>>

Get the checkpointer (if configured)

pub fn builder_metadata(&self) -> &IndexMap<String, NodeMetadata>

Get the builder metadata for nodes

pub fn to_mermaid(&self) -> String

Export the graph as a Mermaid diagram.

Returns a string in Mermaid format that can be rendered by Mermaid.js.

§Examples
let mermaid = compiled.to_mermaid();
let diagram = format!("```mermaid\n{mermaid}\n```");
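
For readers unfamiliar with Mermaid, the following sketch shows the general shape of a flowchart definition built from a list of edges. It is not the output format of to_mermaid, which may add styling, labels, or entry and exit markers; `edges_to_mermaid` and the node names are hypothetical.

```rust
/// Renders directed edges as a Mermaid flowchart definition (top-down layout).
fn edges_to_mermaid(edges: &[(&str, &str)]) -> String {
    let mut out = String::from("graph TD\n");
    for (from, to) in edges {
        // Each edge becomes one `from --> to` line in Mermaid's flowchart syntax.
        out.push_str(&format!("    {from} --> {to}\n"));
    }
    out
}
```

Node names here avoid the word `end`, which Mermaid flowcharts treat as a keyword.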

pub fn to_dot(&self) -> String

Export the graph in DOT format.

Returns a string in Graphviz DOT format.

§Examples
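let dot = compiled.to_dot();
// Write the DOT source to a file for Graphviz or other tools.
std::fs::write("graph.dot", dot)?;
// Render it with, for example: dot -Tsvg graph.dot -o graph.svg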

pub fn to_json(&self) -> Value

Export the graph structure as JSON.

Returns a JSON value representing the graph structure.

§Examples
let json = compiled.to_json();
let pretty = serde_json::to_string_pretty(&json)?;

pub fn to_html(&self) -> String

Export graph as a self-contained HTML file with Mermaid.js

Generates an HTML string that renders the graph as an interactive Mermaid diagram. The HTML loads Mermaid.js from a CDN and supports zoom/pan in the browser.

§Examples
let html = compiled.to_html();
std::fs::write("graph.html", html)?;

pub fn display(&self) -> String

Display the graph as a formatted string for terminal output

Returns a human-readable string representation of the graph structure with Unicode box-drawing characters for edges and clear node labels.

§Examples
println!("{}", compiled.display());

Trait Implementations§

impl<S: Clone + State, I: Clone + IntoState<S>, O: Clone + FromState<S>> Clone for CompiledGraph<S, I, O>

fn clone(&self) -> CompiledGraph<S, I, O>

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Available since 1.0.0 (const: unstable).

impl<S: State, I: IntoState<S>, O: FromState<S>> Debug for CompiledGraph<S, I, O>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<S, I = S, O = S> !RefUnwindSafe for CompiledGraph<S, I, O>

impl<S, I = S, O = S> !UnwindSafe for CompiledGraph<S, I, O>

impl<S, I, O> Freeze for CompiledGraph<S, I, O>

impl<S, I, O> Send for CompiledGraph<S, I, O>

impl<S, I, O> Sync for CompiledGraph<S, I, O>

impl<S, I, O> Unpin for CompiledGraph<S, I, O>
where I: Unpin, O: Unpin,

impl<S, I, O> UnsafeUnpin for CompiledGraph<S, I, O>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> DynClone for T
where T: Clone,

fn __clone_box(&self, _: Private) -> *mut ()

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.