datafusion-physical-expr-common 53.1.0

Common functionality of physical expressions for the DataFusion query engine
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Builder for creating arbitrary metrics

use std::{borrow::Cow, sync::Arc};

use crate::metrics::{
    MetricType,
    value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
};

use super::{
    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
};

/// Structure for constructing metrics, counters, timers, etc.
///
/// A builder borrows an [`ExecutionPlanMetricsSet`] and accumulates an
/// optional partition, a list of labels, and a [`MetricType`]. Its
/// consuming methods then create a metric and register it with that set.
///
/// Note the use of `Cow<..>` is to avoid allocations in the common
/// case of constant strings
///
/// ```rust
/// use datafusion_physical_expr_common::metrics::*;
///
/// let metrics = ExecutionPlanMetricsSet::new();
/// let partition = 1;
///
/// // Create the standard output_rows metric
/// let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
///
/// // Create an operator-specific counter with some labels
/// let num_bytes = MetricBuilder::new(&metrics)
///     .with_new_label("filename", "my_awesome_file.parquet")
///     .counter("num_bytes", partition);
/// ```
pub struct MetricBuilder<'a> {
    /// Location that the metric created by this builder will be added to
    metrics: &'a ExecutionPlanMetricsSet,

    /// Optional partition number; `None` until a partition is set
    partition: Option<usize>,

    /// Arbitrary name=value pairs identifying this metric
    labels: Vec<Label>,

    /// The type controlling the verbosity/category for this builder.
    /// See comments in [`MetricType`] for details
    metric_type: MetricType,
}

