rigatoni_core/
metrics.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Metrics instrumentation for Rigatoni pipeline observability.
18//!
19//! This module provides comprehensive metrics collection for monitoring
20//! Rigatoni pipelines in production. It uses the `metrics` crate which
21//! supports multiple exporters (Prometheus, StatsD, etc.).
22//!
23//! # Metric Types
24//!
25//! - **Counters**: Monotonically increasing values (events processed, errors)
26//! - **Histograms**: Value distributions (batch sizes, latencies)
27//! - **Gauges**: Point-in-time values (active collections, queue depth)
28//!
29//! # Naming Conventions
30//!
31//! All metrics follow Prometheus naming conventions:
32//! - Use underscores (not hyphens or camelCase)
//! - Include a unit suffix where applicable (\_seconds, \_bytes)
//! - Prefix with application name (rigatoni\_)
//! - Counter metrics end with \_total (`rigatoni_locks_held_total` is a legacy
//!   exception: it is a gauge despite the suffix)
36//!
37//! # Labels
38//!
//! Labels add dimensionality but increase cardinality. Use sparingly:
//! - **collection**: MongoDB collection name (low cardinality)
//! - **destination\_type**: Destination type like "s3", "bigquery" (very low cardinality)
//! - **error\_type**: Error category from [`ErrorCategory`] (low cardinality, max ~20 types)
//! - **operation**: Operation type like "insert", "update", "delete" (very low cardinality)
//! - **reason**: Lock acquisition failure reason from [`LockFailureReason`]
//!   ("already\_held", "error"; very low cardinality)
44//!
45//! ⚠️ **Cardinality Warning**: Never use high-cardinality values as labels:
46//! - Document IDs
47//! - Timestamps
48//! - User IDs
49//! - Full error messages
50//!
51//! # Performance Considerations
52//!
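//! The `metrics` crate is a lightweight facade designed for low overhead; the
//! actual cost depends on the installed recorder (exporter):
//! - Common recorders use lock-free atomic operations for counters and gauges
//! - Histogram storage and aggregation strategy is recorder-specific
//! - When no recorder is installed, every call is a no-op
//! - Typical overhead: <1μs per metric call (recorder-dependent)
//! - Helpers that take a `&str` label (collection, operation, destination type)
//!   copy it into an owned `String` on every call, a small heap allocation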
58//!
59//! # Examples
60//!
61//! ## Recording Events
62//!
63//! ```rust
64//! use rigatoni_core::metrics;
65//!
66//! // Increment counter
67//! metrics::increment_events_processed("users", "insert");
68//!
69//! // Record histogram
70//! metrics::record_batch_size(150, "orders");
71//!
72//! // Update gauge
73//! metrics::set_active_collections(3);
74//! ```
75//!
76//! ## Measuring Duration
77//!
78//! ```rust
79//! use rigatoni_core::metrics;
80//! use std::time::Instant;
81//!
82//! let start = Instant::now();
83//! // ... do work ...
84//! metrics::record_batch_duration(start.elapsed().as_secs_f64(), "products");
85//! ```
86//!
87//! ## Error Tracking
88//!
89//! ```rust
90//! use rigatoni_core::metrics::{self, ErrorCategory};
91//!
92//! metrics::increment_events_failed("users", ErrorCategory::Serialization);
93//! metrics::increment_retries(ErrorCategory::Timeout);
94//! ```
95
96use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
97use std::time::Duration;
98
/// Metric name prefix for all Rigatoni metrics.
///
/// The metric name constants in this module spell the prefix out as string
/// literals (`"rigatoni_…"`). If this value ever changes, they must be updated
/// by hand to match.
#[doc(hidden)]
pub const METRIC_PREFIX: &str = "rigatoni";

// ============================================================================
// Metric Name Constants
// ============================================================================

/// Total number of events successfully processed through the pipeline.
///
/// Type: Counter
/// Labels: collection, operation
#[doc(hidden)]
pub const EVENTS_PROCESSED_TOTAL: &str = "rigatoni_events_processed_total";

/// Total number of events that failed processing.
///
/// Type: Counter
/// Labels: collection, error_type
const EVENTS_FAILED_TOTAL: &str = "rigatoni_events_failed_total";

/// Total number of retry attempts across all operations.
///
/// Type: Counter
/// Labels: error_type
const RETRIES_TOTAL: &str = "rigatoni_retries_total";

/// Distribution of batch sizes sent to destinations.
///
/// Type: Histogram
/// Labels: collection
/// Unit: events
const BATCH_SIZE: &str = "rigatoni_batch_size";

/// Time taken to process a batch (from accumulation to write).
///
/// Type: Histogram
/// Labels: collection
/// Unit: seconds
#[doc(hidden)]
pub const BATCH_DURATION_SECONDS: &str = "rigatoni_batch_duration_seconds";

/// Time taken for destination write operations, including any time spent in
/// retries.
///
/// Type: Histogram
/// Labels: destination_type
/// Unit: seconds
const DESTINATION_WRITE_DURATION_SECONDS: &str = "rigatoni_destination_write_duration_seconds";

/// Number of collections currently being monitored.
///
/// Type: Gauge
/// Unit: count
const ACTIVE_COLLECTIONS: &str = "rigatoni_active_collections";

/// Current pipeline status (0=stopped, 1=running, 2=error), i.e. the
/// discriminant of `PipelineStatus`.
///
/// Type: Gauge
/// Unit: status code
const PIPELINE_STATUS: &str = "rigatoni_pipeline_status";

/// Number of events currently buffered in batch queue.
///
/// Type: Gauge
/// Labels: collection
/// Unit: events
const BATCH_QUEUE_SIZE: &str = "rigatoni_batch_queue_size";

/// Total number of batches successfully written to destinations.
///
/// Type: Counter
/// Labels: destination_type
const BATCHES_WRITTEN_TOTAL: &str = "rigatoni_batches_written_total";

/// Total number of destination write errors.
///
/// Type: Counter
/// Labels: destination_type, error_type
const DESTINATION_WRITE_ERRORS_TOTAL: &str = "rigatoni_destination_write_errors_total";

/// Size of data written to destination (compressed if applicable).
///
/// Type: Histogram
/// Labels: destination_type
/// Unit: bytes
const DESTINATION_WRITE_BYTES: &str = "rigatoni_destination_write_bytes";

/// Approximate time between a MongoDB operation occurring and its change
/// stream event being received.
///
/// Type: Histogram
/// Labels: collection
/// Unit: seconds
const CHANGE_STREAM_LAG_SECONDS: &str = "rigatoni_change_stream_lag_seconds";
192
193// ============================================================================
194// Distributed Locking Metrics
195// ============================================================================
196
/// Number of distributed locks currently held by this instance.
///
/// Type: Gauge
/// Unit: count
///
/// Note: despite the `_total` suffix (conventionally reserved for counters),
/// this metric is a gauge. Renaming it would break existing queries against
/// the exported series, so it is treated as a known exception.
const LOCKS_HELD_TOTAL: &str = "rigatoni_locks_held_total";

/// Total number of successful lock acquisitions.
///
/// Type: Counter
const LOCK_ACQUISITIONS_TOTAL: &str = "rigatoni_lock_acquisitions_total";

/// Total number of failed lock acquisition attempts.
///
/// Type: Counter
/// Labels: reason (already_held, error)
const LOCK_ACQUISITION_FAILURES_TOTAL: &str = "rigatoni_lock_acquisition_failures_total";

/// Total number of locks lost (expired or stolen).
///
/// Type: Counter
const LOCKS_LOST_TOTAL: &str = "rigatoni_locks_lost_total";

/// Total number of successful lock refreshes.
///
/// Type: Counter
const LOCK_REFRESHES_TOTAL: &str = "rigatoni_lock_refreshes_total";

/// Total number of locks released gracefully.
///
/// Type: Counter
const LOCKS_RELEASED_TOTAL: &str = "rigatoni_locks_released_total";
228
229// ============================================================================
230// Initialization
231// ============================================================================
232
233/// Initializes metric descriptions for documentation and introspection.
234///
235/// This should be called once at application startup, before recording any metrics.
236/// It provides human-readable descriptions for metrics exporters like Prometheus.
237///
238/// # Examples
239///
240/// ```rust
241/// use rigatoni_core::metrics;
242///
243/// metrics::init_metrics();
244/// // ... start pipeline ...
245/// ```
246pub fn init_metrics() {
247    // Counters
248    describe_counter!(
249        EVENTS_PROCESSED_TOTAL,
250        "Total number of change stream events successfully processed through the pipeline"
251    );
252
253    describe_counter!(
254        EVENTS_FAILED_TOTAL,
255        "Total number of events that failed processing due to errors"
256    );
257
258    describe_counter!(
259        RETRIES_TOTAL,
260        "Total number of retry attempts for failed operations"
261    );
262
263    describe_counter!(
264        BATCHES_WRITTEN_TOTAL,
265        "Total number of batches successfully written to destinations"
266    );
267
268    describe_counter!(
269        DESTINATION_WRITE_ERRORS_TOTAL,
270        "Total number of errors writing to destinations"
271    );
272
273    // Histograms
274    describe_histogram!(
275        BATCH_SIZE,
276        metrics::Unit::Count,
277        "Distribution of batch sizes (number of events per batch)"
278    );
279
280    describe_histogram!(
281        BATCH_DURATION_SECONDS,
282        metrics::Unit::Seconds,
283        "Time taken to process a batch from accumulation to successful write"
284    );
285
286    describe_histogram!(
287        DESTINATION_WRITE_DURATION_SECONDS,
288        metrics::Unit::Seconds,
289        "Time taken for destination write operations (including retries)"
290    );
291
292    describe_histogram!(
293        DESTINATION_WRITE_BYTES,
294        metrics::Unit::Bytes,
295        "Size of data written to destination (compressed if applicable)"
296    );
297
298    describe_histogram!(
299        CHANGE_STREAM_LAG_SECONDS,
300        metrics::Unit::Seconds,
301        "Time between MongoDB operation and change stream receipt (approximation)"
302    );
303
304    // Gauges
305    describe_gauge!(
306        ACTIVE_COLLECTIONS,
307        metrics::Unit::Count,
308        "Number of MongoDB collections currently being monitored"
309    );
310
311    describe_gauge!(
312        PIPELINE_STATUS,
313        "Current pipeline status: 0=stopped, 1=running, 2=error"
314    );
315
316    describe_gauge!(
317        BATCH_QUEUE_SIZE,
318        metrics::Unit::Count,
319        "Number of events currently buffered awaiting batch write"
320    );
321
322    // Distributed Locking Metrics
323    describe_gauge!(
324        LOCKS_HELD_TOTAL,
325        metrics::Unit::Count,
326        "Number of distributed locks currently held by this instance"
327    );
328
329    describe_counter!(
330        LOCK_ACQUISITIONS_TOTAL,
331        "Total number of successful distributed lock acquisitions"
332    );
333
334    describe_counter!(
335        LOCK_ACQUISITION_FAILURES_TOTAL,
336        "Total number of failed distributed lock acquisition attempts"
337    );
338
339    describe_counter!(
340        LOCKS_LOST_TOTAL,
341        "Total number of distributed locks lost (expired or stolen by another instance)"
342    );
343
344    describe_counter!(
345        LOCK_REFRESHES_TOTAL,
346        "Total number of successful distributed lock refreshes"
347    );
348
349    describe_counter!(
350        LOCKS_RELEASED_TOTAL,
351        "Total number of distributed locks released gracefully"
352    );
353}
354
355// ============================================================================
356// Counter Metrics
357// ============================================================================
358
359/// Increments the count of successfully processed events.
360///
361/// This should be called after a batch is successfully written to a destination
362/// and the resume token is saved.
363///
364/// # Arguments
365///
366/// * `collection` - MongoDB collection name
367/// * `operation` - Operation type: "insert", "update", "delete", "replace", etc.
368///
369/// # Examples
370///
371/// ```rust
372/// use rigatoni_core::metrics;
373///
374/// metrics::increment_events_processed("users", "insert");
375/// ```
376pub fn increment_events_processed(collection: &str, operation: &str) {
377    counter!(EVENTS_PROCESSED_TOTAL, "collection" => collection.to_string(), "operation" => operation.to_string())
378        .increment(1);
379}
380
381/// Increments the count of successfully processed events by a specific amount.
382///
383/// Use this when processing a batch of events at once.
384///
385/// # Arguments
386///
387/// * `count` - Number of events processed
388/// * `collection` - MongoDB collection name
389/// * `operation` - Operation type
390///
391/// # Examples
392///
393/// ```rust
394/// use rigatoni_core::metrics;
395///
396/// // After writing a batch of 150 events
397/// metrics::increment_events_processed_by(150, "orders", "insert");
398/// ```
399pub fn increment_events_processed_by(count: u64, collection: &str, operation: &str) {
400    counter!(EVENTS_PROCESSED_TOTAL, "collection" => collection.to_string(), "operation" => operation.to_string())
401        .increment(count);
402}
403
404/// Increments the count of failed events.
405///
406/// This should be called when an event fails processing due to an error.
407///
408/// # Arguments
409///
410/// * `collection` - MongoDB collection name
411/// * `error_category` - Category of error that occurred
412///
413/// # Examples
414///
415/// ```rust
416/// use rigatoni_core::metrics::{self, ErrorCategory};
417///
418/// metrics::increment_events_failed("users", ErrorCategory::Serialization);
419/// ```
420pub fn increment_events_failed(collection: &str, error_category: ErrorCategory) {
421    counter!(EVENTS_FAILED_TOTAL, "collection" => collection.to_string(), "error_type" => error_category.as_str())
422        .increment(1);
423}
424
425/// Increments the retry counter.
426///
427/// This should be called each time an operation is retried.
428///
429/// # Arguments
430///
431/// * `error_category` - Category of error that triggered the retry
432///
433/// # Examples
434///
435/// ```rust
436/// use rigatoni_core::metrics::{self, ErrorCategory};
437///
438/// metrics::increment_retries(ErrorCategory::Timeout);
439/// ```
440pub fn increment_retries(error_category: ErrorCategory) {
441    counter!(RETRIES_TOTAL, "error_type" => error_category.as_str()).increment(1);
442}
443
444/// Increments the count of successfully written batches.
445///
446/// # Arguments
447///
448/// * `destination_type` - Type of destination (e.g., "s3", "bigquery", "kafka")
449///
450/// # Examples
451///
452/// ```rust
453/// use rigatoni_core::metrics;
454///
455/// metrics::increment_batches_written("s3");
456/// ```
457pub fn increment_batches_written(destination_type: &str) {
458    counter!(BATCHES_WRITTEN_TOTAL, "destination_type" => destination_type.to_string())
459        .increment(1);
460}
461
462/// Increments the count of destination write errors.
463///
464/// # Arguments
465///
466/// * `destination_type` - Type of destination
467/// * `error_type` - Error category
468///
469/// # Examples
470///
471/// ```rust
472/// use rigatoni_core::metrics::{self, ErrorCategory};
473///
474/// metrics::increment_destination_errors("s3", ErrorCategory::Timeout);
475/// ```
476pub fn increment_destination_errors(destination_type: &str, error_category: ErrorCategory) {
477    counter!(
478        DESTINATION_WRITE_ERRORS_TOTAL,
479        "destination_type" => destination_type.to_string(),
480        "error_type" => error_category.as_str()
481    )
482    .increment(1);
483}
484
485// ============================================================================
486// Histogram Metrics
487// ============================================================================
488
489/// Records a batch size.
490///
491/// # Arguments
492///
493/// * `size` - Number of events in the batch
494/// * `collection` - MongoDB collection name
495///
496/// # Examples
497///
498/// ```rust
499/// use rigatoni_core::metrics;
500///
501/// metrics::record_batch_size(150, "users");
502/// ```
503pub fn record_batch_size(size: usize, collection: &str) {
504    histogram!(BATCH_SIZE, "collection" => collection.to_string()).record(size as f64);
505}
506
507/// Records the duration of batch processing.
508///
509/// This measures the time from when a batch starts accumulating events
510/// to when it's successfully written to the destination.
511///
512/// # Arguments
513///
514/// * `duration_seconds` - Duration in seconds
515/// * `collection` - MongoDB collection name
516///
517/// # Examples
518///
519/// ```rust
520/// use rigatoni_core::metrics;
521/// use std::time::Instant;
522///
523/// let start = Instant::now();
524/// // ... process batch ...
525/// metrics::record_batch_duration(start.elapsed().as_secs_f64(), "users");
526/// ```
527pub fn record_batch_duration(duration_seconds: f64, collection: &str) {
528    histogram!(BATCH_DURATION_SECONDS, "collection" => collection.to_string())
529        .record(duration_seconds);
530}
531
532/// Records the duration of a destination write operation.
533///
534/// This includes time spent in retries if applicable.
535///
536/// # Arguments
537///
538/// * `duration` - Duration of the write operation
539/// * `destination_type` - Type of destination
540///
541/// # Examples
542///
543/// ```rust
544/// use rigatoni_core::metrics;
545/// use std::time::{Duration, Instant};
546///
547/// let start = Instant::now();
548/// // ... write to destination ...
549/// metrics::record_destination_write_duration(start.elapsed(), "s3");
550/// ```
551pub fn record_destination_write_duration(duration: Duration, destination_type: &str) {
552    histogram!(DESTINATION_WRITE_DURATION_SECONDS, "destination_type" => destination_type.to_string())
553        .record(duration.as_secs_f64());
554}
555
556/// Records the size of data written to a destination.
557///
558/// # Arguments
559///
560/// * `bytes` - Number of bytes written (compressed if applicable)
561/// * `destination_type` - Type of destination
562///
563/// # Examples
564///
565/// ```rust
566/// use rigatoni_core::metrics;
567///
568/// metrics::record_destination_write_bytes(1024 * 1024, "s3"); // 1 MB
569/// ```
570pub fn record_destination_write_bytes(bytes: usize, destination_type: &str) {
571    histogram!(DESTINATION_WRITE_BYTES, "destination_type" => destination_type.to_string())
572        .record(bytes as f64);
573}
574
575/// Records change stream lag (approximation).
576///
577/// This estimates the time between when a MongoDB operation occurred
578/// and when the change stream event was received.
579///
580/// # Arguments
581///
582/// * `lag_seconds` - Estimated lag in seconds
583/// * `collection` - MongoDB collection name
584///
585/// # Examples
586///
587/// ```rust
588/// use rigatoni_core::metrics;
589///
590/// metrics::record_change_stream_lag(0.05, "users"); // 50ms lag
591/// ```
592pub fn record_change_stream_lag(lag_seconds: f64, collection: &str) {
593    histogram!(CHANGE_STREAM_LAG_SECONDS, "collection" => collection.to_string())
594        .record(lag_seconds);
595}
596
597// ============================================================================
598// Gauge Metrics
599// ============================================================================
600
601/// Sets the number of active collections being monitored.
602///
603/// # Arguments
604///
605/// * `count` - Number of active collections
606///
607/// # Examples
608///
609/// ```rust
610/// use rigatoni_core::metrics;
611///
612/// metrics::set_active_collections(3);
613/// ```
614pub fn set_active_collections(count: usize) {
615    gauge!(ACTIVE_COLLECTIONS).set(count as f64);
616}
617
618/// Sets the pipeline status.
619///
620/// # Arguments
621///
622/// * `status` - Pipeline status: 0=stopped, 1=running, 2=error
623///
624/// # Examples
625///
626/// ```rust
627/// use rigatoni_core::metrics::{self, PipelineStatus};
628///
629/// metrics::set_pipeline_status(PipelineStatus::Running);
630/// ```
631pub fn set_pipeline_status(status: PipelineStatus) {
632    gauge!(PIPELINE_STATUS).set(f64::from(status as u8));
633}
634
635/// Sets the current batch queue size.
636///
637/// This represents how many events are currently buffered awaiting batch write.
638///
639/// # Arguments
640///
641/// * `size` - Number of events in queue
642/// * `collection` - MongoDB collection name
643///
644/// # Examples
645///
646/// ```rust
647/// use rigatoni_core::metrics;
648///
649/// metrics::set_batch_queue_size(47, "users");
650/// ```
651pub fn set_batch_queue_size(size: usize, collection: &str) {
652    gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).set(size as f64);
653}
654
655/// Increments the batch queue size by 1.
656///
657/// More efficient than `set_batch_queue_size` when adding a single event.
658///
659/// # Arguments
660///
661/// * `collection` - MongoDB collection name
662pub fn increment_batch_queue_size(collection: &str) {
663    gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).increment(1.0);
664}
665
666/// Decrements the batch queue size by a specific amount.
667///
668/// Use this after flushing a batch.
669///
670/// # Arguments
671///
672/// * `count` - Number of events removed from queue
673/// * `collection` - MongoDB collection name
674pub fn decrement_batch_queue_size(count: usize, collection: &str) {
675    gauge!(BATCH_QUEUE_SIZE, "collection" => collection.to_string()).decrement(count as f64);
676}
677
678// ============================================================================
679// Distributed Locking Metrics
680// ============================================================================
681
682/// Sets the number of locks currently held by this instance.
683///
684/// # Arguments
685///
686/// * `count` - Number of locks held
687///
688/// # Examples
689///
690/// ```rust
691/// use rigatoni_core::metrics;
692///
693/// metrics::set_locks_held(3);
694/// ```
695pub fn set_locks_held(count: usize) {
696    gauge!(LOCKS_HELD_TOTAL).set(count as f64);
697}
698
699/// Increments the count of successful lock acquisitions.
700///
701/// # Examples
702///
703/// ```rust
704/// use rigatoni_core::metrics;
705///
706/// metrics::increment_lock_acquisitions();
707/// ```
708pub fn increment_lock_acquisitions() {
709    counter!(LOCK_ACQUISITIONS_TOTAL).increment(1);
710}
711
712/// Increments the count of failed lock acquisition attempts.
713///
714/// # Arguments
715///
716/// * `reason` - Why the lock acquisition failed
717///
718/// # Examples
719///
720/// ```rust
721/// use rigatoni_core::metrics::{self, LockFailureReason};
722///
723/// metrics::increment_lock_acquisition_failures(LockFailureReason::AlreadyHeld);
724/// ```
725pub fn increment_lock_acquisition_failures(reason: LockFailureReason) {
726    counter!(LOCK_ACQUISITION_FAILURES_TOTAL, "reason" => reason.as_str()).increment(1);
727}
728
729/// Increments the count of locks lost (expired or stolen).
730///
731/// # Examples
732///
733/// ```rust
734/// use rigatoni_core::metrics;
735///
736/// metrics::increment_locks_lost();
737/// ```
738pub fn increment_locks_lost() {
739    counter!(LOCKS_LOST_TOTAL).increment(1);
740}
741
742/// Increments the count of successful lock refreshes.
743///
744/// # Examples
745///
746/// ```rust
747/// use rigatoni_core::metrics;
748///
749/// metrics::increment_lock_refreshes();
750/// ```
751pub fn increment_lock_refreshes() {
752    counter!(LOCK_REFRESHES_TOTAL).increment(1);
753}
754
755/// Increments the count of locks released gracefully.
756///
757/// # Examples
758///
759/// ```rust
760/// use rigatoni_core::metrics;
761///
762/// metrics::increment_locks_released();
763/// ```
764pub fn increment_locks_released() {
765    counter!(LOCKS_RELEASED_TOTAL).increment(1);
766}
767
/// Reasons for lock acquisition failure.
///
/// Used as the `reason` label on `rigatoni_lock_acquisition_failures_total`.
/// Like `ErrorCategory`, it derives `Hash` so callers can use it as a map or
/// set key (e.g. when aggregating failures locally).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LockFailureReason {
    /// Lock is already held by another instance.
    AlreadyHeld,
    /// Error communicating with state store.
    Error,
}

impl LockFailureReason {
    /// Returns the reason as a static string for metrics labels
    /// (`"already_held"` or `"error"`).
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::AlreadyHeld => "already_held",
            Self::Error => "error",
        }
    }
}
787
788// ============================================================================
789// Pipeline Status Type
790// ============================================================================
791
/// Pipeline status for the `pipeline_status` gauge.
///
/// The explicit discriminants are the values written to the gauge, so they
/// must stay stable: 0 = stopped, 1 = running, 2 = error. Derives `Hash` for
/// consistency with the other label enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum PipelineStatus {
    /// Pipeline is stopped.
    Stopped = 0,
    /// Pipeline is running normally.
    Running = 1,
    /// Pipeline encountered an error.
    Error = 2,
}
803
/// Closed set of error kinds used for the `error_type` metric label.
///
/// Recording a category instead of free-form error text keeps the number of
/// distinct label values small and bounded.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ErrorCategory {
    /// The operation exceeded its time limit.
    Timeout,
    /// Network-level failure (network, TCP, DNS).
    Connection,
    /// Encoding/decoding failure (JSON, BSON, other encodings).
    Serialization,
    /// Authentication or authorization failure.
    Permission,
    /// Invalid data or schema violation.
    Validation,
    /// The requested resource does not exist.
    NotFound,
    /// Throttling or quota exhaustion.
    RateLimit,
    /// Anything that does not fit another category.
    Unknown,
}

impl ErrorCategory {
    /// Returns the label value for this category, e.g. `"timeout_error"`.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        let label = match *self {
            ErrorCategory::Timeout => "timeout_error",
            ErrorCategory::Connection => "connection_error",
            ErrorCategory::Serialization => "serialization_error",
            ErrorCategory::Permission => "permission_error",
            ErrorCategory::Validation => "validation_error",
            ErrorCategory::NotFound => "not_found_error",
            ErrorCategory::RateLimit => "rate_limit_error",
            ErrorCategory::Unknown => "unknown_error",
        };
        label
    }
}
844
845// ============================================================================
846// Metric Helper Utilities
847// ============================================================================
848
/// Guard that measures how long it lives and reports the elapsed time when
/// dropped.
///
/// The clock starts in [`Timer::new`]. When the timer is dropped, `record_fn`
/// is invoked exactly once with the elapsed [`Duration`] and the label.
///
/// Bind the timer to a named variable such as `_timer`. Writing
/// `let _ = Timer::new(...)` drops it immediately, so it would record an
/// (almost) zero duration.
///
/// # Examples
///
/// ```rust
/// use rigatoni_core::metrics::Timer;
///
/// {
///     let _timer = Timer::new("s3", |duration, dest_type| {
///         rigatoni_core::metrics::record_destination_write_duration(duration, dest_type);
///     });
///     // ... operation to time ...
/// } // Timer automatically records when dropped
/// ```
#[must_use = "a Timer records when it is dropped; bind it to a named variable such as `_timer`"]
pub struct Timer<F>
where
    F: FnOnce(Duration, &str),
{
    /// Moment the timer was created.
    start: std::time::Instant,
    /// Label handed to `record_fn` (e.g. a destination type).
    label: String,
    /// Recording callback; wrapped in `Option` so `Drop` can move the
    /// `FnOnce` out with `take` and call it by value.
    record_fn: Option<F>,
}

impl<F> Timer<F>
where
    F: FnOnce(Duration, &str),
{
    /// Starts a new timer that will call `record_fn(elapsed, label)` when dropped.
    pub fn new(label: impl Into<String>, record_fn: F) -> Self {
        Self {
            start: std::time::Instant::now(),
            label: label.into(),
            record_fn: Some(record_fn),
        }
    }
}

impl<F> std::fmt::Debug for Timer<F>
where
    F: FnOnce(Duration, &str),
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The callback is usually a closure, which is not `Debug`; only report
        // whether it is still pending.
        f.debug_struct("Timer")
            .field("start", &self.start)
            .field("label", &self.label)
            .field("pending", &self.record_fn.is_some())
            .finish_non_exhaustive()
    }
}

impl<F> Drop for Timer<F>
where
    F: FnOnce(Duration, &str),
{
    fn drop(&mut self) {
        let duration = self.start.elapsed();
        if let Some(record_fn) = self.record_fn.take() {
            record_fn(duration, &self.label);
        }
    }
}