vortex_array/operator/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect, VortexResult};
12use vortex_metrics::{Timer, VortexMetrics};
13
14use crate::Canonical;
15use crate::operator::{
16    BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, LengthBounds, Operator,
17    OperatorEq, OperatorHash, OperatorId, OperatorRef,
18};
19use crate::pipeline::bits::BitView;
20use crate::pipeline::view::ViewMut;
21use crate::pipeline::{BindContext, Kernel, KernelContext, PipelinedOperator, RowSelection};
22
23/// An operator that wraps another operator and records metrics about its execution.
24#[derive(Debug)]
25pub struct MetricsOperator {
26    inner: OperatorRef,
27    metrics: VortexMetrics,
28}
29
30impl OperatorHash for MetricsOperator {
31    fn operator_hash<H: Hasher>(&self, state: &mut H) {
32        self.inner.operator_hash(state);
33        // Include our ID just to differentiate from the inner operator
34        self.id().hash(state);
35    }
36}
37
38impl OperatorEq for MetricsOperator {
39    fn operator_eq(&self, other: &Self) -> bool {
40        self.inner.operator_eq(&other.inner)
41    }
42}
43
44impl MetricsOperator {
45    pub fn new(inner: OperatorRef, metrics: VortexMetrics) -> Self {
46        let metrics = metrics.child_with_tags([("operator", inner.id().as_ref().to_string())]);
47        Self { inner, metrics }
48    }
49
50    pub fn metrics(&self) -> &VortexMetrics {
51        &self.metrics
52    }
53}
54
55impl Operator for MetricsOperator {
56    fn id(&self) -> OperatorId {
57        OperatorId::from("vortex.metrics")
58    }
59
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63
64    fn dtype(&self) -> &DType {
65        self.inner.dtype()
66    }
67
68    fn bounds(&self) -> LengthBounds {
69        self.inner.bounds()
70    }
71
72    fn children(&self) -> &[OperatorRef] {
73        self.inner.children()
74    }
75
76    fn with_children(self: Arc<Self>, children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
77        Ok(Arc::new(MetricsOperator {
78            inner: self.inner.clone().with_children(children)?,
79            metrics: self.metrics.clone(),
80        }))
81    }
82
83    fn as_batch(&self) -> Option<&dyn BatchOperator> {
84        self.inner.as_batch().is_some().then_some(self)
85    }
86
87    fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
88        // Only support pipelined execution if the inner operator does
89        self.inner.as_pipelined().is_some().then_some(self)
90    }
91}
92
93impl BatchOperator for MetricsOperator {
94    fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef> {
95        let inner = self.inner.as_batch().vortex_expect("checked").bind(ctx)?;
96        let timer = self.metrics.timer("operator.batch.execute");
97        Ok(Box::new(MetricsBatchExecution { inner, timer }))
98    }
99}
100
101struct MetricsBatchExecution {
102    inner: BatchExecutionRef,
103    timer: Arc<Timer>,
104}
105
106#[async_trait]
107impl BatchExecution for MetricsBatchExecution {
108    async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
109        let _timer = self.timer.time();
110        self.inner.execute().await
111    }
112}
113
114impl PipelinedOperator for MetricsOperator {
115    fn row_selection(&self) -> RowSelection {
116        self.inner
117            .as_pipelined()
118            .vortex_expect("checked")
119            .row_selection()
120    }
121
122    fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
123        let inner = self
124            .inner
125            .as_pipelined()
126            .vortex_expect("checked")
127            .bind(ctx)?;
128        let timer = self.metrics.timer("operator.operator.step");
129        Ok(Box::new(MetricsKernel { inner, timer }))
130    }
131
132    fn vector_children(&self) -> Vec<usize> {
133        self.inner
134            .as_pipelined()
135            .vortex_expect("checked")
136            .vector_children()
137    }
138
139    fn batch_children(&self) -> Vec<usize> {
140        self.inner
141            .as_pipelined()
142            .vortex_expect("checked")
143            .batch_children()
144    }
145}
146
147struct MetricsKernel {
148    inner: Box<dyn Kernel>,
149    timer: Arc<Timer>,
150}
151
152impl Kernel for MetricsKernel {
153    fn step(
154        &self,
155        ctx: &KernelContext,
156        chunk_idx: usize,
157        selection: &BitView,
158        out: &mut ViewMut,
159    ) -> VortexResult<()> {
160        let _timer = self.timer.time();
161        self.inner.step(ctx, chunk_idx, selection, out)
162    }
163}