// rigatoni_core/metrics.rs

// Copyright 2025 Rigatoni Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

//! Metrics instrumentation for Rigatoni pipeline observability.
//!
//! This module provides comprehensive metrics collection for monitoring
//! Rigatoni pipelines in production. It uses the `metrics` crate which
//! supports multiple exporters (Prometheus, StatsD, etc.).
//!
//! # Metric Types
//!
//! - **Counters**: Monotonically increasing values (events processed, errors)
//! - **Histograms**: Value distributions (batch sizes, latencies)
//! - **Gauges**: Point-in-time values (active collections, queue depth)
//!
//! # Naming Conventions
//!
//! All metrics follow Prometheus naming conventions:
//! - Use underscores (not hyphens or camelCase)
//! - Include unit suffix (\_seconds, \_bytes, \_total)
//! - Prefix with application name (rigatoni\_)
//! - Counter metrics end with \_total
//!
//! # Labels
//!
//! Labels add dimensionality but increase cardinality. Use sparingly:
//! - **collection**: MongoDB collection name (low cardinality)
//! - **destination\_type**: Destination type like "s3", "bigquery" (very low cardinality)
//! - **error\_type**: Error category (low cardinality, max ~20 types)
//! - **operation**: Operation type like "insert", "update", "delete" (very low cardinality)
//!
//! ⚠️ **Cardinality Warning**: Never use high-cardinality values as labels:
//! - Document IDs
//! - Timestamps
//! - User IDs
//! - Full error messages
//!
//! # Performance Considerations
//!
//! The `metrics` crate is designed for low overhead:
//! - Lock-free atomic operations for counters
//! - Thread-local histograms with periodic aggregation
//! - No-op recorder when metrics are disabled
//! - Typical overhead: <1μs per metric call
//!
//! # Examples
//!
//! ## Recording Events
//!
//! ```rust
//! use rigatoni_core::metrics;
//!
//! // Increment counter
//! metrics::increment_events_processed("users", "insert");
//!
//! // Record histogram
//! metrics::record_batch_size(150, "orders");
//!
//! // Update gauge
//! metrics::set_active_collections(3);
//! ```
//!
//! ## Measuring Duration
//!
//! ```rust
//! use rigatoni_core::metrics;
//! use std::time::Instant;
//!
//! let start = Instant::now();
//! // ... do work ...
//! metrics::record_batch_duration(start.elapsed().as_secs_f64(), "products");
//! ```
//!
//! ## Error Tracking
//!
//! ```rust
//! use rigatoni_core::metrics::{self, ErrorCategory};
//!
//! metrics::increment_events_failed("users", ErrorCategory::Serialization);
//! metrics::increment_retries(ErrorCategory::Timeout);
//! ```

use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use std::time::Duration;

/// Metric name prefix for all Rigatoni metrics.
///
/// NOTE: the constants below hard-code the `rigatoni_` prefix (const string
/// concatenation is not available without a macro); keep them in sync with
/// this value.
#[doc(hidden)]
pub const METRIC_PREFIX: &str = "rigatoni";

// ============================================================================
// Metric Name Constants
//
// All names are `pub` + `#[doc(hidden)]` for consistency: previously only two
// of them were, which made it impossible to reference the remaining metric
// names from outside the module (e.g. in integration tests or exporter
// configuration) without duplicating the string literals.
// ============================================================================

/// Total number of events successfully processed through the pipeline.
///
/// Type: Counter
/// Labels: collection, operation
#[doc(hidden)]
pub const EVENTS_PROCESSED_TOTAL: &str = "rigatoni_events_processed_total";

/// Total number of events that failed processing.
///
/// Type: Counter
/// Labels: collection, error_type
#[doc(hidden)]
pub const EVENTS_FAILED_TOTAL: &str = "rigatoni_events_failed_total";

/// Total number of retry attempts across all operations.
///
/// Type: Counter
/// Labels: error_type
#[doc(hidden)]
pub const RETRIES_TOTAL: &str = "rigatoni_retries_total";

/// Distribution of batch sizes sent to destinations.
///
/// Type: Histogram
/// Labels: collection
/// Unit: events
#[doc(hidden)]
pub const BATCH_SIZE: &str = "rigatoni_batch_size";

/// Time taken to process a batch (from accumulation to write).
///
/// Type: Histogram
/// Labels: collection
/// Unit: seconds
#[doc(hidden)]
pub const BATCH_DURATION_SECONDS: &str = "rigatoni_batch_duration_seconds";

