Skip to main content

juncture_core/pregel/
context.rs

1//! Execution context and configuration for Pregel engine
2//!
3//! Provides the mutable execution state ([`ExecutionContext`]) and
4//! immutable configuration ([`ExecutionConfig`]) used by the Pregel
5//! loop during graph execution.
6
7use crate::graph::RetryPolicy;
8use crate::pregel::budget::BudgetConfig;
9use crate::pregel::durability::Durability;
10use crate::pregel::scheduler::{FieldVersionTracker, VersionsSeen};
11use std::collections::{HashMap, HashSet};
12use std::time::Duration;
13
14/// Mutable execution context: holds state and version tracking
15///
16/// Encapsulates the mutable portions of the Pregel loop that change
17/// during execution, separating them from the immutable configuration.
18pub struct ExecutionContext<S: crate::State> {
19    /// Current state
20    pub state: S,
21
22    /// Field version tracker (equivalent to `channel_versions`)
23    pub field_versions: FieldVersionTracker,
24
25    /// Versions seen by each node (equivalent to `versions_seen`)
26    pub versions_seen: VersionsSeen,
27
28    /// Pending writes for checkpoint recovery
29    pub pending_writes: Vec<crate::checkpoint::PendingWrite>,
30}
31
32impl<S: crate::State> std::fmt::Debug for ExecutionContext<S> {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("ExecutionContext")
35            .field("state", &"<state>")
36            .field("field_versions", &self.field_versions)
37            .field("versions_seen", &self.versions_seen)
38            .field("pending_writes", &self.pending_writes.len())
39            .finish()
40    }
41}
42
43/// Immutable execution configuration
44///
45/// Holds the runtime parameters that do not change during execution,
46/// including recursion limits, interrupt settings, and per-node policies.
47pub struct ExecutionConfig {
48    /// Maximum superstep count
49    pub recursion_limit: usize,
50
51    /// Nodes to interrupt before execution
52    pub interrupt_before: HashSet<String>,
53
54    /// Nodes to interrupt after execution
55    pub interrupt_after: HashSet<String>,
56
57    /// Budget configuration
58    pub budget: Option<BudgetConfig>,
59
60    /// Checkpoint durability mode
61    pub durability: Durability,
62
63    /// Per-node retry policies
64    pub retry_policies: HashMap<String, RetryPolicy>,
65
66    /// Per-node timeout policies
67    pub timeout_policies: HashMap<String, TimeoutPolicy>,
68}
69
70impl std::fmt::Debug for ExecutionConfig {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("ExecutionConfig")
73            .field("recursion_limit", &self.recursion_limit)
74            .field("interrupt_before", &self.interrupt_before)
75            .field("interrupt_after", &self.interrupt_after)
76            .field("budget", &self.budget)
77            .field("durability", &self.durability)
78            .field(
79                "retry_policies",
80                &self.retry_policies.keys().collect::<Vec<_>>(),
81            )
82            .field(
83                "timeout_policies",
84                &self.timeout_policies.keys().collect::<Vec<_>>(),
85            )
86            .finish()
87    }
88}
89
90impl Default for ExecutionConfig {
91    fn default() -> Self {
92        Self {
93            recursion_limit: 25,
94            interrupt_before: HashSet::new(),
95            interrupt_after: HashSet::new(),
96            budget: None,
97            durability: Durability::default(),
98            retry_policies: HashMap::new(),
99            timeout_policies: HashMap::new(),
100        }
101    }
102}
103
104impl ExecutionConfig {
105    /// Create a new execution config with defaults
106    #[must_use]
107    pub fn new() -> Self {
108        Self::default()
109    }
110
111    /// Set the recursion limit
112    #[must_use]
113    pub const fn with_recursion_limit(mut self, limit: usize) -> Self {
114        self.recursion_limit = limit;
115        self
116    }
117
118    /// Set `interrupt_before` nodes
119    #[must_use]
120    pub fn with_interrupt_before(mut self, nodes: HashSet<String>) -> Self {
121        self.interrupt_before = nodes;
122        self
123    }
124
125    /// Set `interrupt_after` nodes
126    #[must_use]
127    pub fn with_interrupt_after(mut self, nodes: HashSet<String>) -> Self {
128        self.interrupt_after = nodes;
129        self
130    }
131
132    /// Set budget configuration
133    #[must_use]
134    pub fn with_budget(mut self, budget: BudgetConfig) -> Self {
135        self.budget = Some(budget);
136        self
137    }
138
139    /// Set durability mode
140    #[must_use]
141    pub const fn with_durability(mut self, durability: Durability) -> Self {
142        self.durability = durability;
143        self
144    }
145}
146
147/// Node-level timeout policy
148///
149/// Prevents LLM calls or tool execution from blocking indefinitely.
150/// Configured per-node through the graph builder.
151///
152/// # Examples
153///
154/// ```ignore
155/// use juncture_core::pregel::context::TimeoutPolicy;
156/// use std::time::Duration;
157///
158/// let policy = TimeoutPolicy::default()
159///     .with_run_timeout(Duration::from_secs(60));
160/// ```
161#[derive(Clone)]
162pub struct TimeoutPolicy {
163    /// Maximum runtime for a single execution
164    pub run_timeout: Duration,
165
166    /// Idle timeout: if no progress signal within this time, consider timed out
167    pub idle_timeout: Option<Duration>,
168
169    /// Progress signal detector (refreshes `idle_timeout` on heartbeat)
170    ///
171    /// Receives a serializable event representation containing event type and node name.
172    #[allow(
173        clippy::type_complexity,
174        reason = "trait object requires full signature"
175    )]
176    pub refresh_on: Option<std::sync::Arc<dyn Fn(&serde_json::Value) -> bool + Send + Sync>>,
177}
178
179impl std::fmt::Debug for TimeoutPolicy {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        f.debug_struct("TimeoutPolicy")
182            .field("run_timeout", &self.run_timeout)
183            .field("idle_timeout", &self.idle_timeout)
184            .field("refresh_on", &self.refresh_on.as_ref().map(|_| "<fn>"))
185            .finish()
186    }
187}
188
189impl Default for TimeoutPolicy {
190    fn default() -> Self {
191        Self {
192            run_timeout: Duration::from_secs(300),
193            idle_timeout: None,
194            refresh_on: None,
195        }
196    }
197}
198
199impl TimeoutPolicy {
200    /// Create a new timeout policy with defaults
201    #[must_use]
202    pub fn new() -> Self {
203        Self::default()
204    }
205
206    /// Set the run timeout
207    #[must_use]
208    pub const fn with_run_timeout(mut self, timeout: Duration) -> Self {
209        self.run_timeout = timeout;
210        self
211    }
212
213    /// Set the idle timeout
214    #[must_use]
215    pub const fn with_idle_timeout(mut self, timeout: Duration) -> Self {
216        self.idle_timeout = Some(timeout);
217        self
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: limit of 25, nothing to interrupt, no budget, no policies
    #[test]
    fn test_execution_config_default() {
        let defaults = ExecutionConfig::default();

        assert_eq!(defaults.recursion_limit, 25);
        for interrupt_set in [&defaults.interrupt_before, &defaults.interrupt_after] {
            assert!(interrupt_set.is_empty());
        }
        assert!(defaults.budget.is_none());
        assert!(defaults.retry_policies.is_empty());
        assert!(defaults.timeout_policies.is_empty());
    }

    /// Chained setters land in the matching fields
    #[test]
    fn test_execution_config_builder() {
        let gated: HashSet<String> = std::iter::once(String::from("node_a")).collect();

        let built = ExecutionConfig::new()
            .with_recursion_limit(50)
            .with_interrupt_before(gated)
            .with_durability(Durability::Async);

        assert_eq!(built.recursion_limit, 50);
        assert!(built.interrupt_before.contains("node_a"));
        assert!(matches!(built.durability, Durability::Async));
    }

    /// Debug output mentions the recursion limit field and its default
    #[test]
    fn test_execution_config_debug() {
        let rendered = format!("{:?}", ExecutionConfig::new());

        for needle in ["recursion_limit", "25"] {
            assert!(rendered.contains(needle));
        }
    }

    /// Defaults: 300 second run timeout, no idle timeout, no detector
    #[test]
    fn test_timeout_policy_default() {
        let defaults = TimeoutPolicy::default();

        assert_eq!(defaults.run_timeout, Duration::from_secs(300));
        assert!(defaults.idle_timeout.is_none());
        assert!(defaults.refresh_on.is_none());
    }

    /// Chained setters store the run and idle timeouts
    #[test]
    fn test_timeout_policy_builder() {
        let run_limit = Duration::from_secs(60);
        let idle_limit = Duration::from_secs(10);

        let built = TimeoutPolicy::new()
            .with_run_timeout(run_limit)
            .with_idle_timeout(idle_limit);

        assert_eq!(built.run_timeout, run_limit);
        assert_eq!(built.idle_timeout, Some(idle_limit));
    }

    /// Debug output mentions the run timeout field and its default
    #[test]
    fn test_timeout_policy_debug() {
        let rendered = format!("{:?}", TimeoutPolicy::default());

        for needle in ["run_timeout", "300s"] {
            assert!(rendered.contains(needle));
        }
    }
}
282
283// Rust guideline compliant 2026-05-20