Skip to main content

oris_execution_runtime/
observability.rs

1//! Backpressure and kernel observability types for scheduling and telemetry.
2//!
3//! **RejectionReason**: why a dispatch or API request was rejected (e.g. tenant limit),
4//! for safe backpressure and clear API responses.
5//!
6//! **KernelObservability**: shared structure for runtime-derived kernel telemetry
7//! (reasoning timeline, lease graph, replay cost, interrupt gap). The runtime
8//! populates these fields from checkpoint and trace context data so APIs can
9//! surface stable observability without inventing a second schema.
10
11#[cfg(feature = "execution-server")]
12use crate::graph_bridge::ExecutionCheckpointView;
13use oris_kernel::KernelTraceEvent;
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16
/// Reason for rejecting a dispatch or API request (e.g. rate limit, tenant cap).
///
/// Serialized into API responses (via serde/JsonSchema derives), so variant
/// names and payload shapes are part of the wire contract.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
pub enum RejectionReason {
    /// Tenant-level limit exceeded; optional human-readable description of the limit.
    TenantLimit(Option<String>),
    /// Worker or capacity limit exceeded; optional human-readable description.
    CapacityLimit(Option<String>),
    /// Other rejections (policy, invalid request, etc.); description is required.
    Other(String),
}
27
28impl RejectionReason {
29    pub fn tenant_limit(description: impl Into<String>) -> Self {
30        RejectionReason::TenantLimit(Some(description.into()))
31    }
32
33    pub fn capacity_limit(description: impl Into<String>) -> Self {
34        RejectionReason::CapacityLimit(Some(description.into()))
35    }
36}
37
/// Runtime-derived kernel observability / telemetry.
///
/// Fields are optional so responses can remain backward compatible when a
/// given execution path does not have enough source data. Populated either
/// from a kernel trace (`from_kernel_trace`) or, behind the
/// `execution-server` feature, from checkpoint history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct KernelObservability {
    /// Optional reasoning or decision timeline (e.g. scheduler steps),
    /// rendered as "Kind#seq(step-or-action-id)" entries.
    pub reasoning_timeline: Option<Vec<String>>,
    /// Optional lease/ownership snapshot as (from, to) edges (e.g. attempt → worker).
    pub lease_graph: Option<Vec<(String, String)>>,
    /// Optional replay cost hint; when trace-derived this is the event count.
    pub replay_cost: Option<u64>,
    /// Optional interrupt handling latency in milliseconds. When timestamps
    /// are missing, trace-derived values fall back to an event-position delta.
    pub interrupt_latency_ms: Option<u64>,
}
53
54impl KernelObservability {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    pub fn with_reasoning_timeline(mut self, entries: Vec<String>) -> Self {
60        self.reasoning_timeline = Some(entries);
61        self
62    }
63
64    pub fn with_lease_graph(mut self, edges: Vec<(String, String)>) -> Self {
65        self.lease_graph = Some(edges);
66        self
67    }
68
69    pub fn with_replay_cost(mut self, cost: u64) -> Self {
70        self.replay_cost = Some(cost);
71        self
72    }
73
74    pub fn with_interrupt_latency_ms(mut self, ms: u64) -> Self {
75        self.interrupt_latency_ms = Some(ms);
76        self
77    }
78
79    pub fn from_kernel_trace(trace: &[KernelTraceEvent]) -> Self {
80        let reasoning_timeline = if trace.is_empty() {
81            None
82        } else {
83            Some(trace.iter().map(format_trace_event).collect())
84        };
85        let replay_cost = if trace.is_empty() {
86            None
87        } else {
88            Some(trace.len() as u64)
89        };
90        let interrupt_latency_ms = interrupt_latency_from_trace_timestamps(trace).or_else(|| {
91            trace
92                .iter()
93                .position(|event| event.kind == "Interrupted")
94                .zip(trace.iter().position(|event| event.kind == "Resumed"))
95                .and_then(|(interrupted, resumed)| resumed.checked_sub(interrupted))
96                .map(|delta| delta as u64)
97        });
98
99        Self {
100            reasoning_timeline,
101            lease_graph: None,
102            replay_cost,
103            interrupt_latency_ms,
104        }
105    }
106
107    #[cfg(feature = "execution-server")]
108    pub fn from_checkpoint_history(run_id: &str, history: &[ExecutionCheckpointView]) -> Self {
109        Self::from_checkpoint_history_with_lease_graph(run_id, history, None)
110    }
111
112    #[cfg(feature = "execution-server")]
113    pub fn from_checkpoint_history_with_lease_graph(
114        run_id: &str,
115        history: &[ExecutionCheckpointView],
116        lease_graph: Option<Vec<(String, String)>>,
117    ) -> Self {
118        let trace: Vec<KernelTraceEvent> = history
119            .iter()
120            .enumerate()
121            .map(|(index, checkpoint)| KernelTraceEvent {
122                run_id: run_id.to_string(),
123                seq: (index + 1) as u64,
124                step_id: checkpoint.checkpoint_id.clone(),
125                action_id: None,
126                kind: "CheckpointSaved".into(),
127                timestamp_ms: Some(checkpoint.created_at.timestamp_millis()),
128            })
129            .collect();
130        let mut observability = Self::from_kernel_trace(&trace);
131        observability.lease_graph = lease_graph.filter(|edges| !edges.is_empty());
132        observability.interrupt_latency_ms = history
133            .windows(2)
134            .filter_map(|window| {
135                let delta_ms = (window[1].created_at - window[0].created_at).num_milliseconds();
136                (delta_ms >= 0).then_some(delta_ms as u64)
137            })
138            .max();
139        observability
140    }
141}
142
143fn format_trace_event(event: &KernelTraceEvent) -> String {
144    let mut entry = format!("{}#{}", event.kind, event.seq);
145    if let Some(step_id) = &event.step_id {
146        entry.push('(');
147        entry.push_str(step_id);
148        entry.push(')');
149    } else if let Some(action_id) = &event.action_id {
150        entry.push('(');
151        entry.push_str(action_id);
152        entry.push(')');
153    }
154    entry
155}
156
157fn interrupt_latency_from_trace_timestamps(trace: &[KernelTraceEvent]) -> Option<u64> {
158    let mut interrupted_at = None;
159    let mut max_delta_ms = None;
160    for event in trace {
161        match event.kind.as_str() {
162            "Interrupted" => interrupted_at = event.timestamp_ms,
163            "Resumed" => {
164                if let (Some(start_ms), Some(end_ms)) = (interrupted_at.take(), event.timestamp_ms)
165                {
166                    if end_ms >= start_ms {
167                        let delta = (end_ms - start_ms) as u64;
168                        max_delta_ms =
169                            Some(max_delta_ms.map_or(delta, |current: u64| current.max(delta)));
170                    }
171                }
172            }
173            _ => {}
174        }
175    }
176    max_delta_ms
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal timestamp-less trace event with fixed run/step ids.
    fn trace_event(seq: u64, kind: &str) -> KernelTraceEvent {
        KernelTraceEvent {
            run_id: "r1".into(),
            seq,
            step_id: Some("n1".into()),
            action_id: None,
            kind: kind.into(),
            timestamp_ms: None,
        }
    }

    #[test]
    fn rejection_reason_tenant_limit() {
        let reason = RejectionReason::tenant_limit("max concurrent runs");
        assert!(
            matches!(reason, RejectionReason::TenantLimit(Some(ref s)) if s == "max concurrent runs")
        );
    }

    #[test]
    fn kernel_observability_builder() {
        let obs = KernelObservability::new()
            .with_reasoning_timeline(vec!["step1".into()])
            .with_replay_cost(42)
            .with_interrupt_latency_ms(10);
        assert_eq!(obs.reasoning_timeline, Some(vec!["step1".into()]));
        assert_eq!(obs.replay_cost, Some(42));
        assert_eq!(obs.interrupt_latency_ms, Some(10));
    }

    #[test]
    fn kernel_observability_from_trace() {
        // No timestamps: latency must come from the positional fallback.
        let trace = vec![trace_event(1, "Interrupted"), trace_event(2, "Resumed")];

        let obs = KernelObservability::from_kernel_trace(&trace);
        assert_eq!(obs.replay_cost, Some(2));
        assert_eq!(obs.interrupt_latency_ms, Some(1));
        assert_eq!(
            obs.reasoning_timeline,
            Some(vec!["Interrupted#1(n1)".into(), "Resumed#2(n1)".into()])
        );
    }

    #[cfg(feature = "execution-server")]
    #[test]
    fn kernel_observability_from_checkpoint_history() {
        use crate::graph_bridge::ExecutionCheckpointView;
        use chrono::TimeZone;

        let at = |ms: i64| chrono::Utc.timestamp_millis_opt(ms).unwrap();
        let history = vec![
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-1".into()),
                created_at: at(1_700_000_000_000),
            },
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-2".into()),
                created_at: at(1_700_000_001_000),
            },
        ];

        let obs = KernelObservability::from_checkpoint_history_with_lease_graph(
            "r-checkpoint",
            &history,
            Some(vec![("attempt-1".into(), "worker-1".into())]),
        );
        assert_eq!(obs.replay_cost, Some(2));
        assert_eq!(
            obs.reasoning_timeline,
            Some(vec![
                "CheckpointSaved#1(cp-1)".into(),
                "CheckpointSaved#2(cp-2)".into(),
            ])
        );
        assert_eq!(
            obs.lease_graph,
            Some(vec![("attempt-1".into(), "worker-1".into())])
        );
        assert_eq!(obs.interrupt_latency_ms, Some(1_000));
    }
}