/// Time taken for destination write operations.
///
/// Type: Histogram
/// Labels: destination_type
/// Unit: seconds
#[doc(hidden)]
pub const DESTINATION_WRITE_DURATION_SECONDS: &str = "rigatoni_destination_write_duration_seconds";

/// Number of collections currently being monitored.
///
/// Type: Gauge
/// Unit: count
#[doc(hidden)]
pub const ACTIVE_COLLECTIONS: &str = "rigatoni_active_collections";

/// Current pipeline status (0=stopped, 1=running, 2=error).
///
/// Type: Gauge
/// Unit: status code
#[doc(hidden)]
pub const PIPELINE_STATUS: &str = "rigatoni_pipeline_status";

/// Number of events currently buffered in batch queue.
///
/// Type: Gauge
/// Labels: collection
/// Unit: events
#[doc(hidden)]
pub const BATCH_QUEUE_SIZE: &str = "rigatoni_batch_queue_size";

/// Total number of batches successfully written to destinations.
///
/// Type: Counter
/// Labels: destination_type
#[doc(hidden)]
pub const BATCHES_WRITTEN_TOTAL: &str = "rigatoni_batches_written_total";

/// Total number of destination write errors.
///
/// Type: Counter
/// Labels: destination_type, error_type
#[doc(hidden)]
pub const DESTINATION_WRITE_ERRORS_TOTAL: &str = "rigatoni_destination_write_errors_total";

/// Size of data written to destination (compressed if applicable).
///
/// Type: Histogram
/// Labels: destination_type
/// Unit: bytes
#[doc(hidden)]
pub const DESTINATION_WRITE_BYTES: &str = "rigatoni_destination_write_bytes";

/// Time taken for change stream to receive an event.
///
/// Type: Histogram
/// Labels: collection
/// Unit: seconds
#[doc(hidden)]
pub const CHANGE_STREAM_LAG_SECONDS: &str = "rigatoni_change_stream_lag_seconds";

