Struct PregelLoop 

pub struct PregelLoop<S: State> {
    pub state: S,
    pub nodes: IndexMap<String, Arc<dyn Node<S>>>,
    pub trigger_table: TriggerTable<S>,
    pub field_versions: FieldVersionTracker,
    pub versions_seen: VersionsSeen,
    pub runnable_config: RunnableConfig,
    pub cancellation_token: CancellationToken,
    pub stream_tx: Option<UnboundedSender<StreamEvent<S>>>,
    pub checkpointer: Option<Arc<dyn CheckpointSaver>>,
    pub step: usize,
    pub status: LoopStatus,
    pub pending_tasks: Vec<PendingTask<S>>,
    /* private fields */
}

Main Pregel execution loop

Orchestrates graph execution using the Pregel algorithm: it schedules the tasks for each superstep, tracks field versions, and maintains the execution state.
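To make the version-tracking idea concrete, here is a minimal, self-contained sketch of a superstep loop. It is not this crate's implementation: ToyNode, run_toy_pregel, and the rule that a node runs when any of its trigger fields has a version newer than the one it last saw are assumptions made for illustration, loosely modeled on the field_versions and versions_seen fields.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a graph node (not the crate's `Node` trait).
struct ToyNode {
    name: &'static str,
    /// Indices of the state fields whose changes trigger this node.
    triggers: Vec<usize>,
    /// Pure function from the current state to a list of (field, value) writes.
    run: fn(&[i64]) -> Vec<(usize, i64)>,
}

/// Runs supersteps until no node is triggered or `max_steps` is reached.
/// Returns the final state and the number of supersteps executed.
fn run_toy_pregel(mut state: Vec<i64>, nodes: &[ToyNode], max_steps: usize) -> (Vec<i64>, usize) {
    // One version counter per field; the initial input counts as version 1.
    let mut field_versions = vec![1u64; state.len()];
    // For each node, the field versions that were current when it last ran.
    let mut versions_seen: HashMap<&str, Vec<u64>> = HashMap::new();
    let mut step = 0;

    while step < max_steps {
        // Plan: a node is runnable if any trigger field is newer than what it last saw.
        let runnable: Vec<&ToyNode> = nodes
            .iter()
            .filter(|n| {
                let seen = versions_seen.get(n.name);
                n.triggers
                    .iter()
                    .any(|&f| field_versions[f] > seen.map_or(0, |s| s[f]))
            })
            .collect();
        if runnable.is_empty() {
            break;
        }

        // Execute: every runnable node reads the same pre-step state.
        let mut writes = Vec::new();
        for node in &runnable {
            writes.extend((node.run)(&state));
            versions_seen.insert(node.name, field_versions.clone());
        }

        // Update: apply all writes after the nodes have finished, bumping versions.
        for (field, value) in writes {
            state[field] = value;
            field_versions[field] += 1;
        }
        step += 1;
    }
    (state, step)
}
```

With one node that doubles field 0 into field 1 and another that increments field 1 into field 2, a start state of [3, 0, 0] settles at [3, 6, 7] after two supersteps in this sketch, because writes only become visible in the following superstep.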

Fields§

§state: S

Current execution state

§nodes: IndexMap<String, Arc<dyn Node<S>>>

Graph nodes

§trigger_table: TriggerTable<S>

Trigger table for routing

§field_versions: FieldVersionTracker

Field version tracker

§versions_seen: VersionsSeen

Versions seen by each node

§runnable_config: RunnableConfig

Execution configuration

§cancellation_token: CancellationToken

Cancellation token

§stream_tx: Option<UnboundedSender<StreamEvent<S>>>

Optional stream event sender

§checkpointer: Option<Arc<dyn CheckpointSaver>>

Optional checkpoint saver for crash recovery

§step: usize

Current step number

§status: LoopStatus

Loop status

§pending_tasks: Vec<PendingTask<S>>

Pending tasks for next superstep

Implementations§

impl<S: State> PregelLoop<S>

pub fn new(
    state: S,
    nodes: IndexMap<String, Arc<dyn Node<S>>>,
    trigger_table: TriggerTable<S>,
    config: RunnableConfig,
    num_fields: usize,
) -> Result<Self, JunctureError>

Create a new Pregel loop

§Arguments
  • state - Initial state
  • nodes - Graph nodes
  • trigger_table - Trigger table for routing
  • config - Execution configuration
  • num_fields - Number of fields in the state
§Errors

Returns an error if:

  • The trigger table is invalid
§Examples
use juncture_core::pregel::loop_::PregelLoop;

let pregel_loop = PregelLoop::new(
    initial_state,
    nodes,
    trigger_table,
    config,
    5, // number of fields
)?;

pub fn with_error_handlers(
    state: S,
    nodes: IndexMap<String, Arc<dyn Node<S>>>,
    trigger_table: TriggerTable<S>,
    config: RunnableConfig,
    num_fields: usize,
    error_handler_map: HashMap<String, String>,
) -> Result<Self, JunctureError>

Create a new Pregel loop with error handler mappings

Like new but accepts a pre-built error handler map extracted from builder metadata. Nodes with entries in this map will have their failures routed to the named handler instead of canceling the entire superstep.

§Arguments
  • state - Initial state
  • nodes - Graph nodes
  • trigger_table - Trigger table for routing
  • config - Execution configuration
  • num_fields - Number of fields in the state
  • error_handler_map - Maps node names to error handler node names
§Errors

Returns an error if the trigger table is invalid.

pub fn set_stream_sender(&mut self, tx: UnboundedSender<StreamEvent<S>>)

Set the stream event sender

§Examples
use tokio::sync::mpsc;

let (tx, _rx) = mpsc::unbounded_channel();
pregel_loop.set_stream_sender(tx);

pub fn set_checkpointer(&mut self, saver: Arc<dyn CheckpointSaver>)

Set the checkpoint saver for crash recovery during supersteps

pub fn set_budget_tracker(&mut self, tracker: BudgetTracker)

Set the budget tracker

Wraps the tracker in an Arc so it can be shared between the PregelLoop (for budget checking) and the RunnableConfig (for node-level token reporting). Both share the same underlying counters via atomic operations.

§Examples
use juncture_core::pregel::budget::BudgetTracker;

let budget = BudgetTracker::new(BudgetConfig::new());
pregel_loop.set_budget_tracker(budget);
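The sharing pattern described for the budget tracker (one Arc, atomic counters, two owners) can be sketched with the standard library alone. ToyBudget, its fields, and simulate_shared_budget are hypothetical names for illustration; the real BudgetTracker API is not shown on this page and may differ.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a budget tracker: a token limit plus an atomic counter.
struct ToyBudget {
    max_tokens: u64,
    used_tokens: AtomicU64,
}

impl ToyBudget {
    fn new(max_tokens: u64) -> Self {
        Self { max_tokens, used_tokens: AtomicU64::new(0) }
    }

    /// Called from node code to report token usage.
    fn report(&self, tokens: u64) {
        self.used_tokens.fetch_add(tokens, Ordering::Relaxed);
    }

    /// Called by the loop before it schedules the next superstep.
    fn exceeded(&self) -> bool {
        self.used_tokens.load(Ordering::Relaxed) > self.max_tokens
    }
}

/// Simulates nodes reporting usage through one handle while the loop checks another.
fn simulate_shared_budget(max_tokens: u64, reports: &[u64]) -> bool {
    let loop_handle = Arc::new(ToyBudget::new(max_tokens));
    // Second owner of the same counters, as a config object might hold.
    let config_handle = Arc::clone(&loop_handle);
    let reports = reports.to_vec();
    let worker = std::thread::spawn(move || {
        for tokens in reports {
            config_handle.report(tokens);
        }
    });
    worker.join().expect("worker thread panicked");
    loop_handle.exceeded()
}
```

Because both handles point at the same AtomicU64, usage reported from the worker is visible to the check on the loop side without any locking.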

pub fn set_retry_policies(&mut self, policies: HashMap<String, RetryPolicy>)

Set per-node retry policies

Each entry maps a node name to its RetryPolicy. During superstep execution, nodes with a configured policy are wrapped with crate::graph::builder::execute_with_retry for automatic retries with exponential backoff and jitter.
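As a rough illustration of retries with exponential backoff, the following sketch computes capped delays and retries a fallible operation. ToyRetryPolicy, its field names, backoff_delay, and retry_with_backoff are assumptions made for this example, not the fields or signature of RetryPolicy or execute_with_retry. Jitter and the actual sleep are left out so the result stays deterministic.

```rust
use std::time::Duration;

/// Hypothetical retry settings; the field names are illustrative only.
struct ToyRetryPolicy {
    max_attempts: u32,
    initial_delay: Duration,
    backoff_factor: u32,
    max_delay: Duration,
}

/// Delay before retry number `retry` (0-based): initial * factor^retry, capped at max_delay.
fn backoff_delay(policy: &ToyRetryPolicy, retry: u32) -> Duration {
    let factor = policy.backoff_factor.saturating_pow(retry);
    policy.initial_delay.saturating_mul(factor).min(policy.max_delay)
}

/// Calls `op` until it succeeds or `max_attempts` attempts have been made.
/// Returns the final result and the delays that would separate the attempts.
fn retry_with_backoff<T, E>(
    policy: &ToyRetryPolicy,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> (Result<T, E>, Vec<Duration>) {
    let mut delays = Vec::new();
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return (Ok(value), delays),
            // Out of attempts: surface the last error.
            Err(err) if attempt + 1 >= policy.max_attempts => return (Err(err), delays),
            Err(_) => {
                // A real implementation would sleep here, typically with random jitter added.
                delays.push(backoff_delay(policy, attempt));
                attempt += 1;
            }
        }
    }
}
```

Capping the delay keeps a long run of failures from producing unbounded waits; adding jitter on top would spread out retries from nodes that failed at the same moment.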

pub fn set_timeout_policies(&mut self, policies: HashMap<String, TimeoutPolicy>)

Set per-node timeout policies

Each entry maps a node name to its TimeoutPolicy. During superstep execution, nodes with a configured policy are wrapped with tokio::time::timeout using the configured run_timeout. The timeout wraps the entire execution including any retry attempts.

pub fn set_circuit_breaker_policies(
    &mut self,
    configs: HashMap<String, CircuitBreakerConfig>,
)

Set per-node circuit breaker configurations

Each entry maps a node name to its CircuitBreakerConfig. During superstep execution, nodes with a configured circuit breaker are checked before execution. If the circuit is open (too many consecutive failures), the node is skipped and an error is returned.
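The check-before-execute behavior can be sketched as a small consecutive-failure breaker. ToyCircuitBreaker, call_guarded, and the error message are hypothetical; the real CircuitBreakerConfig fields are not documented here, and this sketch omits any reset timeout or half-open state a production breaker would likely have.

```rust
/// Hypothetical circuit breaker that opens after N consecutive failures.
#[derive(Debug)]
struct ToyCircuitBreaker {
    failure_threshold: u32,
    consecutive_failures: u32,
}

impl ToyCircuitBreaker {
    fn new(failure_threshold: u32) -> Self {
        Self { failure_threshold, consecutive_failures: 0 }
    }

    /// The circuit is open once the failure streak reaches the threshold.
    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.failure_threshold
    }

    /// A success resets the streak; a failure extends it.
    fn record(&mut self, success: bool) {
        if success {
            self.consecutive_failures = 0;
        } else {
            self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        }
    }
}

/// Runs `op` only if the circuit is closed; otherwise skips it and returns an error.
fn call_guarded<T>(
    breaker: &mut ToyCircuitBreaker,
    op: impl FnOnce() -> Result<T, String>,
) -> Result<T, String> {
    if breaker.is_open() {
        return Err("circuit open: node skipped".to_string());
    }
    let result = op();
    breaker.record(result.is_ok());
    result
}
```

Once the breaker is open, the wrapped operation is no longer invoked at all, which is what protects a struggling dependency from further load.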

pub fn set_fallback_map(&mut self, map: HashMap<String, String>)

Set per-node fallback node mappings

Each entry maps a node name to its fallback node name. When a task fails and its node has a fallback, the engine creates a recovery task targeting the fallback node instead of propagating the error.
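A minimal sketch of that routing decision, assuming the map is consulted once per failed task: FailureAction and on_task_failure are illustrative names, and the node names in the usage are made up; the engine's actual recovery-task type is not shown on this page.

```rust
use std::collections::HashMap;

/// Outcome of handling one failed task in this toy model.
#[derive(Debug, PartialEq)]
enum FailureAction {
    /// Schedule a recovery task on the named fallback node.
    RunFallback(String),
    /// No fallback configured: propagate the error.
    Propagate(String),
}

/// Looks up the failed node in the fallback map and decides what to do next.
fn on_task_failure(
    fallbacks: &HashMap<String, String>,
    node: &str,
    error: &str,
) -> FailureAction {
    match fallbacks.get(node) {
        Some(fallback) => FailureAction::RunFallback(fallback.clone()),
        None => FailureAction::Propagate(format!("node `{node}` failed: {error}")),
    }
}
```

For example, with a map containing "call_llm" -> "cached_answer", a failure in call_llm yields RunFallback("cached_answer"), while a failure in any unmapped node is propagated.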

pub fn health(&self) -> HealthStatus

Get the current health status of the graph execution

Returns a snapshot of the health state including per-node health information based on circuit breaker states. This is useful for monitoring and diagnostics.

pub fn tick(&mut self) -> Result<bool, JunctureError>

Execute one tick of the loop

Returns true if execution should continue, false if done.

§Errors

Returns an error if:

  • Recursion limit is reached
  • Cancellation is requested
  • Budget limits are exceeded
§Examples
while pregel_loop.tick()? {
    let result = pregel_loop.execute_superstep().await?;
    pregel_loop.after_tick(result).await?;
}
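The tick / execute / after_tick protocol can be modeled synchronously to show how the boolean return value and the error path interact. Everything here is a toy under stated assumptions: ToyLoop, ToyError, the halving workload, and the choice to check for pending work before checking the recursion limit are all invented for illustration and say nothing about the order of checks in the real tick.

```rust
#[derive(Debug, PartialEq)]
enum ToyError {
    RecursionLimit(usize),
}

/// Hypothetical loop whose "tasks" are plain numbers.
struct ToyLoop {
    step: usize,
    recursion_limit: usize,
    pending: Vec<u32>,
    total: u32,
}

impl ToyLoop {
    /// Ok(true): run another superstep; Ok(false): done; Err: limit hit with work pending.
    fn tick(&mut self) -> Result<bool, ToyError> {
        if self.pending.is_empty() {
            return Ok(false);
        }
        if self.step >= self.recursion_limit {
            return Err(ToyError::RecursionLimit(self.recursion_limit));
        }
        Ok(true)
    }

    /// Runs all pending tasks and returns their combined result.
    fn execute_superstep(&mut self) -> u32 {
        self.pending.drain(..).sum()
    }

    /// Folds the result into the state and schedules follow-up work.
    fn after_tick(&mut self, result: u32) {
        self.total += result;
        self.step += 1;
        if result > 1 {
            self.pending.push(result / 2);
        }
    }
}

/// Drives the loop to completion, returning (total, steps) or the first error.
fn drive(mut toy: ToyLoop) -> Result<(u32, usize), ToyError> {
    while toy.tick()? {
        let result = toy.execute_superstep();
        toy.after_tick(result);
    }
    Ok((toy.total, toy.step))
}
```

The `?` on tick is what turns a recursion-limit violation into an early exit from the driver, while a plain `false` ends the loop normally.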

pub async fn execute_superstep(
    &mut self,
) -> Result<SuperstepResult<S>, JunctureError>

Execute one superstep

Delegates to runner::execute_superstep with the current step number for observability span attributes.

§Errors

Returns an error if:

  • Task execution fails
  • Cancellation is requested
§Examples
let result = pregel_loop.execute_superstep().await?;

pub async fn after_tick(
    &mut self,
    result: SuperstepResult<S>,
) -> Result<(), JunctureError>
where S: Clone + Serialize,

Process results after a superstep

§Errors

Returns an error if:

  • Task computation fails
§Panics

Panics if a task duration exceeds u64::MAX milliseconds (extremely unlikely)

§Examples
pregel_loop.after_tick(result).await?;

pub fn into_state(self) -> S

Consume the loop and return the final state

§Examples
let final_state = pregel_loop.into_state();

pub const fn step(&self) -> usize

Get the current step number

pub fn run_id(&self) -> &str

Get the unique run ID for this execution

pub const fn status(&self) -> &LoopStatus

Get the current status

pub fn pending_interrupts(&self) -> &[InterruptSignal]

Get the pending interrupt signals for checkpoint persistence

pub const fn scratchpad(&self) -> &Scratchpad

Get a reference to the scratchpad for interrupt tracking

pub const fn scratchpad_mut(&mut self) -> &mut Scratchpad

Get a mutable reference to the scratchpad for interrupt tracking

pub const fn is_running(&self) -> bool

Check if the loop is still running

pub fn snapshot_state(&self) -> S
where S: Clone,

Get a clone of the current state without consuming the loop

Useful for streaming execution where state snapshots are needed after each superstep without terminating the loop.

pub const fn run_control(&self) -> &RunControl

Get the run control for graceful shutdown

Returns a reference to the run control, which can be used to request a drain from another thread or context.

§Examples
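use juncture_core::pregel::loop_::PregelLoop;

let pregel_loop = PregelLoop::new(...)?;
// Assumes RunControl implements Clone: a borrowed &RunControl
// cannot be moved into a spawned thread.
let run_control = pregel_loop.run_control().clone();

// From another thread
std::thread::spawn(move || {
    run_control.request_drain();
});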

pub fn as_context(&self) -> ExecutionContext<S>
where S: Clone,

Get a view of the current execution context

Returns an ExecutionContext value that provides typed access to the mutable execution state (state, field_versions, versions_seen). This provides the design-intended separation between mutable context and immutable configuration.

Note: Returns a cloned context, not a reference, since ExecutionContext is designed to own its data.

§Examples
use juncture_core::pregel::loop_::PregelLoop;

let pregel_loop = PregelLoop::new(...)?;
let context = pregel_loop.as_context();
let versions = context.field_versions.versions();

pub fn as_config(&self) -> ExecutionConfig

Get a view of the current execution config

Returns an ExecutionConfig value that provides typed access to the immutable execution configuration (recursion_limit, interrupts, etc.). This provides the design-intended separation between mutable context and immutable configuration.

Note: Returns a cloned config, not a reference, since ExecutionConfig is designed to own its data.

§Examples
use juncture_core::pregel::loop_::PregelLoop;

let pregel_loop = PregelLoop::new(...)?;
let config = pregel_loop.as_config();
let limit = config.recursion_limit;

pub async fn save_pending_interrupt_checkpoint(&mut self)
where S: Serialize,

Save a pending interrupt checkpoint for interrupt_before scenarios.

When tick() detects an interrupt_before, the loop exits immediately (tick is synchronous and cannot call the async checkpointer). The caller should invoke this method after the loop exits when the status is LoopStatus::InterruptBefore.

This is a no-op if no checkpointer is configured or if the status is not interrupted.

§Type Parameters

Requires S: serde::Serialize to serialize the current state.

§Errors

Does not return errors – checkpoint save failures are logged and the interrupt is still surfaced to the caller.

Trait Implementations§

impl<S: State> Debug for PregelLoop<S>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<S> !RefUnwindSafe for PregelLoop<S>

impl<S> !UnwindSafe for PregelLoop<S>

impl<S> Freeze for PregelLoop<S>
where S: Freeze,

impl<S> Send for PregelLoop<S>

impl<S> Sync for PregelLoop<S>

impl<S> Unpin for PregelLoop<S>
where S: Unpin,

impl<S> UnsafeUnpin for PregelLoop<S>
where S: UnsafeUnpin,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.