pub struct PregelLoop<S: State> {
    pub state: S,
    pub nodes: IndexMap<String, Arc<dyn Node<S>>>,
    pub trigger_table: TriggerTable<S>,
    pub field_versions: FieldVersionTracker,
    pub versions_seen: VersionsSeen,
    pub runnable_config: RunnableConfig,
    pub cancellation_token: CancellationToken,
    pub stream_tx: Option<UnboundedSender<StreamEvent<S>>>,
    pub checkpointer: Option<Arc<dyn CheckpointSaver>>,
    pub step: usize,
    pub status: LoopStatus,
    pub pending_tasks: Vec<PendingTask<S>>,
    /* private fields */
}
Main Pregel execution loop
Orchestrates graph execution using the Pregel algorithm, managing task scheduling, version tracking, and execution state.
Fields
state: S - Current execution state
nodes: IndexMap<String, Arc<dyn Node<S>>> - Graph nodes
trigger_table: TriggerTable<S> - Trigger table for routing
field_versions: FieldVersionTracker - Field version tracker
versions_seen: VersionsSeen - Versions seen by each node
runnable_config: RunnableConfig - Execution configuration
cancellation_token: CancellationToken - Cancellation token
stream_tx: Option<UnboundedSender<StreamEvent<S>>> - Optional stream event sender
checkpointer: Option<Arc<dyn CheckpointSaver>> - Optional checkpoint saver for crash recovery
step: usize - Current step number
status: LoopStatus - Loop status
pending_tasks: Vec<PendingTask<S>> - Pending tasks for next superstep
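The field names field_versions and versions_seen suggest version-based scheduling, as is common in Pregel-style engines: a node becomes runnable when a field it is triggered by has a newer version than the one the node last observed. The following is a minimal std-only sketch of that idea, not the crate's implementation. FieldVersions, Seen, runnable_nodes, and mark_seen are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a field version tracker: one counter per state field.
struct FieldVersions {
    versions: Vec<u64>,
}

impl FieldVersions {
    fn new(num_fields: usize) -> Self {
        Self { versions: vec![0; num_fields] }
    }

    /// Record a write to `field` by bumping its version.
    fn bump(&mut self, field: usize) {
        self.versions[field] += 1;
    }
}

/// Hypothetical stand-in for "versions seen by each node".
#[derive(Default)]
struct Seen {
    by_node: HashMap<String, Vec<u64>>,
}

/// Return the nodes whose trigger fields changed since they last ran,
/// in the order given by `triggers`.
fn runnable_nodes(
    triggers: &[(&str, Vec<usize>)],
    versions: &FieldVersions,
    seen: &Seen,
) -> Vec<String> {
    triggers
        .iter()
        .filter(|(node, fields)| {
            fields.iter().any(|&f| {
                // A node that has never run is treated as having seen version 0.
                let last = seen.by_node.get(*node).map_or(0, |v| v[f]);
                versions.versions[f] > last
            })
        })
        .map(|(node, _)| (*node).to_string())
        .collect()
}

/// Mark `node` as having observed the current version of every field.
fn mark_seen(node: &str, versions: &FieldVersions, seen: &mut Seen) {
    seen.by_node.insert(node.to_string(), versions.versions.clone());
}
```

In this sketch a superstep would call runnable_nodes, execute the returned nodes, bump the versions of the fields they wrote, and then call mark_seen for each node that ran.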
Implementations
impl<S: State> PregelLoop<S>
pub fn new(
    state: S,
    nodes: IndexMap<String, Arc<dyn Node<S>>>,
    trigger_table: TriggerTable<S>,
    config: RunnableConfig,
    num_fields: usize,
) -> Result<Self, JunctureError>
Create a new Pregel loop
Arguments
- state - Initial state
- nodes - Graph nodes
- trigger_table - Trigger table for routing
- config - Execution configuration
- num_fields - Number of fields in the state
Errors
Returns an error if the trigger table is invalid.
Examples
use juncture_core::pregel::loop_::PregelLoop;
let pregel_loop = PregelLoop::new(
    initial_state,
    nodes,
    trigger_table,
    config,
    5, // number of fields
)?;
pub fn with_error_handlers(
    state: S,
    nodes: IndexMap<String, Arc<dyn Node<S>>>,
    trigger_table: TriggerTable<S>,
    config: RunnableConfig,
    num_fields: usize,
    error_handler_map: HashMap<String, String>,
) -> Result<Self, JunctureError>
Create a new Pregel loop with error handler mappings
Like new, but accepts a pre-built error handler map
extracted from builder metadata. When a node with an entry in this map
fails, the failure is routed to the named handler instead of
canceling the entire superstep.
Arguments
- state - Initial state
- nodes - Graph nodes
- trigger_table - Trigger table for routing
- config - Execution configuration
- num_fields - Number of fields in the state
- error_handler_map - Maps node names to error handler node names
Errors
Returns an error if the trigger table is invalid.
pub fn set_stream_sender(&mut self, tx: UnboundedSender<StreamEvent<S>>)
pub fn set_checkpointer(&mut self, saver: Arc<dyn CheckpointSaver>)
Set the checkpoint saver for crash recovery during supersteps
pub fn set_budget_tracker(&mut self, tracker: BudgetTracker)
Set the budget tracker
Wraps the tracker in an Arc so it can be shared between the
PregelLoop (for budget checking) and the RunnableConfig (for
node-level token reporting). Both share the same underlying
counters via atomic operations.
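The sharing pattern described above (one tracker behind an Arc, updated through atomic counters) can be sketched with the standard library alone. TokenBudget and demo_shared_budget are hypothetical names; the real BudgetTracker API may look different.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical tracker; the real `BudgetTracker` API may differ.
struct TokenBudget {
    used: AtomicU64,
    limit: u64,
}

impl TokenBudget {
    fn new(limit: u64) -> Self {
        Self { used: AtomicU64::new(0), limit }
    }

    /// Called by nodes to report token usage; only needs `&self`.
    fn record(&self, tokens: u64) {
        self.used.fetch_add(tokens, Ordering::Relaxed);
    }

    /// Called by the loop to decide whether to keep running.
    fn is_exceeded(&self) -> bool {
        self.used.load(Ordering::Relaxed) > self.limit
    }
}

/// Share one tracker between two owners and report usage from worker threads.
fn demo_shared_budget() -> (u64, bool) {
    let loop_side = Arc::new(TokenBudget::new(100));
    let node_side = Arc::clone(&loop_side);

    let workers: Vec<_> = (0..4)
        .map(|_| {
            let budget = Arc::clone(&node_side);
            std::thread::spawn(move || budget.record(30))
        })
        .collect();
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }

    // Both handles observe the same counter: 4 workers * 30 tokens each.
    (loop_side.used.load(Ordering::Relaxed), loop_side.is_exceeded())
}
```

Because the counter is atomic, reporting needs only a shared reference, so neither owner has to take a lock or hold a mutable borrow.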
Examples
use juncture_core::pregel::budget::BudgetTracker;
let budget = BudgetTracker::new(BudgetConfig::new());
pregel_loop.set_budget_tracker(budget);
pub fn set_retry_policies(&mut self, policies: HashMap<String, RetryPolicy>)
Set per-node retry policies
Each entry maps a node name to its RetryPolicy. During superstep
execution, nodes with a configured policy are wrapped with
crate::graph::builder::execute_with_retry for automatic retries
with exponential backoff and jitter.
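As a rough illustration of retries with exponential backoff and jitter, here is a synchronous, std-only sketch. It is not the execute_with_retry implementation. Backoff, its fields, and retry_with_backoff are hypothetical, and the "equal jitter" formula (half fixed, half randomized) is one common choice rather than the one the crate necessarily uses. The random source and the sleep function are injected so the sketch needs no external crates.

```rust
use std::time::Duration;

/// Hypothetical policy shape; the real `RetryPolicy` fields may differ.
struct Backoff {
    max_attempts: u32,
    base: Duration,
    max_delay: Duration,
}

impl Backoff {
    /// Delay before retry number `retry` (0-based): base * 2^retry, capped at
    /// `max_delay`, then "equal jitter": half fixed, half scaled by `jitter_pct`.
    fn delay(&self, retry: u32, jitter_pct: u32) -> Duration {
        let factor = 2u32.saturating_pow(retry);
        let capped = self.base.saturating_mul(factor).min(self.max_delay);
        let half = capped / 2;
        half + half * jitter_pct.min(100) / 100
    }
}

/// Run `op` until it succeeds or `max_attempts` attempts have been made.
/// `random_pct` yields a value in 0..=100; `sleep` performs the wait.
fn retry_with_backoff<T, E>(
    policy: &Backoff,
    mut op: impl FnMut(u32) -> Result<T, E>,
    mut random_pct: impl FnMut() -> u32,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(err) if attempt + 1 >= policy.max_attempts => return Err(err),
            Err(_) => {
                sleep(policy.delay(attempt, random_pct()));
                attempt += 1;
            }
        }
    }
}
```

In real use, sleep would be std::thread::sleep (or an async sleep in an async variant) and random_pct would draw from a random number generator.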
pub fn set_timeout_policies(&mut self, policies: HashMap<String, TimeoutPolicy>)
Set per-node timeout policies
Each entry maps a node name to its TimeoutPolicy.
During superstep execution, nodes with a configured policy are wrapped with
tokio::time::timeout using the configured run_timeout. The timeout wraps
the entire execution including any retry attempts.
pub fn set_circuit_breaker_policies(
    &mut self,
    configs: HashMap<String, CircuitBreakerConfig>,
)
Set per-node circuit breaker configurations
Each entry maps a node name to its CircuitBreakerConfig.
During superstep execution, nodes with a configured circuit breaker are
checked before execution. If the circuit is open (too many consecutive
failures), the node is skipped and an error is returned.
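The check-before-execute behavior described above can be sketched as a small state machine. This is an illustrative sketch only: CircuitBreaker and CallError are hypothetical names, and the real CircuitBreakerConfig may also include settings such as a cool-down period or a half-open state, which this sketch omits.

```rust
/// Hypothetical breaker that opens after `threshold` consecutive failures.
struct CircuitBreaker {
    threshold: u32,
    consecutive_failures: u32,
}

#[derive(Debug, PartialEq)]
enum CallError<E> {
    /// The circuit was open, so the operation was not attempted.
    CircuitOpen,
    /// The operation ran and failed.
    Failed(E),
}

impl CircuitBreaker {
    fn new(threshold: u32) -> Self {
        Self { threshold, consecutive_failures: 0 }
    }

    fn is_open(&self) -> bool {
        self.consecutive_failures >= self.threshold
    }

    /// Check the circuit before running `op`, then record the outcome.
    fn call<T, E>(&mut self, op: impl FnOnce() -> Result<T, E>) -> Result<T, CallError<E>> {
        if self.is_open() {
            return Err(CallError::CircuitOpen);
        }
        match op() {
            Ok(value) => {
                // Any success resets the consecutive-failure count.
                self.consecutive_failures = 0;
                Ok(value)
            }
            Err(err) => {
                self.consecutive_failures += 1;
                Err(CallError::Failed(err))
            }
        }
    }
}
```

Once the threshold is reached, call returns CallError::CircuitOpen without invoking the operation, which mirrors "the node is skipped and an error is returned".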
pub fn set_fallback_map(&mut self, map: HashMap<String, String>)
Set per-node fallback node mappings
Each entry maps a node name to its fallback node name. When a task fails and its node has a fallback, the engine creates a recovery task targeting the fallback node instead of propagating the error.
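The fallback lookup can be pictured as a single decision per failed task. The sketch below assumes a simplified Task type and a String error. Both are hypothetical stand-ins, as is the recover_or_propagate helper. The real engine works with PendingTask<S> and JunctureError.

```rust
use std::collections::HashMap;

/// Hypothetical task record; the real `PendingTask<S>` carries more data.
#[derive(Debug, PartialEq)]
struct Task {
    node: String,
}

/// Decide what to do with a failed task: schedule its fallback node if one is
/// mapped, otherwise propagate the error unchanged.
fn recover_or_propagate(
    failed_node: &str,
    error: String,
    fallbacks: &HashMap<String, String>,
) -> Result<Task, String> {
    match fallbacks.get(failed_node) {
        Some(fallback) => Ok(Task { node: fallback.clone() }),
        None => Err(error),
    }
}
```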
pub fn health(&self) -> HealthStatus
Get the current health status of the graph execution
Returns a snapshot of the health state including per-node health information based on circuit breaker states. This is useful for monitoring and diagnostics.
pub fn tick(&mut self) -> Result<bool, JunctureError>
Execute one tick of the loop
Returns true if execution should continue, false if done.
Errors
Returns an error if:
- Recursion limit is reached
- Cancellation is requested
- Budget limits are exceeded
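To make the return contract concrete, here is a simplified, std-only sketch of a synchronous pre-superstep check with the same shape: Ok(true) to continue, Ok(false) when done, and an error for each of the three conditions listed. TickInputs and TickError are hypothetical, and the order of the checks is an assumption, not something this page specifies.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum TickError {
    RecursionLimit { limit: usize },
    Cancelled,
    BudgetExceeded,
}

/// Hypothetical, simplified view of the data a tick would inspect.
struct TickInputs<'a> {
    step: usize,
    recursion_limit: usize,
    cancelled: &'a AtomicBool,
    budget_exceeded: bool,
    pending_tasks: usize,
}

/// Ok(true): run another superstep. Ok(false): nothing left to do.
/// The check order below is an assumption made for this sketch.
fn tick(inputs: &TickInputs<'_>) -> Result<bool, TickError> {
    if inputs.cancelled.load(Ordering::SeqCst) {
        return Err(TickError::Cancelled);
    }
    if inputs.budget_exceeded {
        return Err(TickError::BudgetExceeded);
    }
    if inputs.pending_tasks == 0 {
        return Ok(false);
    }
    if inputs.step >= inputs.recursion_limit {
        return Err(TickError::RecursionLimit { limit: inputs.recursion_limit });
    }
    Ok(true)
}
```

Keeping this check synchronous means it can run between supersteps without awaiting anything, which matches tick being a non-async method.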
Examples
while pregel_loop.tick()? {
    let result = pregel_loop.execute_superstep().await?;
    pregel_loop.after_tick(result).await?;
}
pub async fn execute_superstep(
    &mut self,
) -> Result<SuperstepResult<S>, JunctureError>
pub async fn after_tick(
    &mut self,
    result: SuperstepResult<S>,
) -> Result<(), JunctureError>
pub fn into_state(self) -> S
pub const fn status(&self) -> &LoopStatus
Get the current status
pub fn pending_interrupts(&self) -> &[InterruptSignal]
Get the pending interrupt signals for checkpoint persistence
pub const fn scratchpad(&self) -> &Scratchpad
Get a reference to the scratchpad for interrupt tracking
pub const fn scratchpad_mut(&mut self) -> &mut Scratchpad
Get a mutable reference to the scratchpad for interrupt tracking
pub const fn is_running(&self) -> bool
Check if the loop is still running
pub fn snapshot_state(&self) -> S
where
    S: Clone,
Get a clone of the current state without consuming the loop
Useful for streaming execution where state snapshots are needed after each superstep without terminating the loop.
pub const fn run_control(&self) -> &RunControl
Get the run control for graceful shutdown
Returns a reference to the run control. A clone of it can be moved to another thread or context to request a drain.
Examples
use juncture_core::pregel::loop_::PregelLoop;
let mut pregel_loop = PregelLoop::new(...)?;
// Clone so the handle can move into the thread (assumes RunControl: Clone)
let run_control = pregel_loop.run_control().clone();
// From another thread
std::thread::spawn(move || {
    run_control.request_drain();
});
pub fn as_context(&self) -> ExecutionContext<S>
where
    S: Clone,
Get a view of the current execution context
Returns an ExecutionContext value that provides typed access
to the mutable execution state (state, field_versions, versions_seen).
This provides the design-intended separation between mutable context
and immutable configuration.
Note: Returns a cloned context, not a reference, since ExecutionContext
is designed to own its data.
Examples
use juncture_core::pregel::loop_::PregelLoop;
let pregel_loop = PregelLoop::new(...)?;
let context = pregel_loop.as_context();
let versions = context.field_versions.versions();
pub fn as_config(&self) -> ExecutionConfig
Get a view of the current execution config
Returns an ExecutionConfig value that provides typed access
to the immutable execution configuration (recursion_limit, interrupts, etc.).
This provides the design-intended separation between mutable context
and immutable configuration.
Note: Returns a cloned config, not a reference, since ExecutionConfig
is designed to own its data.
Examples
use juncture_core::pregel::loop_::PregelLoop;
let pregel_loop = PregelLoop::new(...)?;
let config = pregel_loop.as_config();
let limit = config.recursion_limit;
pub async fn save_pending_interrupt_checkpoint(&mut self)
where
    S: Serialize,
Save a pending interrupt checkpoint for interrupt_before scenarios.
When tick() detects an interrupt_before, the loop exits immediately
(tick is synchronous and cannot call the async checkpointer). The caller
should invoke this method after the loop exits when the status is
LoopStatus::InterruptBefore.
This is a no-op if no checkpointer is configured or if the status is not interrupted.
Type Parameters
Requires S: serde::Serialize to serialize the current state.
Errors
Does not return errors. Checkpoint save failures are logged, and the interrupt is still surfaced to the caller.