vortex_array/operator/
metrics.rs1use std::any::Any;
5use std::fmt::Debug;
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect, VortexResult};
12use vortex_metrics::{Timer, VortexMetrics};
13
14use crate::Canonical;
15use crate::operator::{
16 BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, LengthBounds, Operator,
17 OperatorEq, OperatorHash, OperatorId, OperatorRef,
18};
19use crate::pipeline::bits::BitView;
20use crate::pipeline::view::ViewMut;
21use crate::pipeline::{BindContext, Kernel, KernelContext, PipelinedOperator, RowSelection};
22
23#[derive(Debug)]
25pub struct MetricsOperator {
26 inner: OperatorRef,
27 metrics: VortexMetrics,
28}
29
30impl OperatorHash for MetricsOperator {
31 fn operator_hash<H: Hasher>(&self, state: &mut H) {
32 self.inner.operator_hash(state);
33 self.id().hash(state);
35 }
36}
37
38impl OperatorEq for MetricsOperator {
39 fn operator_eq(&self, other: &Self) -> bool {
40 self.inner.operator_eq(&other.inner)
41 }
42}
43
44impl MetricsOperator {
45 pub fn new(inner: OperatorRef, metrics: VortexMetrics) -> Self {
46 let metrics = metrics.child_with_tags([("operator", inner.id().as_ref().to_string())]);
47 Self { inner, metrics }
48 }
49
50 pub fn metrics(&self) -> &VortexMetrics {
51 &self.metrics
52 }
53}
54
55impl Operator for MetricsOperator {
56 fn id(&self) -> OperatorId {
57 OperatorId::from("vortex.metrics")
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn dtype(&self) -> &DType {
65 self.inner.dtype()
66 }
67
68 fn bounds(&self) -> LengthBounds {
69 self.inner.bounds()
70 }
71
72 fn children(&self) -> &[OperatorRef] {
73 self.inner.children()
74 }
75
76 fn with_children(self: Arc<Self>, children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
77 Ok(Arc::new(MetricsOperator {
78 inner: self.inner.clone().with_children(children)?,
79 metrics: self.metrics.clone(),
80 }))
81 }
82
83 fn as_batch(&self) -> Option<&dyn BatchOperator> {
84 self.inner.as_batch().is_some().then_some(self)
85 }
86
87 fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
88 self.inner.as_pipelined().is_some().then_some(self)
90 }
91}
92
93impl BatchOperator for MetricsOperator {
94 fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef> {
95 let inner = self.inner.as_batch().vortex_expect("checked").bind(ctx)?;
96 let timer = self.metrics.timer("operator.batch.execute");
97 Ok(Box::new(MetricsBatchExecution { inner, timer }))
98 }
99}
100
101struct MetricsBatchExecution {
102 inner: BatchExecutionRef,
103 timer: Arc<Timer>,
104}
105
106#[async_trait]
107impl BatchExecution for MetricsBatchExecution {
108 async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
109 let _timer = self.timer.time();
110 self.inner.execute().await
111 }
112}
113
114impl PipelinedOperator for MetricsOperator {
115 fn row_selection(&self) -> RowSelection {
116 self.inner
117 .as_pipelined()
118 .vortex_expect("checked")
119 .row_selection()
120 }
121
122 fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
123 let inner = self
124 .inner
125 .as_pipelined()
126 .vortex_expect("checked")
127 .bind(ctx)?;
128 let timer = self.metrics.timer("operator.operator.step");
129 Ok(Box::new(MetricsKernel { inner, timer }))
130 }
131
132 fn vector_children(&self) -> Vec<usize> {
133 self.inner
134 .as_pipelined()
135 .vortex_expect("checked")
136 .vector_children()
137 }
138
139 fn batch_children(&self) -> Vec<usize> {
140 self.inner
141 .as_pipelined()
142 .vortex_expect("checked")
143 .batch_children()
144 }
145}
146
147struct MetricsKernel {
148 inner: Box<dyn Kernel>,
149 timer: Arc<Timer>,
150}
151
152impl Kernel for MetricsKernel {
153 fn step(
154 &self,
155 ctx: &KernelContext,
156 chunk_idx: usize,
157 selection: &BitView,
158 out: &mut ViewMut,
159 ) -> VortexResult<()> {
160 let _timer = self.timer.time();
161 self.inner.step(ctx, chunk_idx, selection, out)
162 }
163}