pollen-rs 0.1.0

Embedded decentralized distributed task scheduler
Documentation
//! Metrics collection for Pollen scheduler.
//!
//! This module provides Prometheus-compatible metrics for monitoring
//! Pollen scheduler performance and health.
//!
//! # Features
//!
//! Enable the `metrics` feature to use this module:
//!
//! ```toml
//! [dependencies]
//! pollen = { version = "0.1", features = ["metrics"] }
//! ```

use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use std::sync::Arc;

/// Labels for task metrics.
///
/// Identifies a task on a particular node. The `EncodeLabelSet` derive lets
/// this struct serve as the label type of a `prometheus_client` metric; the
/// other derives (`Clone`, `Hash`, `PartialEq`, `Eq`) are what a keyed metric
/// family needs from its label type.
///
/// NOTE(review): none of the metrics in the visible part of this module are
/// keyed by this label set — confirm where (or whether) it is used.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct TaskLabels {
    /// Task name.
    pub task_name: String,
    /// Node ID (truncated).
    ///
    /// Stored as given: nothing in this type shortens the value, so any
    /// truncation is presumably done by the caller — TODO confirm.
    pub node_id: String,
}

/// Labels for cluster metrics.
///
/// Identifies a node within a named cluster. The `EncodeLabelSet` derive lets
/// this struct serve as the label type of a `prometheus_client` metric; the
/// other derives (`Clone`, `Hash`, `PartialEq`, `Eq`) are what a keyed metric
/// family needs from its label type.
///
/// NOTE(review): none of the metrics in the visible part of this module are
/// keyed by this label set — confirm where (or whether) it is used.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct ClusterLabels {
    /// Cluster name.
    pub cluster_name: String,
    /// Node ID (truncated).
    ///
    /// Stored as given: nothing in this type shortens the value, so any
    /// truncation is presumably done by the caller — TODO confirm.
    pub node_id: String,
}

/// Pollen metrics collector.
///
/// Bundles every metric handle the scheduler exports: counters for
/// monotonically increasing event totals, gauges for values that move in both
/// directions, and one histogram for execution latency. All handles are
/// unlabelled (none is keyed by a label set).
///
/// The fields are public, but the `record_*` / `set_*` methods on this type
/// keep related metrics consistent with each other (for example, bumping the
/// execution total and the running gauge together) and are the simpler way to
/// update them.
#[derive(Clone)]
pub struct PollenMetrics {
    /// Total tasks registered.
    /// Incremented by `record_task_registered`.
    pub tasks_registered: Counter,
    /// Total task executions.
    /// Incremented when an execution starts (`record_execution_start`).
    pub task_executions_total: Counter,
    /// Successful task executions.
    /// Incremented by `record_execution_complete` when `success` is `true`.
    pub task_executions_success: Counter,
    /// Failed task executions.
    /// Incremented by `record_execution_complete` when `success` is `false`.
    pub task_executions_failed: Counter,
    /// Task execution duration histogram (seconds).
    /// Receives one observation per `record_execution_complete` call.
    pub task_execution_duration: Histogram,
    /// Currently running tasks.
    /// Incremented on execution start and decremented on completion.
    pub tasks_running: Gauge,
    /// Pending task instances.
    /// Overwritten with the caller-supplied value by `set_pending_tasks`.
    pub tasks_pending: Gauge,
    /// Cluster size (number of alive nodes).
    /// Overwritten with the caller-supplied value by `set_cluster_size`.
    pub cluster_size: Gauge,
    /// Claim attempts.
    /// Incremented on every `record_claim_attempt`, successful or not.
    pub claim_attempts_total: Counter,
    /// Successful claims.
    /// Incremented by `record_claim_attempt` only when `success` is `true`.
    pub claim_success_total: Counter,
    /// Anti-entropy sync rounds.
    /// Incremented by `record_sync_round`.
    pub sync_rounds_total: Counter,
    /// Membership events.
    /// Incremented by `record_membership_event`.
    pub membership_events_total: Counter,
}

