daedalus_runtime/executor/telemetry.rs

use std::collections::BTreeMap;
use std::time::Duration;

use super::EdgePayload;
use crate::perf::PerfSample;

/// Verbosity of telemetry collection. Variants are ordered, so each level
/// implies everything captured by the levels below it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MetricsLevel {
    Off,
    Basic,
    Detailed,
    Profile,
}

impl MetricsLevel {
    pub fn is_basic(self) -> bool {
        self >= MetricsLevel::Basic
    }

    pub fn is_detailed(self) -> bool {
        self >= MetricsLevel::Detailed
    }

    pub fn is_profile(self) -> bool {
        self >= MetricsLevel::Profile
    }
}

impl Default for MetricsLevel {
    fn default() -> Self {
        MetricsLevel::Basic
    }
}
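
// A minimal check, not part of the original module, sketching how the derived
// ordering on `MetricsLevel` drives the `is_*` gating helpers: higher levels
// include everything the lower levels record.
#[cfg(test)]
mod metrics_level_gating_tests {
    use super::*;

    #[test]
    fn levels_gate_by_ordering() {
        assert!(MetricsLevel::Basic.is_basic());
        assert!(!MetricsLevel::Basic.is_detailed());
        assert!(MetricsLevel::Profile.is_detailed());
        assert!(!MetricsLevel::Off.is_basic());
        assert_eq!(MetricsLevel::default(), MetricsLevel::Basic);
    }
}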

const HIST_BUCKETS: usize = 32;

/// Fixed-size histogram whose buckets cover power-of-two value ranges: bucket
/// `i` counts values `v` with `floor(log2(max(v, 1))) == i`, and the last
/// bucket absorbs everything larger. Durations are bucketed by microseconds.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct Histogram {
    pub buckets: [u64; HIST_BUCKETS],
}

impl Default for Histogram {
    fn default() -> Self {
        Self { buckets: [0; HIST_BUCKETS] }
    }
}

impl Histogram {
    pub fn record_value(&mut self, value: u64) {
        let v = value.max(1);
        let idx = (63 - v.leading_zeros() as usize).min(HIST_BUCKETS - 1);
        self.buckets[idx] = self.buckets[idx].saturating_add(1);
    }

    pub fn record_duration(&mut self, duration: Duration) {
        let micros = duration.as_micros() as u64;
        self.record_value(micros);
    }

    pub fn merge(&mut self, other: &Histogram) {
        for (dst, src) in self.buckets.iter_mut().zip(other.buckets.iter()) {
            *dst = dst.saturating_add(*src);
        }
    }

    pub fn is_empty(&self) -> bool {
        self.buckets.iter().all(|v| *v == 0)
    }
}
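
// A hedged sketch, not part of the original module, exercising the log2
// bucketing used by `record_value`: zero is clamped to 1, 2^n lands in bucket
// n, and oversized values saturate into the last bucket.
#[cfg(test)]
mod histogram_bucketing_tests {
    use super::*;

    #[test]
    fn values_land_in_log2_buckets() {
        let mut hist = Histogram::default();
        hist.record_value(0); // clamped to 1 -> bucket 0
        hist.record_value(1); // bucket 0
        hist.record_value(1024); // 2^10 -> bucket 10
        hist.record_duration(Duration::from_micros(3)); // 3 us -> bucket 1
        assert_eq!(hist.buckets[0], 2);
        assert_eq!(hist.buckets[1], 1);
        assert_eq!(hist.buckets[10], 1);
        assert!(!hist.is_empty());

        let mut other = Histogram::default();
        other.record_value(u64::MAX); // saturates into the last bucket
        hist.merge(&other);
        assert_eq!(hist.buckets[HIST_BUCKETS - 1], 1);
    }
}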

/// Byte and message totals for payloads flowing into and out of a node.
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PayloadMetrics {
    pub in_bytes: u64,
    pub out_bytes: u64,
    pub in_count: u64,
    pub out_count: u64,
}

impl PayloadMetrics {
    fn merge(&mut self, other: &PayloadMetrics) {
        self.in_bytes = self.in_bytes.saturating_add(other.in_bytes);
        self.out_bytes = self.out_bytes.saturating_add(other.out_bytes);
        self.in_count = self.in_count.saturating_add(other.in_count);
        self.out_count = self.out_count.saturating_add(other.out_count);
    }
}

/// A single node execution span captured at the `Profile` level, expressed as
/// a start offset and duration in nanoseconds.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TraceEvent {
    pub node_idx: usize,
    pub start_ns: u64,
    pub duration_ns: u64,
}

/// Aggregated timing and diagnostics for a run.
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ExecutionTelemetry {
    pub nodes_executed: usize,
    pub cpu_segments: usize,
    pub gpu_segments: usize,
    pub gpu_fallbacks: usize,
    pub backpressure_events: usize,
    pub warnings: smallvec::SmallVec<[String; 8]>,
    pub graph_duration: Duration,
    #[serde(default)]
    pub metrics_level: MetricsLevel,
    /// Per-node-instance metrics keyed by the planned node index (`NodeRef.0`).
    pub node_metrics: BTreeMap<usize, NodeMetrics>,
    /// Per-group aggregate metrics keyed by group id (e.g. embedded graphs).
    pub group_metrics: BTreeMap<String, NodeMetrics>,
    /// Per-edge queue wait metrics keyed by the planned edge index.
    pub edge_metrics: BTreeMap<usize, EdgeMetrics>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace: Option<Vec<TraceEvent>>,
}

/// Timing, performance-counter, and payload statistics for a single node.
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct NodeMetrics {
    pub total_duration: Duration,
    pub calls: usize,
    #[serde(default)]
    pub cpu_duration: Duration,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub perf: Option<NodePerfMetrics>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub duration_histogram: Option<Histogram>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<PayloadMetrics>,
}

/// Accumulated hardware performance counters sampled for a node.
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct NodePerfMetrics {
    pub cache_misses: u64,
    pub branch_instructions: u64,
    pub branch_misses: u64,
}

impl NodePerfMetrics {
    fn record(&mut self, sample: PerfSample) {
        self.cache_misses = self.cache_misses.saturating_add(sample.cache_misses);
        self.branch_instructions = self.branch_instructions.saturating_add(sample.branch_instructions);
        self.branch_misses = self.branch_misses.saturating_add(sample.branch_misses);
    }

    fn merge(&mut self, other: NodePerfMetrics) {
        self.cache_misses = self.cache_misses.saturating_add(other.cache_misses);
        self.branch_instructions = self.branch_instructions.saturating_add(other.branch_instructions);
        self.branch_misses = self.branch_misses.saturating_add(other.branch_misses);
    }
}

/// Queue wait, depth, payload, and GPU transfer statistics for a single edge.
#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct EdgeMetrics {
    pub total_wait: Duration,
    pub samples: usize,
    #[serde(default)]
    pub max_depth: u64,
    #[serde(default)]
    pub payload_bytes: u64,
    #[serde(default)]
    pub payload_count: u64,
    #[serde(default)]
    pub gpu_uploads: u64,
    #[serde(default)]
    pub gpu_downloads: u64,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wait_histogram: Option<Histogram>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub depth_histogram: Option<Histogram>,
}

impl EdgeMetrics {
    pub fn merge(&mut self, other: EdgeMetrics) {
        self.total_wait += other.total_wait;
        self.samples += other.samples;
        self.max_depth = self.max_depth.max(other.max_depth);
        self.payload_bytes = self.payload_bytes.saturating_add(other.payload_bytes);
        self.payload_count = self.payload_count.saturating_add(other.payload_count);
        self.gpu_uploads = self.gpu_uploads.saturating_add(other.gpu_uploads);
        self.gpu_downloads = self.gpu_downloads.saturating_add(other.gpu_downloads);
        if let Some(other_hist) = other.wait_histogram {
            let hist = self.wait_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
        if let Some(other_hist) = other.depth_histogram {
            let hist = self.depth_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
    }
}
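
// A small, hedged sketch, not part of the original module, of how
// `EdgeMetrics::merge` accumulates counters, keeps the maximum observed queue
// depth, and materialises a histogram only when the other side carried one.
#[cfg(test)]
mod edge_metrics_merge_tests {
    use super::*;

    #[test]
    fn merge_accumulates_and_keeps_max_depth() {
        let mut left = EdgeMetrics {
            total_wait: Duration::from_millis(2),
            samples: 3,
            max_depth: 4,
            ..Default::default()
        };
        let right = EdgeMetrics {
            total_wait: Duration::from_millis(1),
            samples: 1,
            max_depth: 9,
            wait_histogram: Some(Histogram::default()),
            ..Default::default()
        };

        left.merge(right);
        assert_eq!(left.total_wait, Duration::from_millis(3));
        assert_eq!(left.samples, 4);
        assert_eq!(left.max_depth, 9);
        assert!(left.wait_histogram.is_some());
        assert!(left.depth_histogram.is_none());
    }
}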

impl NodeMetrics {
    pub fn record(&mut self, duration: Duration) {
        self.total_duration += duration;
        self.calls += 1;
    }

    pub fn record_perf(&mut self, sample: PerfSample) {
        let perf = self.perf.get_or_insert_with(NodePerfMetrics::default);
        perf.record(sample);
    }

    pub fn merge(&mut self, other: NodeMetrics) {
        self.total_duration += other.total_duration;
        self.calls += other.calls;
        self.cpu_duration += other.cpu_duration;
        if let Some(other_perf) = other.perf {
            let perf = self.perf.get_or_insert_with(NodePerfMetrics::default);
            perf.merge(other_perf);
        }
        if let Some(other_hist) = other.duration_histogram {
            let hist = self.duration_histogram.get_or_insert_with(Histogram::default);
            hist.merge(&other_hist);
        }
        if let Some(other_payload) = other.payload {
            let payload = self.payload.get_or_insert_with(PayloadMetrics::default);
            payload.merge(&other_payload);
        }
    }
}
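
// A hedged usage sketch, not part of the original module: `record` tracks
// wall-clock time and call counts, while `merge` folds another instance in and
// unions the optional perf / histogram / payload sections.
#[cfg(test)]
mod node_metrics_merge_tests {
    use super::*;

    #[test]
    fn record_and_merge_accumulate() {
        let mut a = NodeMetrics::default();
        a.record(Duration::from_millis(5));
        a.record(Duration::from_millis(7));

        let mut b = NodeMetrics::default();
        b.record(Duration::from_millis(3));
        b.duration_histogram = Some(Histogram::default());

        a.merge(b);
        assert_eq!(a.calls, 3);
        assert_eq!(a.total_duration, Duration::from_millis(15));
        assert!(a.duration_histogram.is_some());
        assert!(a.perf.is_none());
    }
}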

impl ExecutionTelemetry {
    pub fn with_level(level: MetricsLevel) -> Self {
        if !cfg!(feature = "metrics") {
            return Self { metrics_level: MetricsLevel::Off, ..Default::default() };
        }
        Self {
            metrics_level: level,
            ..Default::default()
        }
    }

    pub fn merge(&mut self, other: ExecutionTelemetry) {
        self.nodes_executed += other.nodes_executed;
        self.cpu_segments += other.cpu_segments;
        self.gpu_segments += other.gpu_segments;
        self.gpu_fallbacks += other.gpu_fallbacks;
        self.backpressure_events += other.backpressure_events;
        self.warnings.extend(other.warnings);
        self.graph_duration = self.graph_duration.max(other.graph_duration);
        self.metrics_level = self.metrics_level.max(other.metrics_level);
        for (node, metrics) in other.node_metrics {
            self.node_metrics.entry(node).or_default().merge(metrics);
        }
        for (group, metrics) in other.group_metrics {
            self.group_metrics.entry(group).or_default().merge(metrics);
        }
        for (edge, metrics) in other.edge_metrics {
            self.edge_metrics.entry(edge).or_default().merge(metrics);
        }
        if let Some(other_trace) = other.trace {
            let trace = self.trace.get_or_insert_with(Vec::new);
            trace.extend(other_trace);
        }
    }

    pub fn record_node_duration(&mut self, node_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_basic() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.record(duration);
        if self.metrics_level.is_detailed() {
            entry
                .duration_histogram
                .get_or_insert_with(Histogram::default)
                .record_duration(duration);
        }
    }

    pub fn record_node_cpu_duration(&mut self, node_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.cpu_duration += duration;
    }

    pub fn record_node_perf(&mut self, node_idx: usize, sample: PerfSample) {
        if !cfg!(feature = "metrics") {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        entry.record_perf(sample);
    }

    pub fn record_node_payload_in(&mut self, node_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        let payload = entry.payload.get_or_insert_with(PayloadMetrics::default);
        payload.in_count = payload.in_count.saturating_add(1);
        if let Some(bytes) = bytes {
            payload.in_bytes = payload.in_bytes.saturating_add(bytes);
        }
    }

    pub fn record_node_payload_out(&mut self, node_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.node_metrics.entry(node_idx).or_default();
        let payload = entry.payload.get_or_insert_with(PayloadMetrics::default);
        payload.out_count = payload.out_count.saturating_add(1);
        if let Some(bytes) = bytes {
            payload.out_bytes = payload.out_bytes.saturating_add(bytes);
        }
    }

    pub fn record_edge_wait(&mut self, edge_idx: usize, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_basic() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        entry.total_wait += duration;
        entry.samples += 1;
        if self.metrics_level.is_detailed() {
            entry
                .wait_histogram
                .get_or_insert_with(Histogram::default)
                .record_duration(duration);
        }
    }

    pub fn record_edge_depth(&mut self, edge_idx: usize, depth: usize) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        let depth_u64 = depth as u64;
        entry.max_depth = entry.max_depth.max(depth_u64);
        entry
            .depth_histogram
            .get_or_insert_with(Histogram::default)
            .record_value(depth_u64);
    }

    pub fn record_edge_payload(&mut self, edge_idx: usize, bytes: Option<u64>) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        entry.payload_count = entry.payload_count.saturating_add(1);
        if let Some(bytes) = bytes {
            entry.payload_bytes = entry.payload_bytes.saturating_add(bytes);
        }
    }

    pub fn record_edge_gpu_transfer(&mut self, edge_idx: usize, upload: bool) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_detailed() {
            return;
        }
        let entry = self.edge_metrics.entry(edge_idx).or_default();
        if upload {
            entry.gpu_uploads = entry.gpu_uploads.saturating_add(1);
        } else {
            entry.gpu_downloads = entry.gpu_downloads.saturating_add(1);
        }
    }

    pub fn record_trace_event(&mut self, node_idx: usize, start: Duration, duration: Duration) {
        if !cfg!(feature = "metrics") {
            return;
        }
        if !self.metrics_level.is_profile() {
            return;
        }
        let trace = self.trace.get_or_insert_with(Vec::new);
        trace.push(TraceEvent {
            node_idx,
            start_ns: start.as_nanos() as u64,
            duration_ns: duration.as_nanos() as u64,
        });
    }

    /// Folds per-node metrics into `group_metrics` for every node whose
    /// metadata carries the `daedalus.embedded_group` key.
    pub fn aggregate_groups(&mut self, nodes: &[crate::plan::RuntimeNode]) {
        const GROUP_KEY: &str = "daedalus.embedded_group";
        for (idx, metrics) in &self.node_metrics {
            let Some(node) = nodes.get(*idx) else {
                continue;
            };
            let Some(daedalus_data::model::Value::String(group)) = node.metadata.get(GROUP_KEY) else {
                continue;
            };
            let trimmed = group.trim();
            if trimmed.is_empty() {
                continue;
            }
            self.group_metrics
                .entry(trimmed.to_string())
                .or_default()
                .merge(metrics.clone());
        }
    }
}
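
// A hedged end-to-end recording sketch, not part of the original module. It is
// gated on the `metrics` feature because every `record_*` helper is a no-op
// without it; Detailed mode adds histograms on top of the Basic counters, and
// trace events only appear at the Profile level.
#[cfg(all(test, feature = "metrics"))]
mod telemetry_recording_tests {
    use super::*;

    #[test]
    fn detailed_level_adds_histograms() {
        let mut telemetry = ExecutionTelemetry::with_level(MetricsLevel::Detailed);
        telemetry.record_node_duration(0, Duration::from_micros(250));
        telemetry.record_edge_wait(0, Duration::from_micros(40));
        telemetry.record_edge_depth(0, 6);

        let node = &telemetry.node_metrics[&0];
        assert_eq!(node.calls, 1);
        assert!(node.duration_histogram.is_some());

        let edge = &telemetry.edge_metrics[&0];
        assert_eq!(edge.samples, 1);
        assert_eq!(edge.max_depth, 6);
        assert!(edge.wait_histogram.is_some());
        assert!(edge.depth_histogram.is_some());
    }

    #[test]
    fn basic_level_skips_histograms_and_traces() {
        let mut telemetry = ExecutionTelemetry::with_level(MetricsLevel::Basic);
        telemetry.record_node_duration(0, Duration::from_micros(250));
        assert!(telemetry.node_metrics[&0].duration_histogram.is_none());

        telemetry.record_trace_event(0, Duration::ZERO, Duration::from_micros(10));
        assert!(telemetry.trace.is_none());
    }
}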

/// Best-effort payload size estimate in bytes; returns `None` when the size is
/// not cheaply known (the GPU-gated variants and unrecognised `Any` types).
pub(crate) fn payload_size_bytes(payload: &EdgePayload) -> Option<u64> {
    match payload {
        EdgePayload::Unit => Some(0),
        EdgePayload::Bytes(bytes) => Some(bytes.len() as u64),
        EdgePayload::Value(value) => value_size_bytes(value),
        EdgePayload::Any(any) => any_size_bytes(any),
        #[cfg(feature = "gpu")]
        EdgePayload::Payload(_) => None,
        #[cfg(feature = "gpu")]
        EdgePayload::GpuImage(_) => None,
    }
}

fn value_size_bytes(value: &daedalus_data::model::Value) -> Option<u64> {
    match value {
        daedalus_data::model::Value::String(s) => Some(s.len() as u64),
        daedalus_data::model::Value::Bytes(b) => Some(b.len() as u64),
        _ => None,
    }
}

fn any_size_bytes(any: &std::sync::Arc<dyn std::any::Any + Send + Sync>) -> Option<u64> {
    if let Some(bytes) = any.downcast_ref::<Vec<u8>>() {
        return Some(bytes.len() as u64);
    }
    if let Some(bytes) = any.downcast_ref::<std::sync::Arc<[u8]>>() {
        return Some(bytes.len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::DynamicImage>() {
        return Some(dynamic_image_size_bytes(img));
    }
    if let Some(img) = any.downcast_ref::<image::GrayImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::GrayAlphaImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::RgbImage>() {
        return Some(img.as_raw().len() as u64);
    }
    if let Some(img) = any.downcast_ref::<image::RgbaImage>() {
        return Some(img.as_raw().len() as u64);
    }
    None
}

fn dynamic_image_size_bytes(img: &image::DynamicImage) -> u64 {
    match img {
        image::DynamicImage::ImageLuma8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageLumaA8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageRgb8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageRgba8(i) => i.as_raw().len() as u64,
        image::DynamicImage::ImageLuma16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageLumaA16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgb16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgba16(i) => (i.as_raw().len() * std::mem::size_of::<u16>()) as u64,
        image::DynamicImage::ImageRgb32F(i) => (i.as_raw().len() * std::mem::size_of::<f32>()) as u64,
        image::DynamicImage::ImageRgba32F(i) => (i.as_raw().len() * std::mem::size_of::<f32>()) as u64,
        _ => 0,
    }
}
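
// A hedged sketch, not part of the original module, of the size estimators:
// `dynamic_image_size_bytes` reports the raw buffer length (width * height *
// channels * bytes-per-channel) and `any_size_bytes` only recognises a fixed
// set of downcast targets. Assumes the `image` crate's `DynamicImage::new_rgb8`
// constructor is available, as the crate is already a dependency here.
#[cfg(test)]
mod payload_size_tests {
    use super::*;

    #[test]
    fn estimates_known_payload_shapes() {
        // A 4x2 RGB8 image carries 4 * 2 * 3 = 24 raw bytes.
        let img = image::DynamicImage::new_rgb8(4, 2);
        assert_eq!(dynamic_image_size_bytes(&img), 24);

        // `Vec<u8>` behind an `Arc<dyn Any>` is one of the recognised downcasts.
        let any: std::sync::Arc<dyn std::any::Any + Send + Sync> =
            std::sync::Arc::new(vec![0u8; 16]);
        assert_eq!(any_size_bytes(&any), Some(16));

        // Unit payloads have a known size of zero.
        assert_eq!(payload_size_bytes(&EdgePayload::Unit), Some(0));
    }
}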