193// ============================================================================
194// Initialization
195// ============================================================================
196
197/// Initializes metric descriptions for documentation and introspection.
198///
199/// This should be called once at application startup, before recording any metrics.
200/// It provides human-readable descriptions for metrics exporters like Prometheus.
201///
202/// # Examples
203///
204/// ```rust
205/// use rigatoni_core::metrics;
206///
207/// metrics::init_metrics();
208/// // ... start pipeline ...
209/// ```
210pub fn init_metrics() {
211 // Counters
212 describe_counter!(
213 EVENTS_PROCESSED_TOTAL,
214 "Total number of change stream events successfully processed through the pipeline"
215 );
216
217 describe_counter!(
218 EVENTS_FAILED_TOTAL,
219 "Total number of events that failed processing due to errors"
220 );
221
222 describe_counter!(
223 RETRIES_TOTAL,
224 "Total number of retry attempts for failed operations"
225 );
226
227 describe_counter!(
228 BATCHES_WRITTEN_TOTAL,
229 "Total number of batches successfully written to destinations"
230 );
231
232 describe_counter!(
233 DESTINATION_WRITE_ERRORS_TOTAL,
234 "Total number of errors writing to destinations"
235 );
236
237 // Histograms
238 describe_histogram!(
239 BATCH_SIZE,
240 metrics::Unit::Count,
241 "Distribution of batch sizes (number of events per batch)"
242 );
243
244 describe_histogram!(
245 BATCH_DURATION_SECONDS,
246 metrics::Unit::Seconds,
247 "Time taken to process a batch from accumulation to successful write"
248 );
249
250 describe_histogram!(
251 DESTINATION_WRITE_DURATION_SECONDS,
252 metrics::Unit::Seconds,
253 "Time taken for destination write operations (including retries)"
254 );
255
256 describe_histogram!(
257 DESTINATION_WRITE_BYTES,
258 metrics::Unit::Bytes,
259 "Size of data written to destination (compressed if applicable)"
260 );
261
262 describe_histogram!(
263 CHANGE_STREAM_LAG_SECONDS,
264 metrics::Unit::Seconds,
265 "Time between MongoDB operation and change stream receipt (approximation)"
266 );
267
268 // Gauges
269 describe_gauge!(
270 ACTIVE_COLLECTIONS,
271 metrics::Unit::Count,
272 "Number of MongoDB collections currently being monitored"
273 );
274
275 describe_gauge!(
276 PIPELINE_STATUS,
277 "Current pipeline status: 0=stopped, 1=running, 2=error"
278 );
279
280 describe_gauge!(
281 BATCH_QUEUE_SIZE,
282 metrics::Unit::Count,
283 "Number of events currently buffered awaiting batch write"
284 );
285}
286
287// ============================================================================
288// Counter Metrics
289// ============================================================================
290
291/// Increments the count of successfully processed events.
292///
293/// This should be called after a batch is successfully written to a destination
294/// and the resume token is saved.
295///
296/// # Arguments
297///
298/// * `collection` - MongoDB collection name
299/// * `operation` - Operation type: "insert", "update", "delete", "replace", etc.
300///
301/// # Examples
302///
303/// ```rust
304/// use rigatoni_core::metrics;
305///
306/// metrics::increment_events_processed("users", "insert");
307/// ```
308pub fn increment_events_processed(collection: &str, operation: &str) {
309 counter!(EVENTS_PROCESSED_TOTAL, "collection" => collection.to_string(), "operation" => operation.to_string())
310 .increment(1);
311}
312
313/// Increments the count of successfully processed events by a specific amount.
314///
315/// Use this when processing a batch of events at once.
316///
317/// # Arguments
318///
319/// * `count` - Number of events processed
320/// * `collection` - MongoDB collection name
321/// * `operation` - Operation type
322///
323/// # Examples
324///
325/// ```rust
326/// use rigatoni_core::metrics;
327///
328/// // After writing a batch of 150 events
329/// metrics::increment_events_processed_by(150, "orders", "insert");
330/// ```
331pub fn increment_events_processed_by(count: u64, collection: &str, operation: &str) {
332 counter!(EVENTS_PROCESSED_TOTAL, "collection" => collection.to_string(), "operation" => operation.to_string())
333 .increment(count);
334}
335
336/// Increments the count of failed events.
337///
338/// This should be called when an event fails processing due to an error.
339///
340/// # Arguments
341///
342/// * `collection` - MongoDB collection name
343/// * `error_category` - Category of error that occurred
344///
345/// # Examples
346///
347/// ```rust
348/// use rigatoni_core::metrics::{self, ErrorCategory};
349///
350/// metrics::increment_events_failed("users", ErrorCategory::Serialization);
351/// ```
352pub fn increment_events_failed(collection: &str, error_category: ErrorCategory) {
353 counter!(EVENTS_FAILED_TOTAL, "collection" => collection.to_string(), "error_type" => error_category.as_str())
354 .increment(1);
355}
356
357/// Increments the retry counter.
358///
359/// This should be called each time an operation is retried.
360///
361/// # Arguments
362///
363/// * `error_category` - Category of error that triggered the retry
364///
365/// # Examples
366///
367/// ```rust
368/// use rigatoni_core::metrics::{self, ErrorCategory};
369///
370/// metrics::increment_retries(ErrorCategory::Timeout);
371/// ```
372pub fn increment_retries(error_category: ErrorCategory) {
373 counter!(RETRIES_TOTAL, "error_type" => error_category.as_str()).increment(1);
374}
375
376/// Increments the count of successfully written batches.
377///
378/// # Arguments
379///
380/// * `destination_type` - Type of destination (e.g., "s3", "bigquery", "kafka")
381///
382/// # Examples
383///
384/// ```rust
385/// use rigatoni_core::metrics;
386///
387/// metrics::increment_batches_written("s3");
388/// ```
389pub fn increment_batches_written(destination_type: &str) {
390 counter!(BATCHES_WRITTEN_TOTAL, "destination_type" => destination_type.to_string())
391 .increment(1);
392}
393
394/// Increments the count of destination write errors.
395///
396/// # Arguments
397///
398/// * `destination_type` - Type of destination
399/// * `error_type` - Error category
400///
401/// # Examples
402///
403/// ```rust
404/// use rigatoni_core::metrics::{self, ErrorCategory};
405///
406/// metrics::increment_destination_errors("s3", ErrorCategory::Timeout);
407/// ```
408pub fn increment_destination_errors(destination_type: &str, error_category: ErrorCategory) {
409 counter!(
410 DESTINATION_WRITE_ERRORS_TOTAL,
411 "destination_type" => destination_type.to_string(),
412 "error_type" => error_category.as_str()
413 )
414 .increment(1);
415}
416
417// ============================================================================
418// Histogram Metrics
419// ============================================================================
420
421/// Records a batch size.
422///
423/// # Arguments
424///
425/// * `size` - Number of events in the batch
426/// * `collection` - MongoDB collection name
427///
428/// # Examples
429///
430/// ```rust
431/// use rigatoni_core::metrics;
432///
433/// metrics::record_batch_size(150, "users");
434/// ```
435pub fn record_batch_size(size: usize, collection: &str) {
436 histogram!(BATCH_SIZE, "collection" => collection.to_string()).record(size as f64);
437}
438
439/// Records the duration of batch processing.
440///
441/// This measures the time from when a batch starts accumulating events
442/// to when it's successfully written to the destination.
443///
444/// # Arguments
445///
446/// * `duration_seconds` - Duration in seconds
447/// * `collection` - MongoDB collection name
448///
449/// # Examples
450///
451/// ```rust
452/// use rigatoni_core::metrics;
453/// use std::time::Instant;
454///
455/// let start = Instant::now();
456/// // ... process batch ...
457/// metrics::record_batch_duration(start.elapsed().as_secs_f64(), "users");
458/// ```
459pub fn record_batch_duration(duration_seconds: f64, collection: &str) {
460 histogram!(BATCH_DURATION_SECONDS, "collection" => collection.to_string())
461 .record(duration_seconds);
462}
463
464/// Records the duration of a destination write operation.
465///
466/// This includes time spent in retries if applicable.
467///
468/// # Arguments
469///
470/// * `duration` - Duration of the write operation
471/// * `destination_type` - Type of destination
472///
473/// # Examples
474///
475/// ```rust
476/// use rigatoni_core::metrics;
477/// use std::time::{Duration, Instant};
478///
479/// let start = Instant::now();
480/// // ... write to destination ...
481/// metrics::record_destination_write_duration(start.elapsed(), "s3");
482/// ```
483pub fn record_destination_write_duration(duration: Duration, destination_type: &str) {
484 histogram!(DESTINATION_WRITE_DURATION_SECONDS, "destination_type" => destination_type.to_string())
485 .record(duration.as_secs_f64());
486}
487
488/// Records the size of data written to a destination.
489///
490/// # Arguments
491///
492/// * `bytes` - Number of bytes written (compressed if applicable)
493/// * `destination_type` - Type of destination
494///
495/// # Examples
496///
497/// ```rust
498/// use rigatoni_core::metrics;
499///
500/// metrics::record_destination_write_bytes(1024 * 1024, "s3"); // 1 MB
501/// ```
502pub fn record_destination_write_bytes(bytes: usize, destination_type: &str) {
503 histogram!(DESTINATION_WRITE_BYTES, "destination_type" => destination_type.to_string())
504 .record(bytes as f64);
505}
506
507/// Records change stream lag (approximation).
508///
509/// This estimates the time between when a MongoDB operation occurred
510/// and when the change stream event was received.
511///
512/// # Arguments
513///
514/// * `lag_seconds` - Estimated lag in seconds
515/// * `collection` - MongoDB collection name
516///
517/// # Examples
518///
519/// ```rust
520/// use rigatoni_core::metrics;
521///
522/// metrics::record_change_stream_lag(0.05, "users"); // 50ms lag
523/// ```
524pub fn record_change_stream_lag(lag_seconds: f64, collection: &str) {
525 histogram!(CHANGE_STREAM_LAG_SECONDS, "collection" => collection.to_string())
526 .record(lag_seconds);
527}
528
529// ============================================================================
530// Gauge Metrics
531// ============================================================================
532
533/// Sets the number of active collections being monitored.
534///
535/// # Arguments
536///
537/// * `count` - Number of active collections
538///
539/// # Examples
540///
541/// ```rust
542/// use rigatoni_core::metrics;
543///
544/// metrics::set_active_collections(3);
545/// ```
546pub fn set_active_collections(count: usize) {
547 gauge!(ACTIVE_COLLECTIONS).set(count as f64);
548}
549
550/// Sets the pipeline status.
551///
552/// # Arguments
553///
554/// * `status` - Pipeline status: 0=stopped, 1=running, 2=error
555///
556/// # Examples
557///
558/// ```rust
559/// use rigatoni_core::metrics::{self, PipelineStatus};
560///
561/// metrics::set_pipeline_status(PipelineStatus::Running);
562/// ```
563pub fn set_pipeline_status(status: PipelineStatus) {
564 gauge!(PIPELINE_STATUS).set(f64::from(status as u8));
565}
566
567/// Sets the current batch queue size.
568///
569/// This represents how many events are currently buffered awaiting batch write.
570///
571/// # Arguments
572///
573/// * `size` - Number of events in queue
574/// * `collection` - MongoDB collection name
575///
576/// # Examples
577///
578/// ```rust
579/// use rigatoni_core::metrics;
580///
581/// metrics::set_batch_queue_size(47, "users");
582/// ```
583pub fn set_batch_queue_size(size: usize, collection: &str) {
584 gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).set(size as f64);
585}
586
587/// Increments the batch queue size by 1.
588///
589/// More efficient than `set_batch_queue_size` when adding a single event.
590///
591/// # Arguments
592///
593/// * `collection` - MongoDB collection name
594pub fn increment_batch_queue_size(collection: &str) {
595 gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).increment(1.0);
596}
597
598/// Decrements the batch queue size by a specific amount.
599///
600/// Use this after flushing a batch.
601///
602/// # Arguments
603///
604/// * `count` - Number of events removed from queue
605/// * `collection` - MongoDB collection name
606pub fn decrement_batch_queue_size(count: usize, collection: &str) {
607 gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).decrement(count as f64);
608}
609
// ============================================================================
// Pipeline Status Type
// ============================================================================

