1use std::collections::{BTreeMap, VecDeque};
23use std::sync::Arc;
24use std::time::Duration;
25
26use ergo_adapter::{
27 capture::ExternalEventRecord, AdapterProvides, ErrKind, EventId, EventTime, ExecutionContext,
28 ExternalEvent, ExternalEventKind, GraphId, RunTermination, RuntimeHandle, RuntimeInvoker,
29};
30use ergo_runtime::catalog::{CorePrimitiveCatalog, CoreRegistries};
31use ergo_runtime::cluster::ExpandedGraph;
32use ergo_runtime::common::ActionEffect;
33use serde::{Deserialize, Serialize};
34use sha2::{Digest, Sha256};
35
/// Capture format version string for this crate.
// NOTE(review): presumably the value written to `CaptureBundle::capture_version`; confirm in `capture`.
pub(crate) const CAPTURE_FORMAT_VERSION: &str = "v3";
38
39pub fn compute_effect_hash(effect: &ActionEffect) -> String {
44 let effect_bytes =
45 serde_json::to_vec(effect).expect("ActionEffect serialization is infallible");
46 let mut hasher = Sha256::new();
47 hasher.update(&effect_bytes);
48 hex::encode(hasher.finalize())
49}
/// Provenance sentinel `"none"`.
// NOTE(review): presumably stored in `CaptureBundle::adapter_provenance` when no adapter is involved; confirm against callers.
pub const NO_ADAPTER_PROVENANCE: &str = "none";
51
52mod capture;
53#[allow(clippy::result_large_err)]
56pub mod replay;
57
58pub use capture::{
59 write_capture_bundle, CaptureJsonStyle, CaptureWriteError, CapturingDecisionLog,
60 CapturingSession,
61};
62
/// An [`ActionEffect`] recorded in a capture, together with its hash.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CapturedActionEffect {
    /// The recorded effect.
    pub effect: ActionEffect,
    /// Digest of `effect`.
    // NOTE(review): presumably the hex SHA-256 from `compute_effect_hash`; confirm at the construction site.
    pub effect_hash: String,
}
69
/// Acknowledgement of an intent, as recorded in a capture.
// NOTE(review): `status` and `acceptance` are free-form strings here; their vocabulary is defined elsewhere.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CapturedIntentAck {
    /// Identifier of the acknowledged intent.
    pub intent_id: String,
    /// Channel the acknowledgement refers to.
    pub channel: String,
    /// Acknowledgement status.
    pub status: String,
    /// Acceptance outcome.
    pub acceptance: String,
    /// Optional egress reference; omitted from output when `None`, `None` when missing on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub egress_ref: Option<String>,
}
80
/// Sink for supervisor decisions.
///
/// [`Supervisor`] calls `log` once for every decision it makes. Because `log` takes
/// `&self`, implementations that accumulate entries need interior mutability.
pub trait DecisionLog {
    /// Records one decision entry.
    fn log(&self, entry: DecisionLogEntry);
}
85
/// Logical clock driven only by event timestamps; it never moves backwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DeterministicClock {
    /// Latest timestamp observed so far.
    now: EventTime,
}
90
91impl DeterministicClock {
92 fn new() -> Self {
93 Self {
94 now: EventTime::default(),
95 }
96 }
97
98 fn advance_to(&mut self, at: EventTime) {
99 if at > self.now {
100 self.now = at;
101 }
102 }
103
104 fn now(&self) -> EventTime {
105 self.now
106 }
107}
108
/// Identifier the supervisor assigns to each episode; serialized transparently as a bare `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct EpisodeId(u64);
112
113impl EpisodeId {
114 pub fn new(id: u64) -> Self {
115 EpisodeId(id)
116 }
117
118 pub fn as_u64(&self) -> u64 {
119 self.0
120 }
121}
122
/// Work parked in the supervisor's deferred queue until a pump tick admits it.
#[derive(Debug, Clone)]
struct DeferredEpisode {
    /// Id of the event that created the episode; handed to the runtime when it runs.
    origin_event_id: EventId,
    /// Execution context cloned from the originating event.
    ctx: ExecutionContext,
    /// Number of times the episode was re-queued after being found due.
    // NOTE(review): not read in this file section (presumably kept for Debug output); confirm or remove.
    defer_count: u32,
}
129
/// What the supervisor decided for an event.
///
/// Variant order is part of the derived serde representation for index-based formats,
/// so do not reorder the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Decision {
    /// The graph was run for the episode (possibly with retries).
    Invoke,
    /// NOTE(review): not produced by `Supervisor` in this file; confirm who emits it.
    Skip,
    /// The episode was queued for a later pump tick; also recorded for pump ticks that
    /// found no due work.
    Defer,
}
136
/// One decision emitted by [`Supervisor`] to its [`DecisionLog`].
#[derive(Debug, Clone)]
pub struct DecisionLogEntry {
    /// Graph the supervisor is driving.
    pub graph_id: GraphId,
    /// Id of `event`.
    pub event_id: EventId,
    /// Event the decision is logged against. For decisions made on a pump tick (including
    /// admitting deferred work) this is the pump event, not the originating event.
    pub event: ExternalEvent,
    /// The decision taken.
    pub decision: Decision,
    /// When deferred work becomes due; `None` for invocations and for pump ticks with no due work.
    pub schedule_at: Option<EventTime>,
    /// Episode the decision belongs to.
    pub episode_id: EpisodeId,
    /// Configured per-invocation deadline (`Constraints::deadline`).
    pub deadline: Option<Duration>,
    /// Final runtime termination for `Invoke`; `None` otherwise.
    pub termination: Option<RunTermination>,
    /// Retries performed after the first attempt; always `0` for non-invoke decisions.
    pub retry_count: usize,
}
149
/// Serializable per-decision record kept in a [`CaptureBundle`].
///
/// The `From<&DecisionLogEntry>` conversion fills the decision fields and leaves
/// `effects`, `intent_acks` and `interruption` empty.
// NOTE(review): those three fields are presumably populated by the capture layer; confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EpisodeInvocationRecord {
    /// Id of the event the decision was logged against.
    pub event_id: EventId,
    /// The decision taken.
    pub decision: Decision,
    /// When deferred work becomes due, if deferred.
    pub schedule_at: Option<EventTime>,
    /// Episode the decision belongs to.
    pub episode_id: EpisodeId,
    /// Configured per-invocation deadline.
    pub deadline: Option<Duration>,
    /// Runtime termination; always serialized, treated as `None` when absent on input.
    #[serde(default)]
    pub termination: Option<RunTermination>,
    /// Retries performed after the first attempt.
    pub retry_count: usize,
    /// Effects recorded for the episode.
    pub effects: Vec<CapturedActionEffect>,
    /// Intent acknowledgements; omitted from output when empty, empty when missing on input.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub intent_acks: Vec<CapturedIntentAck>,
    /// Optional interruption note; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub interruption: Option<String>,
}
166
167impl From<&DecisionLogEntry> for EpisodeInvocationRecord {
168 fn from(entry: &DecisionLogEntry) -> Self {
169 Self {
170 event_id: entry.event_id.clone(),
171 decision: entry.decision,
172 schedule_at: entry.schedule_at,
173 episode_id: entry.episode_id,
174 deadline: entry.deadline,
175 termination: entry.termination.clone(),
176 retry_count: entry.retry_count,
177 effects: vec![],
178 intent_acks: vec![],
179 interruption: None,
180 }
181 }
182}
183
/// Capture of a supervised run: format version, graph, constraints, external events,
/// per-decision records, and provenance strings.
///
/// Deserialization rejects unknown top-level fields (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CaptureBundle {
    /// Capture format version string.
    pub capture_version: String,
    /// Graph the run targeted.
    pub graph_id: GraphId,
    /// Supervisor constraints in effect for the run.
    pub config: Constraints,
    /// Captured external events.
    pub events: Vec<ExternalEventRecord>,
    /// Recorded supervisor decisions.
    pub decisions: Vec<EpisodeInvocationRecord>,
    /// Adapter provenance string.
    // NOTE(review): presumably `NO_ADAPTER_PROVENANCE` when no adapter is involved; confirm.
    pub adapter_provenance: String,
    /// Runtime provenance string.
    pub runtime_provenance: String,
    /// Optional egress provenance; omitted from output when `None`, `None` when missing on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub egress_provenance: Option<String>,
}
197
/// Admission and execution limits applied by [`Supervisor`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Constraints {
    /// Maximum number of concurrently running episodes; `None` means unlimited.
    pub max_in_flight: Option<usize>,
    /// Maximum invocations per `rate_window`. Rate limiting is active only when this and
    /// `rate_window` are both set.
    pub max_per_window: Option<usize>,
    /// Length of the sliding rate-limit window.
    pub rate_window: Option<Duration>,
    /// Deadline passed to every runtime invocation and copied into each decision entry.
    pub deadline: Option<Duration>,
    /// Retries allowed after the first attempt for retryable terminations (`0` = no retries).
    pub max_retries: usize,
}
206
/// Admission controller: for each external event it either runs the graph now or defers
/// the work to a later pump tick, and it logs every decision it makes.
pub struct Supervisor<L: DecisionLog, R: RuntimeInvoker> {
    /// Graph every invocation targets.
    graph_id: GraphId,
    /// Limits governing admission, deadlines and retries.
    constraints: Constraints,
    /// Sink receiving one entry per decision.
    decision_log: L,
    /// Executes the graph.
    runtime: R,
    /// Next raw episode id to hand out.
    next_episode_id: u64,
    /// Number of episodes currently running.
    in_flight: usize,
    /// Logical times at which recent invocations were admitted, oldest first; only
    /// maintained when rate limiting is configured.
    recent_invocations: VecDeque<EventTime>,
    /// Logical time, advanced by event timestamps.
    clock: DeterministicClock,
    /// Deferred work ordered by `(schedule_at, episode_id)`.
    deferred_queue: BTreeMap<(EventTime, EpisodeId), DeferredEpisode>,
}
218
219impl<L: DecisionLog> Supervisor<L, RuntimeHandle> {
220 #[allow(clippy::arc_with_non_send_sync)]
221 pub fn new(
222 graph_id: GraphId,
223 constraints: Constraints,
224 decision_log: L,
225 graph: Arc<ExpandedGraph>,
226 catalog: Arc<CorePrimitiveCatalog>,
227 registries: Arc<CoreRegistries>,
228 ) -> Self {
229 Self {
230 graph_id,
231 constraints,
232 decision_log,
233 runtime: RuntimeHandle::new(graph, catalog, registries, AdapterProvides::default()),
234 next_episode_id: 0,
235 in_flight: 0,
236 recent_invocations: VecDeque::new(),
237 clock: DeterministicClock::new(),
238 deferred_queue: BTreeMap::new(),
239 }
240 }
241}
242
243impl<L: DecisionLog, R: RuntimeInvoker> Supervisor<L, R> {
244 pub fn with_runtime(
245 graph_id: GraphId,
246 constraints: Constraints,
247 decision_log: L,
248 runtime: R,
249 ) -> Self {
250 Self {
251 graph_id,
252 constraints,
253 decision_log,
254 runtime,
255 next_episode_id: 0,
256 in_flight: 0,
257 recent_invocations: VecDeque::new(),
258 clock: DeterministicClock::new(),
259 deferred_queue: BTreeMap::new(),
260 }
261 }
262
263 pub fn on_event(&mut self, event: ExternalEvent) {
264 self.clock.advance_to(event.at());
265 let now = self.clock.now();
266
267 if event.kind() == ExternalEventKind::Pump {
268 self.process_tick(&event, now);
269 return;
270 }
271
272 let episode_id = self.next_episode_id();
273
274 if self.is_concurrency_saturated() {
275 self.enqueue_deferred(now, episode_id, &event);
276 self.log_decision(&event, Decision::Defer, Some(now), episode_id, None, 0);
277 return;
278 }
279
280 if let Some(delay) = self.rate_limit_delay(now) {
281 let schedule_at = now.saturating_add(delay);
282 self.enqueue_deferred(schedule_at, episode_id, &event);
283 self.log_decision(
284 &event,
285 Decision::Defer,
286 Some(schedule_at),
287 episode_id,
288 None,
289 0,
290 );
291 return;
292 }
293
294 self.in_flight = self.in_flight.saturating_add(1);
295 if self.constraints.max_per_window.is_some() && self.constraints.rate_window.is_some() {
296 self.recent_invocations.push_back(now);
297 }
298
299 let (termination, retry_count) =
300 self.invoke_with_retries(event.event_id(), event.context());
301
302 self.in_flight = self.in_flight.saturating_sub(1);
303
304 self.log_decision(
305 &event,
306 Decision::Invoke,
307 None,
308 episode_id,
309 Some(termination),
310 retry_count,
311 );
312 }
313
314 fn next_episode_id(&mut self) -> EpisodeId {
315 let id = EpisodeId::new(self.next_episode_id);
316 self.next_episode_id = self.next_episode_id.saturating_add(1);
317 id
318 }
319
320 fn is_concurrency_saturated(&self) -> bool {
321 matches!(self.constraints.max_in_flight, Some(max) if self.in_flight >= max)
322 }
323
324 fn enqueue_deferred(
325 &mut self,
326 schedule_at: EventTime,
327 episode_id: EpisodeId,
328 event: &ExternalEvent,
329 ) {
330 self.deferred_queue.insert(
331 (schedule_at, episode_id),
332 DeferredEpisode {
333 origin_event_id: event.event_id().clone(),
334 ctx: event.context().clone(),
335 defer_count: 0,
336 },
337 );
338 }
339
340 fn process_tick(&mut self, tick_event: &ExternalEvent, now: EventTime) {
341 let due_key = self
343 .deferred_queue
344 .keys()
345 .find(|(schedule_at, _)| *schedule_at <= now)
346 .cloned();
347
348 let Some(key) = due_key else {
350 let episode_id = self.next_episode_id();
351 self.log_decision(tick_event, Decision::Defer, None, episode_id, None, 0);
352 return;
353 };
354
355 let mut item = self.deferred_queue.remove(&key).unwrap();
356 let episode_id = key.1;
357
358 if self.is_concurrency_saturated() {
360 item.defer_count += 1;
361 self.deferred_queue.insert((now, episode_id), item);
362 self.log_decision(tick_event, Decision::Defer, Some(now), episode_id, None, 0);
363 return;
364 }
365
366 if let Some(delay) = self.rate_limit_delay(now) {
368 item.defer_count += 1;
369 let schedule_at = now.saturating_add(delay);
370 self.deferred_queue.insert((schedule_at, episode_id), item);
371 self.log_decision(
372 tick_event,
373 Decision::Defer,
374 Some(schedule_at),
375 episode_id,
376 None,
377 0,
378 );
379 return;
380 }
381
382 self.in_flight = self.in_flight.saturating_add(1);
384 if self.constraints.max_per_window.is_some() && self.constraints.rate_window.is_some() {
385 self.recent_invocations.push_back(now);
386 }
387
388 let (termination, retry_count) = self.invoke_with_retries(&item.origin_event_id, &item.ctx);
389
390 self.in_flight = self.in_flight.saturating_sub(1);
391
392 self.log_decision(
393 tick_event,
394 Decision::Invoke,
395 None,
396 episode_id,
397 Some(termination),
398 retry_count,
399 );
400 }
401
402 fn rate_limit_delay(&mut self, now: EventTime) -> Option<Duration> {
403 let max_per_window = self.constraints.max_per_window?;
404 let window = self.constraints.rate_window?;
405
406 while let Some(front) = self.recent_invocations.front() {
407 if now.as_duration().saturating_sub(front.as_duration()) >= window {
408 self.recent_invocations.pop_front();
409 } else {
410 break;
411 }
412 }
413
414 if self.recent_invocations.len() >= max_per_window {
415 if let Some(front) = self.recent_invocations.front() {
416 let elapsed = now.as_duration().saturating_sub(front.as_duration());
417 let delay = window.saturating_sub(elapsed);
418 return Some(delay);
419 }
420 }
421
422 None
423 }
424
425 fn invoke_with_retries(
426 &self,
427 event_id: &EventId,
428 ctx: &ergo_adapter::ExecutionContext,
429 ) -> (RunTermination, usize) {
430 let mut attempts = 0_usize;
431 let mut termination =
432 self.runtime
433 .run(&self.graph_id, event_id, ctx, self.constraints.deadline);
434
435 while attempts < self.constraints.max_retries && Self::should_retry(&termination) {
436 attempts = attempts.saturating_add(1);
437 termination =
438 self.runtime
439 .run(&self.graph_id, event_id, ctx, self.constraints.deadline);
440 }
441
442 (termination, attempts)
443 }
444
445 fn should_retry(termination: &RunTermination) -> bool {
446 match termination {
447 RunTermination::Failed(err) => match err {
448 ErrKind::SemanticError => false,
449 ErrKind::NetworkTimeout | ErrKind::AdapterUnavailable | ErrKind::RuntimeError => {
450 true
451 }
452 _ => false,
453 },
454 RunTermination::TimedOut => true,
455 _ => false,
456 }
457 }
458
459 fn log_decision(
461 &self,
462 event: &ExternalEvent,
463 decision: Decision,
464 schedule_at: Option<EventTime>,
465 episode_id: EpisodeId,
466 termination: Option<RunTermination>,
467 retry_count: usize,
468 ) {
469 let entry = DecisionLogEntry {
470 graph_id: self.graph_id.clone(),
471 event_id: event.event_id().clone(),
472 event: event.clone(),
473 decision,
474 schedule_at,
475 episode_id,
476 deadline: self.constraints.deadline,
477 termination,
478 retry_count,
479 };
480 self.decision_log.log(entry);
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::{
        DecisionLog, DecisionLogEntry, ErrKind, RunTermination, RuntimeInvoker, Supervisor,
    };

    /// Decision log that discards every entry; these tests only exercise the retry policy.
    struct TestLog;

    impl DecisionLog for TestLog {
        fn log(&self, _entry: DecisionLogEntry) {}
    }

    /// Runtime stub that always completes; used only to instantiate `Supervisor`'s type.
    struct TestRuntime;

    impl RuntimeInvoker for TestRuntime {
        fn run(
            &self,
            _graph_id: &ergo_adapter::GraphId,
            _event_id: &ergo_adapter::EventId,
            _ctx: &ergo_adapter::ExecutionContext,
            _deadline: Option<std::time::Duration>,
        ) -> RunTermination {
            RunTermination::Completed
        }
    }

    /// Shorthand for the retry policy under test.
    fn retryable(termination: &RunTermination) -> bool {
        Supervisor::<TestLog, TestRuntime>::should_retry(termination)
    }

    #[test]
    fn semantic_error_not_retryable() {
        assert!(!retryable(&RunTermination::Failed(ErrKind::SemanticError)));
    }

    #[test]
    fn runtime_error_is_retryable() {
        assert!(retryable(&RunTermination::Failed(ErrKind::RuntimeError)));
    }

    #[test]
    fn transient_failures_are_retryable() {
        assert!(retryable(&RunTermination::Failed(ErrKind::NetworkTimeout)));
        assert!(retryable(&RunTermination::Failed(
            ErrKind::AdapterUnavailable
        )));
    }

    #[test]
    fn timeout_is_retryable() {
        assert!(retryable(&RunTermination::TimedOut));
    }

    #[test]
    fn completed_is_not_retryable() {
        assert!(!retryable(&RunTermination::Completed));
    }
}