// oris_execution_runtime/observability.rs
//! Backpressure and kernel observability types for scheduling and telemetry.
//!
//! **RejectionReason**: why a dispatch or API request was rejected (e.g. tenant limit),
//! for safe backpressure and clear API responses.
//!
//! **KernelObservability**: placeholder structure for kernel telemetry (reasoning timeline,
//! lease graph, replay cost, interrupt latency). Implementations can fill these for
//! metrics and tracing; no built-in collection in this crate.

#[cfg(feature = "execution-server")]
use crate::graph_bridge::ExecutionCheckpointView;
use oris_kernel::KernelTraceEvent;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

16/// Reason for rejecting a dispatch or API request (e.g. rate limit, tenant cap).
17#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
18pub enum RejectionReason {
19    /// Tenant-level limit exceeded; optional description of the limit.
20    TenantLimit(Option<String>),
21    /// Worker or capacity limit.
22    CapacityLimit(Option<String>),
23    /// Other rejections (policy, invalid request, etc.).
24    Other(String),
25}
26
27impl RejectionReason {
28    pub fn tenant_limit(description: impl Into<String>) -> Self {
29        RejectionReason::TenantLimit(Some(description.into()))
30    }
31
32    pub fn capacity_limit(description: impl Into<String>) -> Self {
33        RejectionReason::CapacityLimit(Some(description.into()))
34    }
35}
36
37/// Placeholder structure for kernel observability / telemetry.
38///
39/// Fields can be populated by the runtime for metrics and tracing.
40/// No built-in collection or export; types exist for API stability.
41#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
42pub struct KernelObservability {
43    /// Optional reasoning or decision timeline (e.g. scheduler steps).
44    pub reasoning_timeline: Option<Vec<String>>,
45    /// Optional lease/ownership snapshot (e.g. attempt → worker).
46    pub lease_graph: Option<Vec<(String, String)>>,
47    /// Optional replay cost hint (e.g. event count or duration).
48    pub replay_cost: Option<u64>,
49    /// Optional interrupt handling latency (e.g. ms).
50    pub interrupt_latency_ms: Option<u64>,
51}
52
53impl KernelObservability {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn with_reasoning_timeline(mut self, entries: Vec<String>) -> Self {
59        self.reasoning_timeline = Some(entries);
60        self
61    }
62
63    pub fn with_lease_graph(mut self, edges: Vec<(String, String)>) -> Self {
64        self.lease_graph = Some(edges);
65        self
66    }
67
68    pub fn with_replay_cost(mut self, cost: u64) -> Self {
69        self.replay_cost = Some(cost);
70        self
71    }
72
73    pub fn with_interrupt_latency_ms(mut self, ms: u64) -> Self {
74        self.interrupt_latency_ms = Some(ms);
75        self
76    }
77
78    pub fn from_kernel_trace(trace: &[KernelTraceEvent]) -> Self {
79        let reasoning_timeline = if trace.is_empty() {
80            None
81        } else {
82            Some(
83                trace
84                    .iter()
85                    .map(|event| format!("{}#{}", event.kind, event.seq))
86                    .collect(),
87            )
88        };
89        let replay_cost = if trace.is_empty() {
90            None
91        } else {
92            Some(trace.len() as u64)
93        };
94        let interrupt_latency_ms = trace
95            .iter()
96            .position(|event| event.kind == "Interrupted")
97            .zip(trace.iter().position(|event| event.kind == "Resumed"))
98            .and_then(|(interrupted, resumed)| resumed.checked_sub(interrupted))
99            .map(|delta| delta as u64);
100
101        Self {
102            reasoning_timeline,
103            lease_graph: None,
104            replay_cost,
105            interrupt_latency_ms,
106        }
107    }
108
109    #[cfg(feature = "execution-server")]
110    pub fn from_checkpoint_history(run_id: &str, history: &[ExecutionCheckpointView]) -> Self {
111        let trace: Vec<KernelTraceEvent> = history
112            .iter()
113            .enumerate()
114            .map(|(index, checkpoint)| KernelTraceEvent {
115                run_id: run_id.to_string(),
116                seq: (index + 1) as u64,
117                step_id: checkpoint.checkpoint_id.clone(),
118                action_id: None,
119                kind: "CheckpointSaved".into(),
120                timestamp_ms: Some(checkpoint.created_at.timestamp_millis()),
121            })
122            .collect();
123        Self::from_kernel_trace(&trace)
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rejection_reason_tenant_limit() {
        let reason = RejectionReason::tenant_limit("max concurrent runs");
        assert!(
            matches!(reason, RejectionReason::TenantLimit(Some(ref s)) if s == "max concurrent runs")
        );
    }

    #[test]
    fn kernel_observability_builder() {
        let obs = KernelObservability::new()
            .with_reasoning_timeline(vec!["step1".into()])
            .with_replay_cost(42)
            .with_interrupt_latency_ms(10);
        assert_eq!(obs.reasoning_timeline, Some(vec!["step1".into()]));
        assert_eq!(obs.replay_cost, Some(42));
        assert_eq!(obs.interrupt_latency_ms, Some(10));
    }

    #[test]
    fn kernel_observability_from_trace() {
        // Fixture helper: one trace event with the given sequence and kind.
        let make_event = |seq: u64, kind: &str| KernelTraceEvent {
            run_id: "r1".into(),
            seq,
            step_id: Some("n1".into()),
            action_id: None,
            kind: kind.into(),
            timestamp_ms: None,
        };
        let trace = vec![make_event(1, "Interrupted"), make_event(2, "Resumed")];

        let obs = KernelObservability::from_kernel_trace(&trace);
        assert_eq!(obs.replay_cost, Some(2));
        assert_eq!(obs.interrupt_latency_ms, Some(1));
        assert_eq!(
            obs.reasoning_timeline,
            Some(vec!["Interrupted#1".into(), "Resumed#2".into()])
        );
    }

    #[cfg(feature = "execution-server")]
    #[test]
    fn kernel_observability_from_checkpoint_history() {
        use crate::graph_bridge::ExecutionCheckpointView;
        use chrono::TimeZone;

        // Fixture helper: UTC timestamp from epoch milliseconds.
        let at = |ms: i64| chrono::Utc.timestamp_millis_opt(ms).unwrap();
        let history = vec![
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-1".into()),
                created_at: at(1_700_000_000_000),
            },
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-2".into()),
                created_at: at(1_700_000_001_000),
            },
        ];

        let obs = KernelObservability::from_checkpoint_history("r-checkpoint", &history);
        assert_eq!(obs.replay_cost, Some(2));
        assert_eq!(
            obs.reasoning_timeline,
            Some(vec!["CheckpointSaved#1".into(), "CheckpointSaved#2".into()])
        );
        assert_eq!(obs.interrupt_latency_ms, None);
    }
}