/// Pipeline status values reported via the `rigatoni_pipeline_status` gauge.
///
/// The `repr(u8)` discriminants are part of the metric's contract — dashboards
/// and alerts match on the numeric value — so they must not be renumbered.
/// `Hash` is derived for consistency with [`ErrorCategory`], allowing the type
/// to be used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PipelineStatus {
    /// Pipeline is stopped (gauge value 0).
    Stopped = 0,
    /// Pipeline is running normally (gauge value 1).
    Running = 1,
    /// Pipeline encountered an error (gauge value 2).
    Error = 2,
}

/// Coarse error classification used for the `error_type` metric label.
///
/// Free-form error messages must never be used as label values — they would
/// explode label cardinality. This fixed enum keeps the set of possible
/// `error_type` values small and known in advance.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// Operation exceeded its time limit.
    Timeout,
    /// Network-level failure (TCP, DNS, connection reset).
    Connection,
    /// Encoding/decoding failure (JSON, BSON, other encodings).
    Serialization,
    /// Authentication or authorization failure.
    Permission,
    /// Invalid data or schema violation.
    Validation,
    /// Referenced resource does not exist.
    NotFound,
    /// Throttled or quota exceeded.
    RateLimit,
    /// Anything that does not fit the categories above.
    Unknown,
}

