juncture_core/pregel/
context.rs1use crate::graph::RetryPolicy;
8use crate::pregel::budget::BudgetConfig;
9use crate::pregel::durability::Durability;
10use crate::pregel::scheduler::{FieldVersionTracker, VersionsSeen};
11use std::collections::{HashMap, HashSet};
12use std::time::Duration;
13
14pub struct ExecutionContext<S: crate::State> {
19 pub state: S,
21
22 pub field_versions: FieldVersionTracker,
24
25 pub versions_seen: VersionsSeen,
27
28 pub pending_writes: Vec<crate::checkpoint::PendingWrite>,
30}
31
32impl<S: crate::State> std::fmt::Debug for ExecutionContext<S> {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("ExecutionContext")
35 .field("state", &"<state>")
36 .field("field_versions", &self.field_versions)
37 .field("versions_seen", &self.versions_seen)
38 .field("pending_writes", &self.pending_writes.len())
39 .finish()
40 }
41}
42
43pub struct ExecutionConfig {
48 pub recursion_limit: usize,
50
51 pub interrupt_before: HashSet<String>,
53
54 pub interrupt_after: HashSet<String>,
56
57 pub budget: Option<BudgetConfig>,
59
60 pub durability: Durability,
62
63 pub retry_policies: HashMap<String, RetryPolicy>,
65
66 pub timeout_policies: HashMap<String, TimeoutPolicy>,
68}
69
70impl std::fmt::Debug for ExecutionConfig {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("ExecutionConfig")
73 .field("recursion_limit", &self.recursion_limit)
74 .field("interrupt_before", &self.interrupt_before)
75 .field("interrupt_after", &self.interrupt_after)
76 .field("budget", &self.budget)
77 .field("durability", &self.durability)
78 .field(
79 "retry_policies",
80 &self.retry_policies.keys().collect::<Vec<_>>(),
81 )
82 .field(
83 "timeout_policies",
84 &self.timeout_policies.keys().collect::<Vec<_>>(),
85 )
86 .finish()
87 }
88}
89
90impl Default for ExecutionConfig {
91 fn default() -> Self {
92 Self {
93 recursion_limit: 25,
94 interrupt_before: HashSet::new(),
95 interrupt_after: HashSet::new(),
96 budget: None,
97 durability: Durability::default(),
98 retry_policies: HashMap::new(),
99 timeout_policies: HashMap::new(),
100 }
101 }
102}
103
104impl ExecutionConfig {
105 #[must_use]
107 pub fn new() -> Self {
108 Self::default()
109 }
110
111 #[must_use]
113 pub const fn with_recursion_limit(mut self, limit: usize) -> Self {
114 self.recursion_limit = limit;
115 self
116 }
117
118 #[must_use]
120 pub fn with_interrupt_before(mut self, nodes: HashSet<String>) -> Self {
121 self.interrupt_before = nodes;
122 self
123 }
124
125 #[must_use]
127 pub fn with_interrupt_after(mut self, nodes: HashSet<String>) -> Self {
128 self.interrupt_after = nodes;
129 self
130 }
131
132 #[must_use]
134 pub fn with_budget(mut self, budget: BudgetConfig) -> Self {
135 self.budget = Some(budget);
136 self
137 }
138
139 #[must_use]
141 pub const fn with_durability(mut self, durability: Durability) -> Self {
142 self.durability = durability;
143 self
144 }
145}
146
147#[derive(Clone)]
162pub struct TimeoutPolicy {
163 pub run_timeout: Duration,
165
166 pub idle_timeout: Option<Duration>,
168
169 #[allow(
173 clippy::type_complexity,
174 reason = "trait object requires full signature"
175 )]
176 pub refresh_on: Option<std::sync::Arc<dyn Fn(&serde_json::Value) -> bool + Send + Sync>>,
177}
178
179impl std::fmt::Debug for TimeoutPolicy {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct("TimeoutPolicy")
182 .field("run_timeout", &self.run_timeout)
183 .field("idle_timeout", &self.idle_timeout)
184 .field("refresh_on", &self.refresh_on.as_ref().map(|_| "<fn>"))
185 .finish()
186 }
187}
188
189impl Default for TimeoutPolicy {
190 fn default() -> Self {
191 Self {
192 run_timeout: Duration::from_secs(300),
193 idle_timeout: None,
194 refresh_on: None,
195 }
196 }
197}
198
199impl TimeoutPolicy {
200 #[must_use]
202 pub fn new() -> Self {
203 Self::default()
204 }
205
206 #[must_use]
208 pub const fn with_run_timeout(mut self, timeout: Duration) -> Self {
209 self.run_timeout = timeout;
210 self
211 }
212
213 #[must_use]
215 pub const fn with_idle_timeout(mut self, timeout: Duration) -> Self {
216 self.idle_timeout = Some(timeout);
217 self
218 }
219}
220
#[cfg(test)]
mod tests {
    //! Unit tests for [`ExecutionConfig`] and [`TimeoutPolicy`]: defaults,
    //! builder methods, `Debug` rendering, and `Clone`.

