Module metrics

Metrics instrumentation for Rigatoni pipeline observability.

This module provides metrics collection for monitoring Rigatoni pipelines in production. It is built on the metrics crate facade, so the same instrumentation can be exported through any compatible exporter (Prometheus, StatsD, etc.).

§Metric Types

  • Counters: Monotonically increasing values (events processed, errors)
  • Histograms: Value distributions (batch sizes, latencies)
  • Gauges: Point-in-time values (active collections, queue depth)
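To illustrate how the three types differ in behavior (not how the metrics crate stores values internally), they can be modeled with std atomics. Counter, Gauge, and Histogram below are hypothetical, simplified types, and the histogram's bucket bounds are supplied by the caller.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Counter: monotonically increasing (e.g. events processed).
pub struct Counter(AtomicU64);

impl Counter {
    pub fn new() -> Self {
        Counter(AtomicU64::new(0))
    }
    /// Adds `n`; there is deliberately no way to decrease a counter.
    pub fn increment(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Gauge: a point-in-time value that is simply overwritten (e.g. active collections).
/// The f64 is stored as its bit pattern so it fits in an AtomicU64.
pub struct Gauge(AtomicU64);

impl Gauge {
    pub fn new() -> Self {
        Gauge(AtomicU64::new(0f64.to_bits()))
    }
    pub fn set(&self, value: f64) {
        self.0.store(value.to_bits(), Ordering::Relaxed);
    }
    pub fn get(&self) -> f64 {
        f64::from_bits(self.0.load(Ordering::Relaxed))
    }
}

/// Histogram: counts observations per bucket (e.g. batch sizes).
/// Counts are per bucket; an exporter would sum them into cumulative buckets.
pub struct Histogram {
    bounds: Vec<f64>,        // sorted, inclusive upper bounds, e.g. [10.0, 100.0]
    buckets: Vec<AtomicU64>, // one per bound, plus a final overflow (+Inf) bucket
}

impl Histogram {
    pub fn new(bounds: Vec<f64>) -> Self {
        let buckets = (0..=bounds.len()).map(|_| AtomicU64::new(0)).collect();
        Histogram { bounds, buckets }
    }
    pub fn record(&self, value: f64) {
        // First bucket whose bound is >= value; anything larger goes to +Inf.
        let idx = self
            .bounds
            .iter()
            .position(|&b| value <= b)
            .unwrap_or(self.bounds.len());
        self.buckets[idx].fetch_add(1, Ordering::Relaxed);
    }
    pub fn bucket_counts(&self) -> Vec<u64> {
        self.buckets.iter().map(|b| b.load(Ordering::Relaxed)).collect()
    }
}
```

Relaxed ordering is enough in this sketch because each value is an independent statistic; no other memory is synchronized through it.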

§Naming Conventions

All metrics follow Prometheus naming conventions:

  • Use underscores (not hyphens or camelCase)
  • Include a unit suffix when the metric has a unit (_seconds, _bytes)
  • Prefix with the application name (rigatoni_)
  • Counter metrics end with _total
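These rules are mechanical enough to lint. The check_metric_name function below is a hypothetical helper, not part of rigatoni_core. The unit is passed in explicitly because a name alone cannot say whether the metric should carry one.

```rust
/// Hypothetical lint for the naming conventions above (not part of rigatoni_core).
/// `unit` is the metric's unit, if any (e.g. Some("seconds")).
pub fn check_metric_name(name: &str, unit: Option<&str>, is_counter: bool) -> Result<(), String> {
    const PREFIX: &str = "rigatoni_";
    if !name.starts_with(PREFIX) {
        return Err(format!("missing `{}` prefix", PREFIX));
    }
    // Lowercase snake_case only: rejects hyphens and camelCase.
    if !name
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
    {
        return Err("use lowercase letters, digits and underscores".to_string());
    }
    // Counters end with `_total`; strip it so the unit check sees what precedes it
    // (e.g. `..._bytes_total`).
    let base = if is_counter {
        match name.strip_suffix("_total") {
            Some(base) => base,
            None => return Err("counter names must end with `_total`".to_string()),
        }
    } else {
        name
    };
    if let Some(unit) = unit {
        if !base.ends_with(&format!("_{}", unit)) {
            return Err(format!("missing `_{}` unit suffix", unit));
        }
    }
    Ok(())
}
```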

§Labels

Labels add dimensionality, but each distinct combination of label values becomes a separate time series (higher cardinality), so use them sparingly:

  • collection: MongoDB collection name (low cardinality)
  • destination_type: Destination type like “s3”, “bigquery” (very low cardinality)
  • error_type: Error category (low cardinality, max ~20 types)
  • operation: Operation type like “insert”, “update”, “delete” (very low cardinality)

⚠️ Cardinality Warning: Never use high-cardinality values as labels:

  • Document IDs
  • Timestamps
  • User IDs
  • Full error messages
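The cost of a label is multiplicative: a metric can emit one time series per combination of label values, so its worst case is the product of the number of distinct values of each label. A minimal sketch of that arithmetic:

```rust
/// Worst-case number of time series one metric can produce: the product of the
/// number of distinct values each of its labels can take (1 if it has no labels).
/// Saturates instead of overflowing for absurdly large inputs.
pub fn max_series(distinct_values_per_label: &[u64]) -> u64 {
    distinct_values_per_label
        .iter()
        .fold(1u64, |acc, &n| acc.saturating_mul(n))
}
```

For example, with a hypothetical 20 collections and the three operation values, a per-collection, per-operation counter tops out at 20 × 3 = 60 series; adding a document-ID label over one million documents would raise the bound to 60,000,000.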

§Performance Considerations

The metrics crate is designed for low overhead:

  • Lock-free atomic operations for counters
  • Thread-local histograms with periodic aggregation
  • No-op recorder when metrics are disabled
  • Typical overhead: <1μs per metric call
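The no-op behavior follows a common global-recorder pattern: until a recorder is installed, every call dispatches to an empty implementation. The sketch below shows that pattern with std only. Recorder, set_recorder, increment_counter, and SumRecorder are hypothetical simplifications and are not the metrics crate's actual API.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;

/// Hypothetical, minimal recorder interface (illustration only).
pub trait Recorder: Send + Sync {
    fn increment_counter(&self, name: &'static str, value: u64);
}

/// Used while no recorder is installed: the call does nothing.
struct NoopRecorder;

impl Recorder for NoopRecorder {
    fn increment_counter(&self, _name: &'static str, _value: u64) {}
}

static NOOP: NoopRecorder = NoopRecorder;
static RECORDER: OnceLock<&'static dyn Recorder> = OnceLock::new();

/// Installs the global recorder; only the first call succeeds.
pub fn set_recorder(r: &'static dyn Recorder) -> Result<(), &'static dyn Recorder> {
    RECORDER.set(r)
}

fn recorder() -> &'static dyn Recorder {
    match RECORDER.get() {
        Some(r) => *r,
        None => &NOOP, // metrics "disabled": fall back to the no-op recorder
    }
}

/// Instrumentation entry point: cheap when nothing is installed.
pub fn increment_counter(name: &'static str, value: u64) {
    recorder().increment_counter(name, value);
}

/// Example recorder that sums every counter increment it receives.
pub struct SumRecorder(pub AtomicU64);

impl Recorder for SumRecorder {
    fn increment_counter(&self, _name: &'static str, value: u64) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }
}
```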

§Examples

§Recording Events

```rust
use rigatoni_core::metrics;

// Increment counter
metrics::increment_events_processed("users", "insert");

// Record histogram
metrics::record_batch_size(150, "orders");

// Update gauge
metrics::set_active_collections(3);
```

§Measuring Duration

```rust
use rigatoni_core::metrics;
use std::time::Instant;

let start = Instant::now();
// ... do work ...
metrics::record_batch_duration(start.elapsed().as_secs_f64(), "products");
```

§Error Tracking

```rust
use rigatoni_core::metrics::{self, ErrorCategory};

metrics::increment_events_failed("users", ErrorCategory::Serialization);
metrics::increment_retries(ErrorCategory::Timeout);
```

Structs§

Timer
Helper for timing operations and automatically recording the duration.
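Timer's actual API is not shown on this page. A helper that "automatically" records a duration is usually built with RAII: capture an Instant when the guard is created and record the elapsed time in Drop, so the duration is recorded even on early returns. The hypothetical sketch below takes a callback that stands in for a call such as record_batch_duration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical timing guard: calls `on_drop` with the elapsed time when dropped.
pub struct Timer<F: FnMut(Duration)> {
    start: Instant,
    on_drop: F,
}

impl<F: FnMut(Duration)> Timer<F> {
    /// Starts timing immediately.
    pub fn start(on_drop: F) -> Self {
        Timer { start: Instant::now(), on_drop }
    }
}

impl<F: FnMut(Duration)> Drop for Timer<F> {
    fn drop(&mut self) {
        // Runs when the guard goes out of scope, including via `return` or `?`.
        (self.on_drop)(self.start.elapsed());
    }
}
```

Bind the guard to a named variable such as `_timer`; `let _ = Timer::start(...)` drops the guard immediately and records a near-zero duration.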

Enums§

ErrorCategory
Error categories for consistent metric labeling.
PipelineStatus
Pipeline status for the pipeline_status gauge.
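ErrorCategory keeps the error_type label to a fixed, small set of values, and PipelineStatus must be encoded as a number because a gauge holds a single numeric value. The sketch below shows one way to do both. Only the Serialization and Timeout variants appear on this page; the other variants, the label strings, and the numeric status codes are assumptions.

```rust
/// Hypothetical error categories (only Serialization and Timeout are confirmed).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorCategory {
    Serialization,
    Timeout,
    Connection, // assumed
    Other,      // assumed catch-all keeps the label set bounded
}

impl ErrorCategory {
    /// Fixed, low-cardinality value for the error_type label.
    pub fn as_label(self) -> &'static str {
        match self {
            ErrorCategory::Serialization => "serialization",
            ErrorCategory::Timeout => "timeout",
            ErrorCategory::Connection => "connection",
            ErrorCategory::Other => "other",
        }
    }
}

/// Hypothetical status codes; a gauge can only store a number.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStatus {
    Stopped = 0,
    Running = 1,
    Error = 2,
}

impl PipelineStatus {
    /// Value written to the pipeline_status gauge.
    pub fn as_gauge_value(self) -> f64 {
        self as u8 as f64
    }
}
```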

Functions§

decrement_batch_queue_size
Decrements the batch queue size by a specific amount.
increment_batch_queue_size
Increments the batch queue size by 1.
increment_batches_written
Increments the count of successfully written batches.
increment_destination_errors
Increments the count of destination write errors.
increment_events_failed
Increments the count of failed events.
increment_events_processed
Increments the count of successfully processed events.
increment_events_processed_by
Increments the count of successfully processed events by a specific amount.
increment_retries
Increments the retry counter.
init_metrics
Initializes metric descriptions for documentation and introspection.
record_batch_duration
Records the duration of batch processing.
record_batch_size
Records a batch size.
record_change_stream_lag
Records change stream lag (approximation).
record_destination_write_bytes
Records the size of data written to a destination.
record_destination_write_duration
Records the duration of a destination write operation.
set_active_collections
Sets the number of active collections being monitored.
set_batch_queue_size
Sets the current batch queue size.
set_pipeline_status
Sets the pipeline status.
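The three batch-queue functions (increment by 1, decrement by an amount, set an absolute value) suggest a single gauge updated from several places. The hypothetical QueueSizeGauge below shows one way those operations could fit together. Clamping the decrement at zero is a design choice of this sketch, not documented behavior of decrement_batch_queue_size.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the batch queue size gauge.
pub struct QueueSizeGauge(AtomicU64);

impl QueueSizeGauge {
    pub fn new() -> Self {
        QueueSizeGauge(AtomicU64::new(0))
    }
    /// Analogous to increment_batch_queue_size: +1 per enqueued batch.
    pub fn increment(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    /// Analogous to decrement_batch_queue_size: subtract `amount`, clamped at
    /// zero so a mismatched decrement cannot wrap around to a huge value.
    pub fn decrement(&self, amount: u64) {
        // fetch_update retries until its compare-exchange succeeds; the closure
        // always returns Some, so the Result can safely be ignored.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(amount))
            });
    }
    /// Analogous to set_batch_queue_size: overwrite with an absolute value.
    pub fn set(&self, value: u64) {
        self.0.store(value, Ordering::Relaxed);
    }
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
```

An absolute set is useful for resynchronizing the gauge with the real queue length if increments and decrements ever drift apart.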