1#[cfg(feature = "execution-server")]
12use crate::graph_bridge::ExecutionCheckpointView;
13use oris_kernel::KernelTraceEvent;
14use schemars::JsonSchema;
15use serde::{Deserialize, Serialize};
16
/// Reason attached to a rejection.
///
/// Use [`RejectionReason::tenant_limit`] / [`RejectionReason::capacity_limit`] to build the
/// limit variants with a description.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema)]
pub enum RejectionReason {
    /// Rejected because of a tenant limit; carries an optional human-readable description.
    TenantLimit(Option<String>),
    /// Rejected because of a capacity limit; carries an optional human-readable description.
    CapacityLimit(Option<String>),
    /// Any other rejection cause, described by free-form text.
    Other(String),
}
27
28impl RejectionReason {
29 pub fn tenant_limit(description: impl Into<String>) -> Self {
30 RejectionReason::TenantLimit(Some(description.into()))
31 }
32
33 pub fn capacity_limit(description: impl Into<String>) -> Self {
34 RejectionReason::CapacityLimit(Some(description.into()))
35 }
36}
37
/// Optional observability data describing a kernel run.
///
/// Every field is optional; the derived `Default` leaves all of them unset.
#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct KernelObservability {
    /// Ordered, human-readable timeline entries (for example one formatted entry per
    /// kernel trace event).
    pub reasoning_timeline: Option<Vec<String>>,
    /// Lease graph edges as pairs of identifiers.
    // NOTE(review): pair ordering looks like (attempt, worker) judging by tests — confirm
    // with producers before relying on it.
    pub lease_graph: Option<Vec<(String, String)>>,
    /// Replay cost; the trace-derived constructors set it to the number of trace events.
    pub replay_cost: Option<u64>,
    /// Interrupt latency in milliseconds.
    // NOTE(review): when derived from a trace without timestamps this holds a distance in
    // trace positions rather than milliseconds.
    pub interrupt_latency_ms: Option<u64>,
}
53
54impl KernelObservability {
55 pub fn new() -> Self {
56 Self::default()
57 }
58
59 pub fn with_reasoning_timeline(mut self, entries: Vec<String>) -> Self {
60 self.reasoning_timeline = Some(entries);
61 self
62 }
63
64 pub fn with_lease_graph(mut self, edges: Vec<(String, String)>) -> Self {
65 self.lease_graph = Some(edges);
66 self
67 }
68
69 pub fn with_replay_cost(mut self, cost: u64) -> Self {
70 self.replay_cost = Some(cost);
71 self
72 }
73
74 pub fn with_interrupt_latency_ms(mut self, ms: u64) -> Self {
75 self.interrupt_latency_ms = Some(ms);
76 self
77 }
78
79 pub fn from_kernel_trace(trace: &[KernelTraceEvent]) -> Self {
80 let reasoning_timeline = if trace.is_empty() {
81 None
82 } else {
83 Some(trace.iter().map(format_trace_event).collect())
84 };
85 let replay_cost = if trace.is_empty() {
86 None
87 } else {
88 Some(trace.len() as u64)
89 };
90 let interrupt_latency_ms = interrupt_latency_from_trace_timestamps(trace).or_else(|| {
91 trace
92 .iter()
93 .position(|event| event.kind == "Interrupted")
94 .zip(trace.iter().position(|event| event.kind == "Resumed"))
95 .and_then(|(interrupted, resumed)| resumed.checked_sub(interrupted))
96 .map(|delta| delta as u64)
97 });
98
99 Self {
100 reasoning_timeline,
101 lease_graph: None,
102 replay_cost,
103 interrupt_latency_ms,
104 }
105 }
106
107 #[cfg(feature = "execution-server")]
108 pub fn from_checkpoint_history(run_id: &str, history: &[ExecutionCheckpointView]) -> Self {
109 Self::from_checkpoint_history_with_lease_graph(run_id, history, None)
110 }
111
112 #[cfg(feature = "execution-server")]
113 pub fn from_checkpoint_history_with_lease_graph(
114 run_id: &str,
115 history: &[ExecutionCheckpointView],
116 lease_graph: Option<Vec<(String, String)>>,
117 ) -> Self {
118 let trace: Vec<KernelTraceEvent> = history
119 .iter()
120 .enumerate()
121 .map(|(index, checkpoint)| KernelTraceEvent {
122 run_id: run_id.to_string(),
123 seq: (index + 1) as u64,
124 step_id: checkpoint.checkpoint_id.clone(),
125 action_id: None,
126 kind: "CheckpointSaved".into(),
127 timestamp_ms: Some(checkpoint.created_at.timestamp_millis()),
128 })
129 .collect();
130 let mut observability = Self::from_kernel_trace(&trace);
131 observability.lease_graph = lease_graph.filter(|edges| !edges.is_empty());
132 observability.interrupt_latency_ms = history
133 .windows(2)
134 .filter_map(|window| {
135 let delta_ms = (window[1].created_at - window[0].created_at).num_milliseconds();
136 (delta_ms >= 0).then_some(delta_ms as u64)
137 })
138 .max();
139 observability
140 }
141}
142
143fn format_trace_event(event: &KernelTraceEvent) -> String {
144 let mut entry = format!("{}#{}", event.kind, event.seq);
145 if let Some(step_id) = &event.step_id {
146 entry.push('(');
147 entry.push_str(step_id);
148 entry.push(')');
149 } else if let Some(action_id) = &event.action_id {
150 entry.push('(');
151 entry.push_str(action_id);
152 entry.push(')');
153 }
154 entry
155}
156
157fn interrupt_latency_from_trace_timestamps(trace: &[KernelTraceEvent]) -> Option<u64> {
158 let mut interrupted_at = None;
159 let mut max_delta_ms = None;
160 for event in trace {
161 match event.kind.as_str() {
162 "Interrupted" => interrupted_at = event.timestamp_ms,
163 "Resumed" => {
164 if let (Some(start_ms), Some(end_ms)) = (interrupted_at.take(), event.timestamp_ms)
165 {
166 if end_ms >= start_ms {
167 let delta = (end_ms - start_ms) as u64;
168 max_delta_ms =
169 Some(max_delta_ms.map_or(delta, |current: u64| current.max(delta)));
170 }
171 }
172 }
173 _ => {}
174 }
175 }
176 max_delta_ms
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an untimestamped trace event for run `r1` on step `n1`.
    fn step_event(seq: u64, kind: &str) -> KernelTraceEvent {
        KernelTraceEvent {
            run_id: "r1".into(),
            seq,
            step_id: Some("n1".into()),
            action_id: None,
            kind: kind.into(),
            timestamp_ms: None,
        }
    }

    #[test]
    fn rejection_reason_tenant_limit() {
        let reason = RejectionReason::tenant_limit("max concurrent runs");
        assert_eq!(
            reason,
            RejectionReason::TenantLimit(Some("max concurrent runs".into()))
        );
    }

    #[test]
    fn rejection_reason_capacity_limit() {
        let reason = RejectionReason::capacity_limit("queue full");
        assert_eq!(reason, RejectionReason::CapacityLimit(Some("queue full".into())));
    }

    #[test]
    fn kernel_observability_builder() {
        let observability = KernelObservability::new()
            .with_reasoning_timeline(vec!["step1".into()])
            .with_replay_cost(42)
            .with_interrupt_latency_ms(10);
        assert_eq!(observability.reasoning_timeline, Some(vec!["step1".into()]));
        assert_eq!(observability.replay_cost, Some(42));
        assert_eq!(observability.interrupt_latency_ms, Some(10));
    }

    #[test]
    fn kernel_observability_from_trace() {
        let trace = vec![step_event(1, "Interrupted"), step_event(2, "Resumed")];

        let observability = KernelObservability::from_kernel_trace(&trace);
        assert_eq!(observability.replay_cost, Some(2));
        assert_eq!(observability.interrupt_latency_ms, Some(1));
        assert_eq!(
            observability.reasoning_timeline,
            Some(vec!["Interrupted#1(n1)".into(), "Resumed#2(n1)".into()])
        );
    }

    #[test]
    fn kernel_observability_from_empty_trace() {
        let observability = KernelObservability::from_kernel_trace(&[]);
        assert_eq!(observability.reasoning_timeline, None);
        assert_eq!(observability.lease_graph, None);
        assert_eq!(observability.replay_cost, None);
        assert_eq!(observability.interrupt_latency_ms, None);
    }

    #[cfg(feature = "execution-server")]
    #[test]
    fn kernel_observability_from_checkpoint_history() {
        use crate::graph_bridge::ExecutionCheckpointView;
        use chrono::TimeZone;

        let history = vec![
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-1".into()),
                created_at: chrono::Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
            },
            ExecutionCheckpointView {
                checkpoint_id: Some("cp-2".into()),
                created_at: chrono::Utc.timestamp_millis_opt(1_700_000_001_000).unwrap(),
            },
        ];

        let observability = KernelObservability::from_checkpoint_history_with_lease_graph(
            "r-checkpoint",
            &history,
            Some(vec![("attempt-1".into(), "worker-1".into())]),
        );
        assert_eq!(observability.replay_cost, Some(2));
        assert_eq!(
            observability.reasoning_timeline,
            Some(vec![
                "CheckpointSaved#1(cp-1)".into(),
                "CheckpointSaved#2(cp-2)".into(),
            ])
        );
        assert_eq!(
            observability.lease_graph,
            Some(vec![("attempt-1".into(), "worker-1".into())])
        );
        assert_eq!(observability.interrupt_latency_ms, Some(1_000));
    }

    #[cfg(feature = "execution-server")]
    #[test]
    fn kernel_observability_from_single_checkpoint() {
        use crate::graph_bridge::ExecutionCheckpointView;
        use chrono::TimeZone;

        let history = vec![ExecutionCheckpointView {
            checkpoint_id: Some("cp-1".into()),
            created_at: chrono::Utc.timestamp_millis_opt(1_700_000_000_000).unwrap(),
        }];

        // An empty lease graph is dropped, and one checkpoint has no gap to measure.
        let observability = KernelObservability::from_checkpoint_history_with_lease_graph(
            "r-single",
            &history,
            Some(Vec::new()),
        );
        assert_eq!(observability.replay_cost, Some(1));
        assert_eq!(observability.lease_graph, None);
        assert_eq!(observability.interrupt_latency_ms, None);
    }
}