vortex_array/operator/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect, VortexResult};
12use vortex_metrics::{Timer, VortexMetrics};
13
14use crate::Canonical;
15use crate::operator::{
16    BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, Operator, OperatorEq,
17    OperatorHash, OperatorId, OperatorRef,
18};
19use crate::pipeline::view::ViewMut;
20use crate::pipeline::{BindContext, Kernel, KernelContext, PipelinedOperator};
21
22/// An operator that wraps another operator and records metrics about its execution.
23#[derive(Debug)]
24pub struct MetricsOperator {
25    inner: OperatorRef,
26    metrics: VortexMetrics,
27}
28
29impl OperatorHash for MetricsOperator {
30    fn operator_hash<H: Hasher>(&self, state: &mut H) {
31        self.inner.operator_hash(state);
32        // Include our ID just to differentiate from the inner operator
33        self.id().hash(state);
34    }
35}
36
37impl OperatorEq for MetricsOperator {
38    fn operator_eq(&self, other: &Self) -> bool {
39        self.inner.operator_eq(&other.inner)
40    }
41}
42
43impl MetricsOperator {
44    pub fn new(inner: OperatorRef, metrics: VortexMetrics) -> Self {
45        let metrics = metrics.child_with_tags([("operator", inner.id().as_ref().to_string())]);
46        Self { inner, metrics }
47    }
48
49    pub fn metrics(&self) -> &VortexMetrics {
50        &self.metrics
51    }
52}
53
54impl Operator for MetricsOperator {
55    fn id(&self) -> OperatorId {
56        OperatorId::from("vortex.metrics")
57    }
58
59    fn as_any(&self) -> &dyn Any {
60        self
61    }
62
63    fn dtype(&self) -> &DType {
64        self.inner.dtype()
65    }
66
67    fn len(&self) -> usize {
68        self.inner.len()
69    }
70
71    fn children(&self) -> &[OperatorRef] {
72        self.inner.children()
73    }
74
75    fn with_children(self: Arc<Self>, children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
76        Ok(Arc::new(MetricsOperator {
77            inner: self.inner.clone().with_children(children)?,
78            metrics: self.metrics.clone(),
79        }))
80    }
81
82    fn as_batch(&self) -> Option<&dyn BatchOperator> {
83        self.inner.as_batch().is_some().then_some(self)
84    }
85
86    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
87        // Only support pipelined execution if the inner operator does
88        self.inner.as_pipelined().is_some().then_some(self)
89    }
90}
91
92impl BatchOperator for MetricsOperator {
93    fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef> {
94        let inner = self.inner.as_batch().vortex_expect("checked").bind(ctx)?;
95        let timer = self.metrics.timer("operator.batch.execute");
96        Ok(Box::new(MetricsBatchExecution { inner, timer }))
97    }
98}
99
100struct MetricsBatchExecution {
101    inner: BatchExecutionRef,
102    timer: Arc<Timer>,
103}
104
105#[async_trait]
106impl BatchExecution for MetricsBatchExecution {
107    async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
108        let _timer = self.timer.time();
109        self.inner.execute().await
110    }
111}
112
113impl PipelinedOperator for MetricsOperator {
114    fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
115        let inner = self
116            .inner
117            .as_pipelined()
118            .vortex_expect("checked")
119            .bind(ctx)?;
120        let timer = self.metrics.timer("operator.operator.step");
121        Ok(Box::new(MetricsKernel { inner, timer }))
122    }
123
124    fn vector_children(&self) -> Vec<usize> {
125        self.inner
126            .as_pipelined()
127            .vortex_expect("checked")
128            .vector_children()
129    }
130
131    fn batch_children(&self) -> Vec<usize> {
132        self.inner
133            .as_pipelined()
134            .vortex_expect("checked")
135            .batch_children()
136    }
137}
138
139struct MetricsKernel {
140    inner: Box<dyn Kernel>,
141    timer: Arc<Timer>,
142}
143
144impl Kernel for MetricsKernel {
145    fn step(&mut self, ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
146        let _timer = self.timer.time();
147        self.inner.step(ctx, out)
148    }
149}