    use super::*;

    #[test]
    fn test_execution_config_default() {
        let config = ExecutionConfig::default();
        assert_eq!(config.recursion_limit, 25);
        assert!(config.interrupt_before.is_empty());
        assert!(config.interrupt_after.is_empty());
        assert!(config.budget.is_none());
        assert!(config.retry_policies.is_empty());
        assert!(config.timeout_policies.is_empty());
    }

    #[test]
    fn test_execution_config_builder() {
        let config = ExecutionConfig::new()
            .with_recursion_limit(50)
            .with_interrupt_before(HashSet::from(["node_a".to_string()]))
            .with_durability(Durability::Async);

        assert_eq!(config.recursion_limit, 50);
        assert!(config.interrupt_before.contains("node_a"));
        assert!(matches!(config.durability, Durability::Async));
    }

    #[test]
    fn test_execution_config_interrupt_after() {
        let config =
            ExecutionConfig::new().with_interrupt_after(HashSet::from(["node_b".to_string()]));

        assert!(config.interrupt_after.contains("node_b"));
        // Setting one interrupt set must not leak into the other.
        assert!(config.interrupt_before.is_empty());
    }

    #[test]
    fn test_execution_config_debug() {
        let config = ExecutionConfig::new();
        let debug = format!("{config:?}");
        assert!(debug.contains("recursion_limit"));
        assert!(debug.contains("25"));
    }

    #[test]
    fn test_execution_config_debug_lists_policy_keys_only() {
        let mut config = ExecutionConfig::new();
        config
            .timeout_policies
            .insert("node_a".to_string(), TimeoutPolicy::default());

        let debug = format!("{config:?}");
        assert!(debug.contains("timeout_policies"));
        assert!(debug.contains("node_a"));
        // Policy values are not rendered, so no `TimeoutPolicy` field names appear.
        assert!(!debug.contains("run_timeout"));
    }

    #[test]
    fn test_timeout_policy_default() {
        let policy = TimeoutPolicy::default();
        assert_eq!(policy.run_timeout, Duration::from_secs(300));
        assert!(policy.idle_timeout.is_none());
        assert!(policy.refresh_on.is_none());
    }

    #[test]
    fn test_timeout_policy_builder() {
        let policy = TimeoutPolicy::new()
            .with_run_timeout(Duration::from_secs(60))
            .with_idle_timeout(Duration::from_secs(10));

        assert_eq!(policy.run_timeout, Duration::from_secs(60));
        assert_eq!(policy.idle_timeout, Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_timeout_policy_debug() {
        let policy = TimeoutPolicy::default();
        let debug = format!("{policy:?}");
        assert!(debug.contains("run_timeout"));
        assert!(debug.contains("300s"));
    }

    #[test]
    fn test_timeout_policy_debug_redacts_refresh_on() {
        let policy = TimeoutPolicy {
            refresh_on: Some(std::sync::Arc::new(|_: &serde_json::Value| true)),
            ..TimeoutPolicy::default()
        };

        let debug = format!("{policy:?}");
        assert!(debug.contains("<fn>"));
    }

    #[test]
    fn test_timeout_policy_clone_keeps_fields() {
        let policy = TimeoutPolicy {
            run_timeout: Duration::from_secs(7),
            idle_timeout: Some(Duration::from_secs(3)),
            refresh_on: Some(std::sync::Arc::new(|_: &serde_json::Value| true)),
        };

        let copy = policy.clone();
        assert_eq!(copy.run_timeout, Duration::from_secs(7));
        assert_eq!(copy.idle_timeout, Some(Duration::from_secs(3)));
        assert!(copy
            .refresh_on
            .is_some_and(|predicate| predicate(&serde_json::Value::Null)));
    }
}
282
283