impl PollenMetrics {
    /// Create a new metrics collector and register with the given registry.
    ///
    /// Each metric is created at its default (zero) value, and a clone of its
    /// handle is handed to `registry`; the returned collector keeps the
    /// originals for updating.
    ///
    /// # Metric names
    ///
    /// Counters are registered **without** a `_total` suffix. The
    /// `prometheus_client` encoder appends `_total` to every counter sample
    /// itself, so registering e.g. `pollen_claim_attempts_total` would expose
    /// `pollen_claim_attempts_total_total`. With the names below, the scraped
    /// series are `pollen_task_executions_total`, `pollen_claim_attempts_total`
    /// and so on.
    pub fn new(registry: &mut Registry) -> Self {
        let metrics = Self {
            tasks_registered: Counter::default(),
            task_executions_total: Counter::default(),
            task_executions_success: Counter::default(),
            task_executions_failed: Counter::default(),
            // 15 buckets starting at 1 ms, each twice the previous one, so the
            // largest explicit bound is 0.001 * 2^14 (about 16.4 s).
            task_execution_duration: Histogram::new(exponential_buckets(0.001, 2.0, 15)),
            tasks_running: Gauge::default(),
            tasks_pending: Gauge::default(),
            cluster_size: Gauge::default(),
            claim_attempts_total: Counter::default(),
            claim_success_total: Counter::default(),
            sync_rounds_total: Counter::default(),
            membership_events_total: Counter::default(),
        };

        // Counters: names carry no `_total` suffix (the encoder adds it).
        registry.register(
            "pollen_tasks_registered",
            "Total number of tasks registered",
            metrics.tasks_registered.clone(),
        );
        registry.register(
            "pollen_task_executions",
            "Total number of task executions",
            metrics.task_executions_total.clone(),
        );
        registry.register(
            "pollen_task_executions_success",
            "Total number of successful task executions",
            metrics.task_executions_success.clone(),
        );
        registry.register(
            "pollen_task_executions_failed",
            "Total number of failed task executions",
            metrics.task_executions_failed.clone(),
        );

        // Histogram: the unit is spelled out in the name.
        registry.register(
            "pollen_task_execution_duration_seconds",
            "Task execution duration in seconds",
            metrics.task_execution_duration.clone(),
        );

        // Gauges.
        registry.register(
            "pollen_tasks_running",
            "Number of currently running tasks",
            metrics.tasks_running.clone(),
        );
        registry.register(
            "pollen_tasks_pending",
            "Number of pending task instances",
            metrics.tasks_pending.clone(),
        );
        registry.register(
            "pollen_cluster_size",
            "Number of alive nodes in the cluster",
            metrics.cluster_size.clone(),
        );

        // Remaining counters, again without the `_total` suffix.
        registry.register(
            "pollen_claim_attempts",
            "Total number of claim attempts",
            metrics.claim_attempts_total.clone(),
        );
        registry.register(
            "pollen_claim_success",
            "Total number of successful claims",
            metrics.claim_success_total.clone(),
        );
        registry.register(
            "pollen_sync_rounds",
            "Total number of anti-entropy sync rounds",
            metrics.sync_rounds_total.clone(),
        );
        registry.register(
            "pollen_membership_events",
            "Total number of membership events",
            metrics.membership_events_total.clone(),
        );

        metrics
    }

    /// Record a task registration.
    ///
    /// Increments `tasks_registered` by one.
    pub fn record_task_registered(&self) {
        self.tasks_registered.inc();
    }

    /// Record a task execution start.
    ///
    /// Increments both the execution total and the running-tasks gauge. Pair
    /// every call with exactly one [`record_execution_complete`] call so the
    /// gauge returns to its previous value.
    ///
    /// [`record_execution_complete`]: Self::record_execution_complete
    pub fn record_execution_start(&self) {
        self.task_executions_total.inc();
        self.tasks_running.inc();
    }

    /// Record a task execution completion.
    ///
    /// Decrements the running-tasks gauge, observes `duration_secs` in the
    /// duration histogram, and increments either the success or the failure
    /// counter depending on `success`.
    ///
    /// The gauge is decremented unconditionally; calling this without a
    /// matching [`record_execution_start`] drives it below its true value.
    ///
    /// [`record_execution_start`]: Self::record_execution_start
    pub fn record_execution_complete(&self, success: bool, duration_secs: f64) {
        self.tasks_running.dec();
        self.task_execution_duration.observe(duration_secs);
        if success {
            self.task_executions_success.inc();
        } else {
            self.task_executions_failed.inc();
        }
    }

