oris_execution_runtime/
observability.rs1#[cfg(feature = "execution-server")]
11use crate::graph_bridge::ExecutionCheckpointView;
12use oris_kernel::KernelTraceEvent;
13use schemars::JsonSchema;
14use serde::{Deserialize, Serialize};
15
16#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
18pub enum RejectionReason {
19 TenantLimit(Option<String>),
21 CapacityLimit(Option<String>),
23 Other(String),
25}
26
27impl RejectionReason {
28 pub fn tenant_limit(description: impl Into<String>) -> Self {
29 RejectionReason::TenantLimit(Some(description.into()))
30 }
31
32 pub fn capacity_limit(description: impl Into<String>) -> Self {
33 RejectionReason::CapacityLimit(Some(description.into()))
34 }
35}
36
37#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
42pub struct KernelObservability {
43 pub reasoning_timeline: Option<Vec<String>>,
45 pub lease_graph: Option<Vec<(String, String)>>,
47 pub replay_cost: Option<u64>,
49 pub interrupt_latency_ms: Option<u64>,
51}
52
53impl KernelObservability {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn with_reasoning_timeline(mut self, entries: Vec<String>) -> Self {
59 self.reasoning_timeline = Some(entries);
60 self
61 }
62
63 pub fn with_lease_graph(mut self, edges: Vec<(String, String)>) -> Self {
64 self.lease_graph = Some(edges);
65 self
66 }
67
68 pub fn with_replay_cost(mut self, cost: u64) -> Self {
69 self.replay_cost = Some(cost);
70 self
71 }
72
73 pub fn with_interrupt_latency_ms(mut self, ms: u64) -> Self {
74 self.interrupt_latency_ms = Some(ms);
75 self
76 }
77
78 pub fn from_kernel_trace(trace: &[KernelTraceEvent]) -> Self {
79 let reasoning_timeline = if trace.is_empty() {
80 None
81 } else {
82 Some(
83 trace
84 .iter()
85 .map(|event| format!("{}#{}", event.kind, event.seq))
86 .collect(),
87 )
88 };
89 let replay_cost = if trace.is_empty() {
90 None
91 } else {
92 Some(trace.len() as u64)
93 };
94 let interrupt_latency_ms = trace
95 .iter()
96 .position(|event| event.kind == "Interrupted")
97 .zip(trace.iter().position(|event| event.kind == "Resumed"))
98 .and_then(|(interrupted, resumed)| resumed.checked_sub(interrupted))
99 .map(|delta| delta as u64);
100
101 Self {
102 reasoning_timeline,
103 lease_graph: None,
104 replay_cost,
105 interrupt_latency_ms,
106 }
107 }
108
109 #[cfg(feature = "execution-server")]
110 pub fn from_checkpoint_history(run_id: &str, history: &[ExecutionCheckpointView]) -> Self {
111 let trace: Vec<KernelTraceEvent> = history
112 .iter()
113 .enumerate()
114 .map(|(index, checkpoint)| KernelTraceEvent {
115 run_id: run_id.to_string(),
116 seq: (index + 1) as u64,
117 step_id: checkpoint.checkpoint_id.clone(),
118 action_id: None,
119 kind: "CheckpointSaved".into(),
120 timestamp_ms: Some(checkpoint.created_at.timestamp_millis()),
121 })
122 .collect();
123 Self::from_kernel_trace(&trace)
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event for run `r1`, step `n1`, with no action id or timestamp.
    fn trace_event(seq: u64, kind: &'static str) -> KernelTraceEvent {
        KernelTraceEvent {
            run_id: "r1".into(),
            seq,
            step_id: Some("n1".into()),
            action_id: None,
            kind: kind.into(),
            timestamp_ms: None,
        }
    }

    // The tenant-limit constructor wraps the description in `Some`.
    #[test]
    fn rejection_reason_tenant_limit() {
        let reason = RejectionReason::tenant_limit("max concurrent runs");
        assert_eq!(
            reason,
            RejectionReason::TenantLimit(Some(String::from("max concurrent runs")))
        );
    }

    // Builder methods set exactly the fields they are named after.
    #[test]
    fn kernel_observability_builder() {
        let observability = KernelObservability::new()
            .with_reasoning_timeline(vec![String::from("step1")])
            .with_replay_cost(42)
            .with_interrupt_latency_ms(10);

        assert_eq!(
            observability.reasoning_timeline,
            Some(vec![String::from("step1")])
        );
        assert_eq!(observability.replay_cost, Some(42));
        assert_eq!(observability.interrupt_latency_ms, Some(10));
    }

    // An interrupt directly followed by a resume is one position apart.
    #[test]
    fn kernel_observability_from_trace() {
        let events = vec![trace_event(1, "Interrupted"), trace_event(2, "Resumed")];

        let observability = KernelObservability::from_kernel_trace(&events);

        assert_eq!(observability.replay_cost, Some(2));
        assert_eq!(observability.interrupt_latency_ms, Some(1));
        let expected_timeline = vec![String::from("Interrupted#1"), String::from("Resumed#2")];
        assert_eq!(observability.reasoning_timeline, Some(expected_timeline));
    }

    // Checkpoints are turned into `CheckpointSaved` events numbered from 1.
    #[cfg(feature = "execution-server")]
    #[test]
    fn kernel_observability_from_checkpoint_history() {
        use crate::graph_bridge::ExecutionCheckpointView;
        use chrono::TimeZone;

        let fixtures = [
            ("cp-1", 1_700_000_000_000_i64),
            ("cp-2", 1_700_000_001_000_i64),
        ];
        let history: Vec<ExecutionCheckpointView> = fixtures
            .iter()
            .map(|&(id, millis)| ExecutionCheckpointView {
                checkpoint_id: Some(id.into()),
                created_at: chrono::Utc.timestamp_millis_opt(millis).unwrap(),
            })
            .collect();

        let observability =
            KernelObservability::from_checkpoint_history("r-checkpoint", &history);

        assert_eq!(observability.replay_cost, Some(2));
        let expected_timeline = vec![
            String::from("CheckpointSaved#1"),
            String::from("CheckpointSaved#2"),
        ];
        assert_eq!(observability.reasoning_timeline, Some(expected_timeline));
        assert_eq!(observability.interrupt_latency_ms, None);
    }
}