Skip to main content

ergo_supervisor/
lib.rs

1//! ergo_supervisor
2//!
3//! Purpose:
4//! - Define the kernel supervisor surface for deterministic execution, capture,
5//!   and replay.
6//!
7//! Owns:
8//! - The public kernel replay/capture APIs and the typed errors they expose.
9//! - Canonical capture bundle/session types used by higher layers.
10//!
11//! Does not own:
12//! - Host orchestration, CLI descriptors, or product-facing runtime setup.
13//! - Adapter/runtime semantic authorities already owned in sibling kernel crates.
14//!
15//! Connects to:
16//! - `ergo_host`, CLI, SDK, and tests that build on supervisor capture/replay.
17//!
18//! Safety notes:
19//! - Public capture/replay exports should preserve typed kernel errors so higher
20//!   layers do not have to flatten them into strings.
21
22use std::collections::{BTreeMap, VecDeque};
23use std::sync::Arc;
24use std::time::Duration;
25
26use ergo_adapter::{
27    capture::ExternalEventRecord, AdapterProvides, ErrKind, EventId, EventTime, ExecutionContext,
28    ExternalEvent, ExternalEventKind, GraphId, RunTermination, RuntimeHandle, RuntimeInvoker,
29};
30use ergo_runtime::catalog::{CorePrimitiveCatalog, CoreRegistries};
31use ergo_runtime::cluster::ExpandedGraph;
32use ergo_runtime::common::ActionEffect;
33use serde::{Deserialize, Serialize};
34use sha2::{Digest, Sha256};
35
/// Version tag for the capture bundle format.
///
/// This repository treats captures as ephemeral artifacts. The tag identifies the current bundle
/// layout; NOTE(review): presumably it is the value expected in `CaptureBundle::capture_version`
/// (confirm in the `capture`/`replay` modules).
pub(crate) const CAPTURE_FORMAT_VERSION: &str = "v3";
38
39/// Compute a deterministic SHA-256 hash for an `ActionEffect`.
40///
41/// Used by both capture (to stamp each effect at recording time) and
42/// replay (to verify effect determinism against the recorded hash).
43pub fn compute_effect_hash(effect: &ActionEffect) -> String {
44    let effect_bytes =
45        serde_json::to_vec(effect).expect("ActionEffect serialization is infallible");
46    let mut hasher = Sha256::new();
47    hasher.update(&effect_bytes);
48    hex::encode(hasher.finalize())
49}
/// Provenance string used when no adapter is involved.
///
/// NOTE(review): presumably written to `CaptureBundle::adapter_provenance` for captures taken
/// without an adapter. Confirm against the capture/host call sites.
pub const NO_ADAPTER_PROVENANCE: &str = "none";
51
52mod capture;
53// ReplayError carries rich diagnostic context; boxing would complicate call sites
54// for minimal benefit on error-only paths.
55#[allow(clippy::result_large_err)]
56pub mod replay;
57
58pub use capture::{
59    write_capture_bundle, CaptureJsonStyle, CaptureWriteError, CapturingDecisionLog,
60    CapturingSession,
61};
62
63/// A captured action effect with a deterministic hash for replay verification.
64#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
65pub struct CapturedActionEffect {
66    pub effect: ActionEffect,
67    pub effect_hash: String,
68}
69
70/// A captured durable-accept acknowledgment for a dispatched intent.
71#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
72pub struct CapturedIntentAck {
73    pub intent_id: String,
74    pub channel: String,
75    pub status: String,
76    pub acceptance: String,
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub egress_ref: Option<String>,
79}
80
81/// SUP-7: DecisionLog is write-only. No read/query surface is ever exposed.
82pub trait DecisionLog {
83    fn log(&self, entry: DecisionLogEntry);
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
87struct DeterministicClock {
88    now: EventTime,
89}
90
91impl DeterministicClock {
92    fn new() -> Self {
93        Self {
94            now: EventTime::default(),
95        }
96    }
97
98    fn advance_to(&mut self, at: EventTime) {
99        if at > self.now {
100            self.now = at;
101        }
102    }
103
104    fn now(&self) -> EventTime {
105        self.now
106    }
107}
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
110#[serde(transparent)]
111pub struct EpisodeId(u64);
112
113impl EpisodeId {
114    pub fn new(id: u64) -> Self {
115        EpisodeId(id)
116    }
117
118    pub fn as_u64(&self) -> u64 {
119        self.0
120    }
121}
122
123#[derive(Debug, Clone)]
124struct DeferredEpisode {
125    origin_event_id: EventId,
126    ctx: ExecutionContext,
127    defer_count: u32,
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
131pub enum Decision {
132    Invoke,
133    Skip,
134    Defer,
135}
136
137#[derive(Debug, Clone)]
138pub struct DecisionLogEntry {
139    pub graph_id: GraphId,
140    pub event_id: EventId,
141    pub event: ExternalEvent,
142    pub decision: Decision,
143    pub schedule_at: Option<EventTime>,
144    pub episode_id: EpisodeId,
145    pub deadline: Option<Duration>,
146    pub termination: Option<RunTermination>,
147    pub retry_count: usize,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
151pub struct EpisodeInvocationRecord {
152    pub event_id: EventId,
153    pub decision: Decision,
154    pub schedule_at: Option<EventTime>,
155    pub episode_id: EpisodeId,
156    pub deadline: Option<Duration>,
157    #[serde(default)]
158    pub termination: Option<RunTermination>,
159    pub retry_count: usize,
160    pub effects: Vec<CapturedActionEffect>,
161    #[serde(default, skip_serializing_if = "Vec::is_empty")]
162    pub intent_acks: Vec<CapturedIntentAck>,
163    #[serde(default, skip_serializing_if = "Option::is_none")]
164    pub interruption: Option<String>,
165}
166
167impl From<&DecisionLogEntry> for EpisodeInvocationRecord {
168    fn from(entry: &DecisionLogEntry) -> Self {
169        Self {
170            event_id: entry.event_id.clone(),
171            decision: entry.decision,
172            schedule_at: entry.schedule_at,
173            episode_id: entry.episode_id,
174            deadline: entry.deadline,
175            termination: entry.termination.clone(),
176            retry_count: entry.retry_count,
177            effects: vec![],
178            intent_acks: vec![],
179            interruption: None,
180        }
181    }
182}
183
184#[derive(Debug, Clone, Serialize, Deserialize)]
185#[serde(deny_unknown_fields)]
186pub struct CaptureBundle {
187    pub capture_version: String,
188    pub graph_id: GraphId,
189    pub config: Constraints,
190    pub events: Vec<ExternalEventRecord>,
191    pub decisions: Vec<EpisodeInvocationRecord>,
192    pub adapter_provenance: String,
193    pub runtime_provenance: String,
194    #[serde(default, skip_serializing_if = "Option::is_none")]
195    pub egress_provenance: Option<String>,
196}
197
198#[derive(Debug, Clone, Default, Serialize, Deserialize)]
199pub struct Constraints {
200    pub max_in_flight: Option<usize>,
201    pub max_per_window: Option<usize>,
202    pub rate_window: Option<Duration>,
203    pub deadline: Option<Duration>,
204    pub max_retries: usize,
205}
206
207pub struct Supervisor<L: DecisionLog, R: RuntimeInvoker> {
208    graph_id: GraphId,
209    constraints: Constraints,
210    decision_log: L,
211    runtime: R,
212    next_episode_id: u64,
213    in_flight: usize,
214    recent_invocations: VecDeque<EventTime>,
215    clock: DeterministicClock,
216    deferred_queue: BTreeMap<(EventTime, EpisodeId), DeferredEpisode>,
217}
218
219impl<L: DecisionLog> Supervisor<L, RuntimeHandle> {
220    #[allow(clippy::arc_with_non_send_sync)]
221    pub fn new(
222        graph_id: GraphId,
223        constraints: Constraints,
224        decision_log: L,
225        graph: Arc<ExpandedGraph>,
226        catalog: Arc<CorePrimitiveCatalog>,
227        registries: Arc<CoreRegistries>,
228    ) -> Self {
229        Self {
230            graph_id,
231            constraints,
232            decision_log,
233            runtime: RuntimeHandle::new(graph, catalog, registries, AdapterProvides::default()),
234            next_episode_id: 0,
235            in_flight: 0,
236            recent_invocations: VecDeque::new(),
237            clock: DeterministicClock::new(),
238            deferred_queue: BTreeMap::new(),
239        }
240    }
241}
242
243impl<L: DecisionLog, R: RuntimeInvoker> Supervisor<L, R> {
244    pub fn with_runtime(
245        graph_id: GraphId,
246        constraints: Constraints,
247        decision_log: L,
248        runtime: R,
249    ) -> Self {
250        Self {
251            graph_id,
252            constraints,
253            decision_log,
254            runtime,
255            next_episode_id: 0,
256            in_flight: 0,
257            recent_invocations: VecDeque::new(),
258            clock: DeterministicClock::new(),
259            deferred_queue: BTreeMap::new(),
260        }
261    }
262
263    pub fn on_event(&mut self, event: ExternalEvent) {
264        self.clock.advance_to(event.at());
265        let now = self.clock.now();
266
267        if event.kind() == ExternalEventKind::Pump {
268            self.process_tick(&event, now);
269            return;
270        }
271
272        let episode_id = self.next_episode_id();
273
274        if self.is_concurrency_saturated() {
275            self.enqueue_deferred(now, episode_id, &event);
276            self.log_decision(&event, Decision::Defer, Some(now), episode_id, None, 0);
277            return;
278        }
279
280        if let Some(delay) = self.rate_limit_delay(now) {
281            let schedule_at = now.saturating_add(delay);
282            self.enqueue_deferred(schedule_at, episode_id, &event);
283            self.log_decision(
284                &event,
285                Decision::Defer,
286                Some(schedule_at),
287                episode_id,
288                None,
289                0,
290            );
291            return;
292        }
293
294        self.in_flight = self.in_flight.saturating_add(1);
295        if self.constraints.max_per_window.is_some() && self.constraints.rate_window.is_some() {
296            self.recent_invocations.push_back(now);
297        }
298
299        let (termination, retry_count) =
300            self.invoke_with_retries(event.event_id(), event.context());
301
302        self.in_flight = self.in_flight.saturating_sub(1);
303
304        self.log_decision(
305            &event,
306            Decision::Invoke,
307            None,
308            episode_id,
309            Some(termination),
310            retry_count,
311        );
312    }
313
314    fn next_episode_id(&mut self) -> EpisodeId {
315        let id = EpisodeId::new(self.next_episode_id);
316        self.next_episode_id = self.next_episode_id.saturating_add(1);
317        id
318    }
319
320    fn is_concurrency_saturated(&self) -> bool {
321        matches!(self.constraints.max_in_flight, Some(max) if self.in_flight >= max)
322    }
323
324    fn enqueue_deferred(
325        &mut self,
326        schedule_at: EventTime,
327        episode_id: EpisodeId,
328        event: &ExternalEvent,
329    ) {
330        self.deferred_queue.insert(
331            (schedule_at, episode_id),
332            DeferredEpisode {
333                origin_event_id: event.event_id().clone(),
334                ctx: event.context().clone(),
335                defer_count: 0,
336            },
337        );
338    }
339
340    fn process_tick(&mut self, tick_event: &ExternalEvent, now: EventTime) {
341        // Find first due episode: schedule_at <= now
342        let due_key = self
343            .deferred_queue
344            .keys()
345            .find(|(schedule_at, _)| *schedule_at <= now)
346            .cloned();
347
348        // CASE 1: Nothing due — log no-op
349        let Some(key) = due_key else {
350            let episode_id = self.next_episode_id();
351            self.log_decision(tick_event, Decision::Defer, None, episode_id, None, 0);
352            return;
353        };
354
355        let mut item = self.deferred_queue.remove(&key).unwrap();
356        let episode_id = key.1;
357
358        // CASE 2: Still saturated — re-defer
359        if self.is_concurrency_saturated() {
360            item.defer_count += 1;
361            self.deferred_queue.insert((now, episode_id), item);
362            self.log_decision(tick_event, Decision::Defer, Some(now), episode_id, None, 0);
363            return;
364        }
365
366        // CASE 3: Rate limited — re-defer with delay
367        if let Some(delay) = self.rate_limit_delay(now) {
368            item.defer_count += 1;
369            let schedule_at = now.saturating_add(delay);
370            self.deferred_queue.insert((schedule_at, episode_id), item);
371            self.log_decision(
372                tick_event,
373                Decision::Defer,
374                Some(schedule_at),
375                episode_id,
376                None,
377                0,
378            );
379            return;
380        }
381
382        // CASE 4: Can run — invoke with SUP-4 retries
383        self.in_flight = self.in_flight.saturating_add(1);
384        if self.constraints.max_per_window.is_some() && self.constraints.rate_window.is_some() {
385            self.recent_invocations.push_back(now);
386        }
387
388        let (termination, retry_count) = self.invoke_with_retries(&item.origin_event_id, &item.ctx);
389
390        self.in_flight = self.in_flight.saturating_sub(1);
391
392        self.log_decision(
393            tick_event,
394            Decision::Invoke,
395            None,
396            episode_id,
397            Some(termination),
398            retry_count,
399        );
400    }
401
402    fn rate_limit_delay(&mut self, now: EventTime) -> Option<Duration> {
403        let max_per_window = self.constraints.max_per_window?;
404        let window = self.constraints.rate_window?;
405
406        while let Some(front) = self.recent_invocations.front() {
407            if now.as_duration().saturating_sub(front.as_duration()) >= window {
408                self.recent_invocations.pop_front();
409            } else {
410                break;
411            }
412        }
413
414        if self.recent_invocations.len() >= max_per_window {
415            if let Some(front) = self.recent_invocations.front() {
416                let elapsed = now.as_duration().saturating_sub(front.as_duration());
417                let delay = window.saturating_sub(elapsed);
418                return Some(delay);
419            }
420        }
421
422        None
423    }
424
425    fn invoke_with_retries(
426        &self,
427        event_id: &EventId,
428        ctx: &ergo_adapter::ExecutionContext,
429    ) -> (RunTermination, usize) {
430        let mut attempts = 0_usize;
431        let mut termination =
432            self.runtime
433                .run(&self.graph_id, event_id, ctx, self.constraints.deadline);
434
435        while attempts < self.constraints.max_retries && Self::should_retry(&termination) {
436            attempts = attempts.saturating_add(1);
437            termination =
438                self.runtime
439                    .run(&self.graph_id, event_id, ctx, self.constraints.deadline);
440        }
441
442        (termination, attempts)
443    }
444
445    fn should_retry(termination: &RunTermination) -> bool {
446        match termination {
447            RunTermination::Failed(err) => match err {
448                ErrKind::SemanticError => false,
449                ErrKind::NetworkTimeout | ErrKind::AdapterUnavailable | ErrKind::RuntimeError => {
450                    true
451                }
452                _ => false,
453            },
454            RunTermination::TimedOut => true,
455            _ => false,
456        }
457    }
458
459    // Supervisor decision logging records the scheduling outcome for a single event.
460    fn log_decision(
461        &self,
462        event: &ExternalEvent,
463        decision: Decision,
464        schedule_at: Option<EventTime>,
465        episode_id: EpisodeId,
466        termination: Option<RunTermination>,
467        retry_count: usize,
468    ) {
469        let entry = DecisionLogEntry {
470            graph_id: self.graph_id.clone(),
471            event_id: event.event_id().clone(),
472            event: event.clone(),
473            decision,
474            schedule_at,
475            episode_id,
476            deadline: self.constraints.deadline,
477            termination,
478            retry_count,
479        };
480        self.decision_log.log(entry);
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::{
        DecisionLog, DecisionLogEntry, ErrKind, RunTermination, RuntimeInvoker, Supervisor,
    };

    /// Decision log that discards every entry; these tests only exercise pure helpers.
    struct TestLog;

    impl DecisionLog for TestLog {
        fn log(&self, _entry: DecisionLogEntry) {}
    }

    /// Runtime stub that always completes; needed only to name a concrete `Supervisor` type.
    struct TestRuntime;

    impl RuntimeInvoker for TestRuntime {
        fn run(
            &self,
            _graph_id: &ergo_adapter::GraphId,
            _event_id: &ergo_adapter::EventId,
            _ctx: &ergo_adapter::ExecutionContext,
            _deadline: Option<std::time::Duration>,
        ) -> RunTermination {
            RunTermination::Completed
        }
    }

    type TestSupervisor = Supervisor<TestLog, TestRuntime>;

    #[test]
    fn semantic_error_not_retryable() {
        let termination = RunTermination::Failed(ErrKind::SemanticError);
        assert!(!TestSupervisor::should_retry(&termination));
    }

    #[test]
    fn runtime_error_is_retryable() {
        let termination = RunTermination::Failed(ErrKind::RuntimeError);
        assert!(TestSupervisor::should_retry(&termination));
    }

    #[test]
    fn transient_failures_are_retryable() {
        for kind in [ErrKind::NetworkTimeout, ErrKind::AdapterUnavailable] {
            assert!(TestSupervisor::should_retry(&RunTermination::Failed(kind)));
        }
    }

    #[test]
    fn timeout_is_retryable() {
        assert!(TestSupervisor::should_retry(&RunTermination::TimedOut));
    }

    #[test]
    fn completed_is_not_retryable() {
        assert!(!TestSupervisor::should_retry(&RunTermination::Completed));
    }
}