    /// Update the cluster size gauge.
    ///
    /// Overwrites the gauge with `size`; the value is not validated.
    pub fn set_cluster_size(&self, size: i64) {
        self.cluster_size.set(size);
    }

    /// Update the pending tasks gauge.
    ///
    /// Overwrites the gauge with `count`; the value is not validated.
    pub fn set_pending_tasks(&self, count: i64) {
        self.tasks_pending.set(count);
    }

    /// Record a claim attempt.
    ///
    /// Always increments the attempt counter; additionally increments the
    /// success counter when `success` is `true`. Failed claims are therefore
    /// the difference between the two counters.
    pub fn record_claim_attempt(&self, success: bool) {
        self.claim_attempts_total.inc();
        if success {
            self.claim_success_total.inc();
        }
    }

    /// Record a sync round.
    ///
    /// Increments `sync_rounds_total` by one.
    pub fn record_sync_round(&self) {
        self.sync_rounds_total.inc();
    }

    /// Record a membership event.
    ///
    /// Increments `membership_events_total` by one; the kind of event is not
    /// distinguished.
    pub fn record_membership_event(&self) {
        self.membership_events_total.inc();
    }
}

/// Shared metrics handle.
///
/// A reference-counted pointer to one [`PollenMetrics`] collector, so several
/// owners can hold the same collector. This is the handle type returned by
/// [`create_metrics_registry`].
pub type SharedMetrics = Arc<PollenMetrics>;

/// Build a default registry with every Pollen metric already registered in it.
///
/// Returns the registry (to be handed to whatever encodes/exposes metrics)
/// together with the shared collector handle used to update those metrics.
pub fn create_metrics_registry() -> (Registry, SharedMetrics) {
    let mut fresh_registry = Registry::default();
    let collector = PollenMetrics::new(&mut fresh_registry);
    let handle: SharedMetrics = Arc::new(collector);
    (fresh_registry, handle)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Drives each recording helper once and checks the value it leaves behind,
    /// rather than only checking that the calls do not panic.
    #[test]
    fn test_metrics_creation() {
        let (_, metrics) = create_metrics_registry();

        metrics.record_task_registered();
        assert_eq!(metrics.tasks_registered.get(), 1);

        metrics.record_execution_start();
        assert_eq!(metrics.task_executions_total.get(), 1);
        assert_eq!(metrics.tasks_running.get(), 1);

        metrics.record_execution_complete(true, 0.5);
        assert_eq!(metrics.tasks_running.get(), 0);
        assert_eq!(metrics.task_executions_success.get(), 1);
        assert_eq!(metrics.task_executions_failed.get(), 0);

        metrics.set_cluster_size(3);
        assert_eq!(metrics.cluster_size.get(), 3);

        metrics.set_pending_tasks(7);
        assert_eq!(metrics.tasks_pending.get(), 7);

        metrics.record_claim_attempt(true);
        assert_eq!(metrics.claim_attempts_total.get(), 1);
        assert_eq!(metrics.claim_success_total.get(), 1);

        metrics.record_sync_round();
        assert_eq!(metrics.sync_rounds_total.get(), 1);

        metrics.record_membership_event();
        assert_eq!(metrics.membership_events_total.get(), 1);
    }

    /// Failure branches: a failed execution and a failed claim must not touch
    /// the corresponding success counters.
    #[test]
    fn test_failure_paths() {
        let (_, metrics) = create_metrics_registry();

        metrics.record_execution_start();
        metrics.record_execution_complete(false, 0.25);
        assert_eq!(metrics.task_executions_total.get(), 1);
        assert_eq!(metrics.task_executions_success.get(), 0);
        assert_eq!(metrics.task_executions_failed.get(), 1);
        assert_eq!(metrics.tasks_running.get(), 0);

        metrics.record_claim_attempt(false);
        assert_eq!(metrics.claim_attempts_total.get(), 1);
        assert_eq!(metrics.claim_success_total.get(), 0);
    }
}