vortex_array/operator/
metrics.rs1use std::any::Any;
5use std::fmt::Debug;
6use std::hash::{Hash, Hasher};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect, VortexResult};
12use vortex_metrics::{Timer, VortexMetrics};
13
14use crate::Canonical;
15use crate::operator::{
16 BatchBindCtx, BatchExecution, BatchExecutionRef, BatchOperator, Operator, OperatorEq,
17 OperatorHash, OperatorId, OperatorRef,
18};
19use crate::pipeline::view::ViewMut;
20use crate::pipeline::{BindContext, Kernel, KernelContext, PipelinedOperator};
21
22#[derive(Debug)]
24pub struct MetricsOperator {
25 inner: OperatorRef,
26 metrics: VortexMetrics,
27}
28
29impl OperatorHash for MetricsOperator {
30 fn operator_hash<H: Hasher>(&self, state: &mut H) {
31 self.inner.operator_hash(state);
32 self.id().hash(state);
34 }
35}
36
37impl OperatorEq for MetricsOperator {
38 fn operator_eq(&self, other: &Self) -> bool {
39 self.inner.operator_eq(&other.inner)
40 }
41}
42
43impl MetricsOperator {
44 pub fn new(inner: OperatorRef, metrics: VortexMetrics) -> Self {
45 let metrics = metrics.child_with_tags([("operator", inner.id().as_ref().to_string())]);
46 Self { inner, metrics }
47 }
48
49 pub fn metrics(&self) -> &VortexMetrics {
50 &self.metrics
51 }
52}
53
54impl Operator for MetricsOperator {
55 fn id(&self) -> OperatorId {
56 OperatorId::from("vortex.metrics")
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn dtype(&self) -> &DType {
64 self.inner.dtype()
65 }
66
67 fn len(&self) -> usize {
68 self.inner.len()
69 }
70
71 fn children(&self) -> &[OperatorRef] {
72 self.inner.children()
73 }
74
75 fn with_children(self: Arc<Self>, children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
76 Ok(Arc::new(MetricsOperator {
77 inner: self.inner.clone().with_children(children)?,
78 metrics: self.metrics.clone(),
79 }))
80 }
81
82 fn as_batch(&self) -> Option<&dyn BatchOperator> {
83 self.inner.as_batch().is_some().then_some(self)
84 }
85
86 fn as_pipelined(&self) -> Option<&dyn PipelinedOperator> {
87 self.inner.as_pipelined().is_some().then_some(self)
89 }
90}
91
92impl BatchOperator for MetricsOperator {
93 fn bind(&self, ctx: &mut dyn BatchBindCtx) -> VortexResult<BatchExecutionRef> {
94 let inner = self.inner.as_batch().vortex_expect("checked").bind(ctx)?;
95 let timer = self.metrics.timer("operator.batch.execute");
96 Ok(Box::new(MetricsBatchExecution { inner, timer }))
97 }
98}
99
100struct MetricsBatchExecution {
101 inner: BatchExecutionRef,
102 timer: Arc<Timer>,
103}
104
105#[async_trait]
106impl BatchExecution for MetricsBatchExecution {
107 async fn execute(self: Box<Self>) -> VortexResult<Canonical> {
108 let _timer = self.timer.time();
109 self.inner.execute().await
110 }
111}
112
113impl PipelinedOperator for MetricsOperator {
114 fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
115 let inner = self
116 .inner
117 .as_pipelined()
118 .vortex_expect("checked")
119 .bind(ctx)?;
120 let timer = self.metrics.timer("operator.operator.step");
121 Ok(Box::new(MetricsKernel { inner, timer }))
122 }
123
124 fn vector_children(&self) -> Vec<usize> {
125 self.inner
126 .as_pipelined()
127 .vortex_expect("checked")
128 .vector_children()
129 }
130
131 fn batch_children(&self) -> Vec<usize> {
132 self.inner
133 .as_pipelined()
134 .vortex_expect("checked")
135 .batch_children()
136 }
137}
138
139struct MetricsKernel {
140 inner: Box<dyn Kernel>,
141 timer: Arc<Timer>,
142}
143
144impl Kernel for MetricsKernel {
145 fn step(&mut self, ctx: &KernelContext, out: &mut ViewMut) -> VortexResult<()> {
146 let _timer = self.timer.time();
147 self.inner.step(ctx, out)
148 }
149}