impl<'a> MetricBuilder<'a> {
    /// Start building a metric that will be registered with `metrics`
    /// when `build()` (or one of the convenience constructors) is called.
    ///
    /// The builder starts with no partition, no labels, and a metric type
    /// of [`MetricType::DEV`]. The metric type controls when the metric is
    /// displayed; see comments in [`MetricType`] for details.
    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
        Self {
            metric_type: MetricType::DEV,
            labels: Vec::new(),
            partition: None,
            metrics,
        }
    }

    /// Append `label` to the labels of the metric being constructed
    pub fn with_label(self, label: Label) -> Self {
        let mut builder = self;
        builder.labels.push(label);
        builder
    }

    /// Replace the metric type of the metric being constructed
    pub fn with_type(self, metric_type: MetricType) -> Self {
        Self {
            metric_type,
            ..self
        }
    }

    /// Create a [`Label`] from `name` and `value` and append it to the
    /// labels of the metric being constructed
    pub fn with_new_label(
        self,
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        let name: Cow<'static, str> = name.into();
        let value: Cow<'static, str> = value.into();
        self.with_label(Label::new(name, value))
    }

    /// Record `partition` as the partition of the metric being constructed
    pub fn with_partition(self, partition: usize) -> Self {
        Self {
            partition: Some(partition),
            ..self
        }
    }

    /// Consume self, wrap `value` in a [`Metric`] carrying this builder's
    /// partition, labels and metric type, and register it with the
    /// MetricsSet
    pub fn build(self, value: MetricValue) {
        let registry = self.metrics;
        let metric = Metric::new_with_labels(value, self.partition, self.labels)
            .with_type(self.metric_type);
        registry.register(Arc::new(metric));
    }

    /// Consume self and register a new counter for recording output rows
    /// of `partition`; returns the [`Count`] whose clone was registered
    pub fn output_rows(self, partition: usize) -> Count {
        let rows = Count::new();
        let value = MetricValue::OutputRows(rows.clone());
        self.with_partition(partition).build(value);
        rows
    }

    /// Consume self and register a new counter for recording the number of
    /// spills triggered by an operator
    pub fn spill_count(self, partition: usize) -> Count {
        let spills = Count::new();
        let value = MetricValue::SpillCount(spills.clone());
        self.with_partition(partition).build(value);
        spills
    }

    /// Consume self and register a new counter for recording the total
    /// spilled bytes triggered by an operator
    pub fn spilled_bytes(self, partition: usize) -> Count {
        let bytes = Count::new();
        let value = MetricValue::SpilledBytes(bytes.clone());
        self.with_partition(partition).build(value);
        bytes
    }

    /// Consume self and register a new counter for recording the total
    /// spilled rows triggered by an operator
    pub fn spilled_rows(self, partition: usize) -> Count {
        let rows = Count::new();
        let value = MetricValue::SpilledRows(rows.clone());
        self.with_partition(partition).build(value);
        rows
    }

    /// Consume self and register a new counter for recording total output
    /// bytes
    pub fn output_bytes(self, partition: usize) -> Count {
        let bytes = Count::new();
        let value = MetricValue::OutputBytes(bytes.clone());
        self.with_partition(partition).build(value);
        bytes
    }

    /// Consume self and register a new counter for recording total output
    /// batches
    pub fn output_batches(self, partition: usize) -> Count {
        let batches = Count::new();
        let value = MetricValue::OutputBatches(batches.clone());
        self.with_partition(partition).build(value);
        batches
    }

    /// Consume self and register a new gauge for reporting current memory
    /// usage
    pub fn mem_used(self, partition: usize) -> Gauge {
        let memory = Gauge::new();
        let value = MetricValue::CurrentMemoryUsage(memory.clone());
        self.with_partition(partition).build(value);
        memory
    }

    /// Consumes self and creates a new [`Count`] named `counter_name` for
    /// recording some arbitrary metric of an operator.
    ///
    /// Sets the partition and then delegates to [`Self::global_counter`].
    pub fn counter(
        self,
        counter_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Count {
        let builder = self.with_partition(partition);
        builder.global_counter(counter_name)
    }

    /// Consumes self and creates a new [`Gauge`] named `gauge_name` for
    /// reporting some arbitrary metric of an operator.
    ///
    /// Sets the partition and then delegates to [`Self::global_gauge`].
    pub fn gauge(
        self,
        gauge_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Gauge {
        let builder = self.with_partition(partition);
        builder.global_gauge(gauge_name)
    }

    /// Consumes self and creates a new named [`Count`] for recording a
    /// metric of an overall operator (not per partition).
    ///
    /// This method does not set a partition itself; the metric is built
    /// with whatever partition the builder currently holds.
    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
        let counter = Count::new();
        let value = MetricValue::Count {
            name: counter_name.into(),
            count: counter.clone(),
        };
        self.build(value);
        counter
    }

    /// Consumes self and creates a new named [`Gauge`] for reporting a
    /// metric of an overall operator (not per partition).
    ///
    /// This method does not set a partition itself; the metric is built
    /// with whatever partition the builder currently holds.
    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
        let named_gauge = Gauge::new();
        let value = MetricValue::Gauge {
            name: gauge_name.into(),
            gauge: named_gauge.clone(),
        };
        self.build(value);
        named_gauge
    }

    /// Consume self and register a new Timer for recording the elapsed
    /// CPU time spent by an operator
    pub fn elapsed_compute(self, partition: usize) -> Time {
        let timer = Time::new();
        let value = MetricValue::ElapsedCompute(timer.clone());
        self.with_partition(partition).build(value);
        timer
    }

    /// Consumes self and registers a new Timer named `subset_name` for
    /// recording some subset of an operator's execution time.
    pub fn subset_time(
        self,
        subset_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Time {
        let timer = Time::new();
        let value = MetricValue::Time {
            name: subset_name.into(),
            time: timer.clone(),
        };
        self.with_partition(partition).build(value);
        timer
    }

    /// Consumes self and registers a new Timestamp for recording the
    /// starting time of execution for a partition
    pub fn start_timestamp(self, partition: usize) -> Timestamp {
        let started = Timestamp::new();
        let value = MetricValue::StartTimestamp(started.clone());
        self.with_partition(partition).build(value);
        started
    }

    /// Consumes self and registers a new Timestamp for recording the
    /// ending time of execution for a partition
    pub fn end_timestamp(self, partition: usize) -> Timestamp {
        let ended = Timestamp::new();
        let value = MetricValue::EndTimestamp(ended.clone());
        self.with_partition(partition).build(value);
        ended
    }

    /// Consumes self and registers a new `PruningMetrics` under `name`
    /// for `partition`
    pub fn pruning_metrics(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> PruningMetrics {
        let pruning = PruningMetrics::new();
        let value = MetricValue::PruningMetrics {
            name: name.into(),
            // inner values will be `Arc::clone()`
            pruning_metrics: pruning.clone(),
        };
        self.with_partition(partition).build(value);
        pruning
    }

    /// Consumes self and registers a new [`RatioMetrics`] that uses the
    /// default [`RatioMergeStrategy`]
    pub fn ratio_metrics(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> RatioMetrics {
        let strategy = RatioMergeStrategy::default();
        self.ratio_metrics_with_strategy(name, partition, strategy)
    }

    /// Consumes self and registers a new [`RatioMetrics`] configured with
    /// the given `merge_strategy`
    pub fn ratio_metrics_with_strategy(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
        merge_strategy: RatioMergeStrategy,
    ) -> RatioMetrics {
        let ratio = RatioMetrics::new().with_merge_strategy(merge_strategy);
        let value = MetricValue::Ratio {
            name: name.into(),
            ratio_metrics: ratio.clone(),
        };
        self.with_partition(partition).build(value);
        ratio
    }
}