use std::collections::BTreeMap;
use std::time::Duration;

use super::EdgePayload;
use crate::perf::PerfSample;

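/// How much telemetry the runtime collects. Variants are ordered from least
/// to most verbose, so `Off < Basic < Detailed < Profile`; the `is_*` helpers
/// below rely on that derived `Ord` ordering.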
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MetricsLevel {
    Off,
    Basic,
    Detailed,
    Profile,
}

impl MetricsLevel {
    pub fn is_basic(self) -> bool {
        self >= MetricsLevel::Basic
    }

    pub fn is_detailed(self) -> bool {
        self >= MetricsLevel::Detailed
    }

    pub fn is_profile(self) -> bool {
        self >= MetricsLevel::Profile
    }
}

impl Default for MetricsLevel {
    fn default() -> Self {
        MetricsLevel::Basic
    }
}

const HIST_BUCKETS: usize = 32;

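/// A fixed-size power-of-two histogram: bucket `i` counts recorded values in
/// `[2^i, 2^(i+1))`. Zero is clamped to 1, and values of `2^31` or more fall
/// into the last bucket. Durations are recorded in microseconds.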
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Histogram {
    pub buckets: [u64; HIST_BUCKETS],
}

impl Default for Histogram {
    fn default() -> Self {
        Self { buckets: [0; HIST_BUCKETS] }
    }
}

impl Histogram {
    pub fn record_value(&mut self, value: u64) {
        let v = value.max(1);
        let idx = (63 - v.leading_zeros() as usize).min(HIST_BUCKETS - 1);
        self.buckets[idx] = self.buckets[idx].saturating_add(1);
    }

    pub fn record_duration(&mut self, duration: Duration) {
        let micros = duration.as_micros() as u64;
        self.record_value(micros);
    }

    pub fn merge(&mut self, other: &Histogram) {
        for (dst, src) in self.buckets.iter_mut().zip(other.buckets.iter()) {
            *dst = dst.saturating_add(*src);
        }
    }

    pub fn is_empty(&self) -> bool {
        self.buckets.iter().all(|v| *v == 0)
    }
}

#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PayloadMetrics {
    pub in_bytes: u64,
    pub out_bytes: u64,
    pub in_count: u64,
    pub out_count: u64,
}

impl PayloadMetrics {
    fn merge(&mut self, other: &PayloadMetrics) {
        self.in_bytes = self.in_bytes.saturating_add(other.in_bytes);
        self.out_bytes = self.out_bytes.saturating_add(other.out_bytes);
        self.in_count = self.in_count.saturating_add(other.in_count);
        self.out_count = self.out_count.saturating_add(other.out_count);
    }
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TraceEvent {
    pub node_idx: usize,
    pub start_ns: u64,
    pub duration_ns: u64,
}

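/// Aggregated telemetry for one graph execution. When results from parallel
/// workers are combined via [`Self::merge`], counters are summed,
/// `graph_duration` takes the maximum rather than the sum, and the per-node,
/// per-group, and per-edge maps are merged entry by entry.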
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ExecutionTelemetry {
    pub nodes_executed: usize,
    pub cpu_segments: usize,
    pub gpu_segments: usize,
    pub gpu_fallbacks: usize,
    pub backpressure_events: usize,
    pub warnings: smallvec::SmallVec<[String; 8]>,
    pub graph_duration: Duration,
    #[serde(default)]
    pub metrics_level: MetricsLevel,
    pub node_metrics: BTreeMap<usize, NodeMetrics>,
    pub group_metrics: BTreeMap<String, NodeMetrics>,
    pub edge_metrics: BTreeMap<usize, EdgeMetrics>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace: Option<Vec<TraceEvent>>,
}

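/// Timing, hardware-counter, and payload statistics for a single node. The
/// histogram and payload fields are only populated at `Detailed` level or
/// above.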
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct NodeMetrics {
    pub total_duration: Duration,
    pub calls: usize,
    #[serde(default)]
    pub cpu_duration: Duration,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub perf: Option<NodePerfMetrics>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duration_histogram: Option<Histogram>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<PayloadMetrics>,
}

#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct NodePerfMetrics {
    pub cache_misses: u64,
    pub branch_instructions: u64,
    pub branch_misses: u64,
}

impl NodePerfMetrics {
    fn record(&mut self, sample: PerfSample) {
        self.cache_misses = self.cache_misses.saturating_add(sample.cache_misses);
        self.branch_instructions = self.branch_instructions.saturating_add(sample.branch_instructions);
        self.branch_misses = self.branch_misses.saturating_add(sample.branch_misses);
    }

    fn merge(&mut self, other: NodePerfMetrics) {
        self.cache_misses = self.cache_misses.saturating_add(other.cache_misses);
        self.branch_instructions = self.branch_instructions.saturating_add(other.branch_instructions);
        self.branch_misses = self.branch_misses.saturating_add(other.branch_misses);
    }
}

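/// Queueing statistics for a single edge: accumulated wait time, observed
/// maximum queue depth, payload volume, and GPU transfer counts. The
/// histograms are only populated at `Detailed` level or above.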
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct EdgeMetrics {
    pub total_wait: Duration,
    pub samples: usize,
    #[serde(default)]
    pub max_depth: u64,
    #[serde(default)]
    pub payload_bytes: u64,
    #[serde(default)]
    pub payload_count: u64,
    #[serde(default)]
    pub gpu_uploads: u64,
    #[serde(default)]
    pub gpu_downloads: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait_histogram: Option<Histogram>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub depth_histogram: Option<Histogram>,
}

impl EdgeMetrics {
    pub fn merge(&mut self, other: EdgeMetrics) {
        self.total_wait += other.total_wait;
        self.samples += other.samples;
        self.max_depth = self.max_depth.max(other.max_depth);
        self.payload_bytes = self.payload_bytes.saturating_add(other.payload_bytes);
        self.payload_count = self.payload_count.saturating_add(other.payload_count);
        self.gpu_uploads = self.gpu_uploads.saturating_add(other.gpu_uploads);
        self.gpu_downloads = self.gpu_downloads.saturating_add(other.gpu_downloads);
        if let Some(other_hist) = other.wait_histogram {
            let hist = self.wait_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
        if let Some(other_hist) = other.depth_histogram {
            let hist = self.depth_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
    }
}

impl NodeMetrics {
    pub fn record(&mut self, duration: Duration) {
        self.total_duration += duration;
        self.calls += 1;
    }

    pub fn record_perf(&mut self, sample: PerfSample) {
        let perf = self.perf.get_or_insert_with(NodePerfMetrics::default);
        perf.record(sample);
    }

    pub fn merge(&mut self, other: NodeMetrics) {
        self.total_duration += other.total_duration;
        self.calls += other.calls;
        self.cpu_duration += other.cpu_duration;
        if let Some(other_perf) = other.perf {
            let perf = self.perf.get_or_insert_with(NodePerfMetrics::default);
            perf.merge(other_perf);
        }
        if let Some(other_hist) = other.duration_histogram {
            let hist = self.duration_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
        if let Some(other_payload) = other.payload {
            let payload = self.payload.get_or_insert_with(PayloadMetrics::default);
            payload.merge(&other_payload);
        }
    }
}

impl ExecutionTelemetry {
    pub fn with_level(level: MetricsLevel) -> Self {
        if !cfg!(feature = "metrics") {
            return Self { metrics_level: MetricsLevel::Off, ..Default::default() };
        }
        Self {
            metrics_level: level,
            ..Default::default()
        }
    }

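    /// Folds telemetry from another execution (e.g. a parallel worker) into
    /// `self`; see the struct-level docs for the per-field merge rules.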
    pub fn merge(&mut self, other: ExecutionTelemetry) {
        self.nodes_executed += other.nodes_executed;
        self.cpu_segments += other.cpu_segments;
        self.gpu_segments += other.gpu_segments;
        self.gpu_fallbacks += other.gpu_fallbacks;
        self.backpressure_events += other.backpressure_events;
        self.warnings.extend(other.warnings);
        self.graph_duration = self.graph_duration.max(other.graph_duration);
        self.metrics_level = self.metrics_level.max(other.metrics_level);
        for (node, metrics) in other.node_metrics {
            self.node_metrics.entry(node).or_default().merge(metrics);
        }
        for (group, metrics) in other.group_metrics {
            self.group_metrics.entry(group).or_default().merge(metrics);
        }
        for (edge, metrics) in other.edge_metrics {
            self.edge_metrics.entry(edge).or_default().merge(metrics);
        }
        if let Some(other_trace) = other.trace {
            let trace = self.trace.get_or_insert_with(Vec::new);
            trace.extend(other_trace);
        }
    }

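    /// Records one node invocation. `Basic` tracks totals and call counts;
    /// `Detailed` additionally feeds the per-node duration histogram.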
    pub fn record_node_duration(&mut self, node_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_basic() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.record(duration);
        if self.metrics_level.is_detailed() {
            entry
                .duration_histogram
                .get_or_insert_with(Histogram::default)
                .record_duration(duration);
        }
    }

    pub fn record_node_cpu_duration(&mut self, node_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.cpu_duration += duration;
    }

    pub fn record_node_perf(&mut self, node_idx: usize, sample: PerfSample) {
        if !cfg!(feature = "metrics") {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.record_perf(sample);
    }

    pub fn record_node_payload_in(&mut self, node_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        let payload = entry.payload.get_or_insert_with(PayloadMetrics::default);
        payload.in_count = payload.in_count.saturating_add(1);
        if let Some(bytes) = bytes {
            payload.in_bytes = payload.in_bytes.saturating_add(bytes);
        }
    }

    pub fn record_node_payload_out(&mut self, node_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        let payload = entry.payload.get_or_insert_with(PayloadMetrics::default);
        payload.out_count = payload.out_count.saturating_add(1);
        if let Some(bytes) = bytes {
            payload.out_bytes = payload.out_bytes.saturating_add(bytes);
        }
    }

    pub fn record_edge_wait(&mut self, edge_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_basic() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        entry.total_wait += duration;
        entry.samples += 1;
        if self.metrics_level.is_detailed() {
            entry
                .wait_histogram
                .get_or_insert_with(Histogram::default)
                .record_duration(duration);
        }
    }

    pub fn record_edge_depth(&mut self, edge_idx: usize, depth: usize) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        let depth_u64 = depth as u64;
        entry.max_depth = entry.max_depth.max(depth_u64);
        entry
            .depth_histogram
            .get_or_insert_with(Histogram::default)
            .record_value(depth_u64);
    }

    pub fn record_edge_payload(&mut self, edge_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        entry.payload_count = entry.payload_count.saturating_add(1);
        if let Some(bytes) = bytes {
            entry.payload_bytes = entry.payload_bytes.saturating_add(bytes);
        }
    }

    pub fn record_edge_gpu_transfer(&mut self, edge_idx: usize, upload: bool) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        if upload {
            entry.gpu_uploads = entry.gpu_uploads.saturating_add(1);
        } else {
            entry.gpu_downloads = entry.gpu_downloads.saturating_add(1);
        }
    }

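    /// Appends a trace event at `Profile` level, storing the start offset and
    /// duration as nanoseconds.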
    pub fn record_trace_event(&mut self, node_idx: usize, start: Duration, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_profile() {
            return;
        }
        let trace = self.trace.get_or_insert_with(Vec::new);
        trace.push(TraceEvent {
            node_idx,
            start_ns: start.as_nanos() as u64,
            duration_ns: duration.as_nanos() as u64,
        });
    }

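    /// Rolls per-node metrics up into per-group metrics, keyed by the node's
    /// `daedalus.embedded_group` metadata string (trimmed; nodes with an
    /// empty or missing group are skipped).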
    pub fn aggregate_groups(&mut self, nodes: &[crate::plan::RuntimeNode]) {
        const GROUP_KEY: &str = "daedalus.embedded_group";
        for (idx, metrics) in &self.node_metrics {
            let Some(node) = nodes.get(*idx) else {
                continue;
            };
            let Some(daedalus_data::model::Value::String(group)) = node.metadata.get(GROUP_KEY) else {
                continue;
            };
            let trimmed = group.trim();
            if trimmed.is_empty() {
                continue;
            }
            self.group_metrics
                .entry(trimmed.to_string())
                .or_default()
                .merge(metrics.clone());
        }
    }
}

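/// Best-effort payload size in bytes. Returns `None` when the size cannot be
/// determined cheaply, e.g. for opaque `Any` values or GPU-resident data.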
pub(crate) fn payload_size_bytes(payload: &EdgePayload) -> Option<u64> {
    match payload {
        EdgePayload::Unit => Some(0),
        EdgePayload::Bytes(bytes) => Some(bytes.len() as u64),
        EdgePayload::Value(value) => value_size_bytes(value),
        EdgePayload::Any(any) => any_size_bytes(any),
        #[cfg(feature = "gpu")]
        EdgePayload::Payload(_) => None,
        #[cfg(feature = "gpu")]
        EdgePayload::GpuImage(_) => None,
    }
}

fn value_size_bytes(value: &daedalus_data::model::Value) -> Option<u64> {
    match value {
        daedalus_data::model::Value::String(s) => Some(s.len() as u64),
        daedalus_data::model::Value::Bytes(b) => Some(b.len() as u64),
        _ => None,
    }
}

fn any_size_bytes(any: &std::sync::Arc<dyn std::any::Any + Send + Sync>) -> Option<u64> {
    if let Some(bytes) = any.downcast_ref::<Vec<u8>>() {
        return Some(bytes.len() as u64);
    }
    if let Some(bytes) = any.downcast_ref::<std::sync::Arc<[u8]>>() {
        return Some(bytes.len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::DynamicImage>() {
        return Some(dynamic_image_size_bytes(img));
    }
    if let Some(img) = any.downcast_ref::<image::GrayImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::GrayAlphaImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::RgbImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::RgbaImage>() {
        return Some(img.as_raw().len() as u64);
    }
    None
}

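/// Raw pixel buffer size of a `DynamicImage` in bytes; unknown future
/// variants of the non-exhaustive enum report 0.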
fn dynamic_image_size_bytes(img: &image::DynamicImage) -> u64 {
    match img {
        image::DynamicImage::ImageLuma8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageLumaA8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageRgb8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageRgba8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageLuma16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageLumaA16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgb16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgba16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgb32F(i) => (i.as_raw().len() * std::mem::size_of::<f32>()) as u64,
        image::DynamicImage::ImageRgba32F(i) => (i.as_raw().len() * std::mem::size_of::<f32>()) as u64,
        _ => 0,
    }
}
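
// A minimal test sketch for the bucketing, gating, and merge logic above.
// Illustrative only: it assumes the crate is built with the `metrics` feature
// enabled, since the `record_*` methods are no-ops without it.
#[cfg(all(test, feature = "metrics"))]
mod tests {
    use super::*;

    #[test]
    fn histogram_uses_log2_buckets() {
        let mut hist = Histogram::default();
        hist.record_value(0); // clamped to 1 -> bucket 0
        hist.record_value(1); // bucket 0
        hist.record_value(3); // floor(log2(3)) = 1 -> bucket 1
        hist.record_value(1_u64 << 40); // clamped into the last bucket
        assert_eq!(hist.buckets[0], 2);
        assert_eq!(hist.buckets[1], 1);
        assert_eq!(hist.buckets[HIST_BUCKETS - 1], 1);
    }

    #[test]
    fn histogram_merge_saturates() {
        let mut a = Histogram::default();
        let mut b = Histogram::default();
        a.buckets[0] = 1;
        b.buckets[0] = u64::MAX;
        a.merge(&b);
        // saturating_add caps at u64::MAX instead of wrapping.
        assert_eq!(a.buckets[0], u64::MAX);
    }

    #[test]
    fn detail_gating_controls_histograms() {
        let mut basic = ExecutionTelemetry::with_level(MetricsLevel::Basic);
        basic.record_node_duration(0, Duration::from_micros(10));
        assert!(basic.node_metrics[&0].duration_histogram.is_none());

        let mut detailed = ExecutionTelemetry::with_level(MetricsLevel::Detailed);
        detailed.record_node_duration(0, Duration::from_micros(10));
        assert!(detailed.node_metrics[&0].duration_histogram.is_some());
    }

    #[test]
    fn merge_sums_counters_and_takes_max_duration() {
        let mut a = ExecutionTelemetry::with_level(MetricsLevel::Basic);
        a.nodes_executed = 2;
        a.graph_duration = Duration::from_millis(5);
        let mut b = ExecutionTelemetry::with_level(MetricsLevel::Basic);
        b.nodes_executed = 3;
        b.graph_duration = Duration::from_millis(7);
        a.merge(b);
        assert_eq!(a.nodes_executed, 5);
        assert_eq!(a.graph_duration, Duration::from_millis(7));
    }
}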