impl ErrorCategory {
    /// Returns the fixed label value for this category.
    ///
    /// The returned string is `'static`, so it can be handed to the metrics
    /// macros without allocating.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ErrorCategory::Timeout => "timeout_error",
            ErrorCategory::Connection => "connection_error",
            ErrorCategory::Serialization => "serialization_error",
            ErrorCategory::Permission => "permission_error",
            ErrorCategory::Validation => "validation_error",
            ErrorCategory::NotFound => "not_found_error",
            ErrorCategory::RateLimit => "rate_limit_error",
            ErrorCategory::Unknown => "unknown_error",
        }
    }
}

// ============================================================================
// Metric Helper Utilities
// ============================================================================

/// Helper for timing operations and automatically recording the duration.
///
/// The callback receives the elapsed [`Duration`] and the label supplied at
/// construction, letting the caller route the measurement to any histogram.
///
/// # Examples
///
/// ```rust
/// use rigatoni_core::metrics::Timer;
///
/// {
///     let _timer = Timer::new("s3", |duration, dest_type| {
///         rigatoni_core::metrics::record_destination_write_duration(duration, dest_type);
///     });
///     // ... operation to time ...
/// } // Timer automatically records when dropped
/// ```
#[must_use = "a Timer records on drop; if not bound to a variable it is dropped immediately and measures nothing"]
pub struct Timer<F>
where
    F: FnOnce(Duration, &str),
{
    // When the timing window started.
    start: std::time::Instant,
    // Label forwarded to `record_fn` (e.g. destination type or collection).
    label: String,
    // Wrapped in Option so Drop can move the FnOnce out of `&mut self`.
    record_fn: Option<F>,
}

impl<F> Timer<F>
where
    F: FnOnce(Duration, &str),
{
    /// Starts the clock; the elapsed duration is recorded when the returned
    /// value is dropped. Bind the result (`let _timer = ...`) so the timer
    /// lives for the whole operation.
    pub fn new(label: impl Into<String>, record_fn: F) -> Self {
        Self {
            start: std::time::Instant::now(),
            label: label.into(),
            record_fn: Some(record_fn),
        }
    }
}

impl<F> Drop for Timer<F>
where
    F: FnOnce(Duration, &str),
{
    fn drop(&mut self) {
        let duration = self.start.elapsed();
        // `take` moves the FnOnce out so it can be called by value, and
        // guarantees the callback runs at most once.
        if let Some(record_fn) = self.record_fn.take() {
            record_fn(duration, &self.label);
        }
    }
}
718}