Metrics instrumentation for Rigatoni pipeline observability.
This module provides metrics collection for monitoring
Rigatoni pipelines in production. It uses the metrics crate, which
supports multiple exporters (Prometheus, StatsD, etc.).
§Metric Types
- Counters: Monotonically increasing values (events processed, errors)
- Histograms: Value distributions (batch sizes, latencies)
- Gauges: Point-in-time values (active collections, queue depth)
§Naming Conventions
All metrics follow Prometheus naming conventions:
- Use underscores (not hyphens or camelCase)
- Include unit suffix (_seconds, _bytes, _total)
- Prefix with application name (rigatoni_)
- Counter metrics end with _total
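The conventions above can be checked mechanically. The following is a minimal sketch, not part of this module: `MetricKind` and `follows_naming_conventions` are hypothetical names introduced for illustration, and the metric names used with it are examples rather than the names Rigatoni actually registers.

```rust
/// Kind of metric, used only to decide which suffix rule applies.
/// Hypothetical type, not part of `rigatoni_core::metrics`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricKind {
    Counter,
    Histogram,
    Gauge,
}

/// Hypothetical helper: returns `true` if `name` follows the conventions
/// listed above (application prefix, underscores only, `_total` on counters).
pub fn follows_naming_conventions(name: &str, kind: MetricKind) -> bool {
    // Prefix with the application name.
    let has_prefix = name.starts_with("rigatoni_");

    // Lowercase ASCII letters, digits, and underscores only:
    // this rejects both hyphens and camelCase.
    let valid_chars = name
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_');

    // Counter metrics must end with `_total`.
    let suffix_ok = match kind {
        MetricKind::Counter => name.ends_with("_total"),
        MetricKind::Histogram | MetricKind::Gauge => true,
    };

    has_prefix && valid_chars && suffix_ok
}
```

A check like this could run in a unit test over every registered metric name, so that a badly named metric fails the build rather than reaching a dashboard.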
§Labels
Labels add dimensionality but increase cardinality. Use sparingly:
- collection: MongoDB collection name (low cardinality)
- destination_type: Destination type like “s3”, “bigquery” (very low cardinality)
- error_type: Error category (low cardinality, max ~20 types)
- operation: Operation type like “insert”, “update”, “delete” (very low cardinality)
⚠️ Cardinality Warning: Never use high-cardinality values as labels:
- Document IDs
- Timestamps
- User IDs
- Full error messages
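One way to keep an error label low-cardinality is to map free-form error messages onto a small, fixed set of categories before recording. The sketch below illustrates the idea under stated assumptions: `ErrorLabel` and `classify` are hypothetical and are not the module's `ErrorCategory` enum, whose variants and conversion logic may differ.

```rust
/// Hypothetical bounded set of label values. The real `ErrorCategory`
/// enum in this module may have different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorLabel {
    Timeout,
    Serialization,
    Other,
}

impl ErrorLabel {
    /// The string used as the label value. Because the enum is closed,
    /// the label can only ever take this fixed set of values.
    pub fn as_str(self) -> &'static str {
        match self {
            ErrorLabel::Timeout => "timeout",
            ErrorLabel::Serialization => "serialization",
            ErrorLabel::Other => "other",
        }
    }
}

/// Hypothetical classifier: collapses an arbitrary error message
/// (high cardinality) into one of the fixed categories (low cardinality).
pub fn classify(message: &str) -> ErrorLabel {
    let lower = message.to_ascii_lowercase();
    if lower.contains("timeout") || lower.contains("timed out") {
        ErrorLabel::Timeout
    } else if lower.contains("serialize") {
        // Also matches "deserialize".
        ErrorLabel::Serialization
    } else {
        ErrorLabel::Other
    }
}
```

The full message, including any document ID it contains, belongs in logs or traces; only the category string is used as the label.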
§Performance Considerations
The metrics crate is designed for low overhead:
- Lock-free atomic operations for counters
- Thread-local histograms with periodic aggregation
- No-op recorder when metrics are disabled
- Typical overhead: <1μs per metric call
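To illustrate why counter updates can be cheap, here is a minimal sketch of a lock-free counter built on `std::sync::atomic`. It is an illustration of the general technique only, not the metrics crate's actual implementation; `Counter` and `count_from_threads` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical lock-free counter: a single atomic integer, no mutex.
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    pub const fn new() -> Self {
        Self { value: AtomicU64::new(0) }
    }

    /// Adds `by` with one atomic read-modify-write. `Relaxed` ordering is
    /// enough here because the counter does not guard any other memory.
    pub fn increment(&self, by: u64) {
        self.value.fetch_add(by, Ordering::Relaxed);
    }

    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

/// Increments one shared counter from several threads and returns the total.
pub fn count_from_threads(threads: usize, per_thread: u64) -> u64 {
    let counter = Arc::new(Counter::new());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    c.increment(1);
                }
            })
        })
        .collect();

    // Joining each thread makes all of its increments visible to this thread.
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    counter.get()
}
```

No increment is lost even though no thread ever blocks on a lock, which is the property that keeps per-call overhead low on hot paths.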
§Examples
§Recording Events
use rigatoni_core::metrics;
// Increment counter
metrics::increment_events_processed("users", "insert");
// Record histogram
metrics::record_batch_size(150, "orders");
// Update gauge
metrics::set_active_collections(3);
§Measuring Duration
use rigatoni_core::metrics;
use std::time::Instant;
let start = Instant::now();
// ... do work ...
metrics::record_batch_duration(start.elapsed().as_secs_f64(), "products");
§Error Tracking
use rigatoni_core::metrics::{self, ErrorCategory};
metrics::increment_events_failed("users", ErrorCategory::Serialization);
metrics::increment_retries(ErrorCategory::Timeout);
Structs§
- Timer - Helper for timing operations and automatically recording the duration.
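The constructor and methods of `Timer` are not shown on this page, so the following is only a sketch of the general pattern such a helper commonly uses: a guard that captures the start time and reports the elapsed duration when it is dropped. `ScopedTimer` and its callback parameter are hypothetical and should not be read as the actual `Timer` API.

```rust
use std::time::{Duration, Instant};

/// Hypothetical drop guard: measures the time between construction and
/// drop, then hands the elapsed duration to a caller-supplied callback.
pub struct ScopedTimer<F: FnMut(Duration)> {
    start: Instant,
    on_drop: F,
}

impl<F: FnMut(Duration)> ScopedTimer<F> {
    /// Starts timing immediately.
    pub fn start(on_drop: F) -> Self {
        Self {
            start: Instant::now(),
            on_drop,
        }
    }
}

impl<F: FnMut(Duration)> Drop for ScopedTimer<F> {
    fn drop(&mut self) {
        // Runs on every exit path from the scope, including early
        // returns, so the duration is always reported.
        (self.on_drop)(self.start.elapsed());
    }
}
```

In a pipeline, the callback could forward to a recording function such as `record_batch_duration`, for example `ScopedTimer::start(|d| metrics::record_batch_duration(d.as_secs_f64(), "products"))`, which removes the need to call `Instant::now()` and `elapsed()` by hand.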
Enums§
- ErrorCategory - Error categories for consistent metric labeling.
- PipelineStatus - Pipeline status for the pipeline_status gauge.
Functions§
- decrement_batch_queue_size - Decrements the batch queue size by a specific amount.
- increment_batch_queue_size - Increments the batch queue size by 1.
- increment_batches_written - Increments the count of successfully written batches.
- increment_destination_errors - Increments the count of destination write errors.
- increment_events_failed - Increments the count of failed events.
- increment_events_processed - Increments the count of successfully processed events.
- increment_events_processed_by - Increments the count of successfully processed events by a specific amount.
- increment_retries - Increments the retry counter.
- init_metrics - Initializes metric descriptions for documentation and introspection.
- record_batch_duration - Records the duration of batch processing.
- record_batch_size - Records a batch size.
- record_change_stream_lag - Records change stream lag (approximation).
- record_destination_write_bytes - Records the size of data written to a destination.
- record_destination_write_duration - Records the duration of a destination write operation.
- set_active_collections - Sets the number of active collections being monitored.
- set_batch_queue_size - Sets the current batch queue size.
- set_pipeline_status - Sets the pipeline status.