1use std::collections::HashMap;
28use std::fmt::Write;
29use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
30use std::sync::{Arc, LazyLock, RwLock};
31
32use serde::{Deserialize, Serialize};
33
/// An observation reported to a [`MetricsRegistry`] through [`MetricsRegistry::record`].
///
/// Events are plain data; the registry decides how each one is aggregated. `PartialEq` and
/// `Eq` are derived so events can be compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetricEvent {
    /// A request handled by `service` has started.
    RequestStarted {
        /// Name of the service handling the request (e.g. `"http"`).
        service: String,
    },
    /// A request handled by `service` has finished.
    RequestCompleted {
        /// Name of the service that handled the request.
        service: String,
        /// Duration of the request, in milliseconds.
        duration_ms: u64,
        /// `false` counts the request as a failure.
        success: bool,
    },
    /// Tokens consumed by one AI provider call.
    TokenUsage {
        /// Provider name (e.g. `"claude"`); the registry currently aggregates across
        /// providers and does not keep a per-provider breakdown.
        provider: String,
        /// Prompt/input tokens consumed.
        input_tokens: u64,
        /// Completion/output tokens generated.
        output_tokens: u64,
    },
    /// A single cache lookup.
    CacheAccess {
        /// `true` for a hit, `false` for a miss.
        hit: bool,
    },
    /// The number of active workers changed.
    WorkerCountChanged {
        /// New absolute worker count; the gauge is overwritten, not adjusted by a delta.
        count: i64,
    },
    /// The worker queue depth changed.
    QueueDepthChanged {
        /// New absolute queue depth; the gauge is overwritten, not adjusted by a delta.
        depth: i64,
    },
    /// A pipeline run finished.
    PipelineExecuted {
        /// Identifier of the pipeline that ran (not used for aggregation by the registry).
        pipeline_id: String,
        /// Duration of the pipeline run, in milliseconds.
        duration_ms: u64,
        /// `false` counts the run as a failure.
        success: bool,
    },
    /// A circuit breaker changed state. The registry currently records nothing for it.
    CircuitBreakerStateChanged {
        /// Service protected by the breaker.
        service: String,
        /// New breaker state as free-form text (values are chosen by the emitter and are
        /// not validated here).
        state: String,
    },
}
96
/// Request counters kept separately for each service name seen in request events.
///
/// NOTE(review): `MetricsRegistry::record` writes these counters, but nothing in this
/// module reads or exports them yet.
#[derive(Default)]
struct ServiceCounters {
    /// Number of `RequestStarted` events for this service.
    requests: AtomicU64,
    /// Number of `RequestCompleted` events for this service with `success == false`.
    errors: AtomicU64,
    /// Sum of `duration_ms` over all `RequestCompleted` events for this service.
    total_duration: AtomicU64,
}
105
/// Metrics store that [`MetricEvent`]s are recorded into.
///
/// Counters, gauges and histogram buckets are individual atomics; only the per-service
/// map is behind an `RwLock`. Read the scalars with `snapshot` or export everything with
/// `render_prometheus`.
pub struct MetricsRegistry {
    /// Number of `RequestStarted` events.
    requests_total: AtomicU64,
    /// Number of `RequestCompleted` events with `success == false`.
    errors_total: AtomicU64,
    /// Number of `CacheAccess` events with `hit == true`.
    cache_hits_total: AtomicU64,
    /// Number of `CacheAccess` events with `hit == false`.
    cache_misses_total: AtomicU64,
    /// Number of `PipelineExecuted` events.
    pipelines_total: AtomicU64,
    /// Number of `PipelineExecuted` events with `success == false`.
    pipeline_errors_total: AtomicU64,
    /// Sum of `TokenUsage` input tokens across all providers.
    input_tokens_total: AtomicU64,
    /// Sum of `TokenUsage` output tokens across all providers.
    output_tokens_total: AtomicU64,

    /// Gauge: last count reported by `WorkerCountChanged` (overwritten, not accumulated).
    active_workers: AtomicI64,
    /// Gauge: last depth reported by `QueueDepthChanged` (overwritten, not accumulated).
    queue_depth: AtomicI64,

    /// Cumulative request-duration histogram. Slot `i < 9` counts completed requests with
    /// `duration_ms <= DURATION_BOUNDS[i]`; slot 9 is the `+Inf` bucket (every completed
    /// request).
    request_duration_buckets: [AtomicU64; 10],
    /// Same cumulative layout as `request_duration_buckets`, for pipeline durations.
    pipeline_duration_buckets: [AtomicU64; 10],
    /// Sum of all completed request durations, in milliseconds (histogram `_sum`).
    request_duration_sum: AtomicU64,
    /// Sum of all pipeline durations, in milliseconds (histogram `_sum`).
    pipeline_duration_sum: AtomicU64,

    /// Per-service counters keyed by service name, created on first use.
    services: RwLock<HashMap<String, Arc<ServiceCounters>>>,
}
137
/// Inclusive upper bounds, in milliseconds, of the finite duration-histogram buckets.
///
/// Must stay sorted ascending: [`bucket_index`] relies on it to binary-search.
const DURATION_BOUNDS: [u64; 9] = [10, 50, 100, 250, 500, 1_000, 2_500, 5_000, 10_000];

/// Index of the first histogram bucket whose upper bound is `>= ms`.
///
/// A value above every finite bound maps to `DURATION_BOUNDS.len()` (9), the `+Inf` slot.
fn bucket_index(ms: u64) -> usize {
    // Count the bounds strictly below `ms`; that count is the first bound with `ms <= bound`.
    DURATION_BOUNDS.partition_point(|&bound| bound < ms)
}
143
144#[allow(clippy::unwrap_used)]
145impl MetricsRegistry {
146 #[must_use]
156 pub fn new() -> Self {
157 Self {
158 requests_total: AtomicU64::new(0),
159 errors_total: AtomicU64::new(0),
160 cache_hits_total: AtomicU64::new(0),
161 cache_misses_total: AtomicU64::new(0),
162 pipelines_total: AtomicU64::new(0),
163 pipeline_errors_total: AtomicU64::new(0),
164 input_tokens_total: AtomicU64::new(0),
165 output_tokens_total: AtomicU64::new(0),
166 active_workers: AtomicI64::new(0),
167 queue_depth: AtomicI64::new(0),
168 request_duration_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
169 pipeline_duration_buckets: std::array::from_fn(|_| AtomicU64::new(0)),
170 request_duration_sum: AtomicU64::new(0),
171 pipeline_duration_sum: AtomicU64::new(0),
172 services: RwLock::new(HashMap::new()),
173 }
174 }
175
176 #[allow(clippy::indexing_slicing)]
188 pub fn record(&self, event: MetricEvent) {
189 match event {
190 MetricEvent::RequestStarted { service } => {
191 self.requests_total.fetch_add(1, Ordering::Relaxed);
192 self.service_counters(&service)
193 .requests
194 .fetch_add(1, Ordering::Relaxed);
195 }
196 MetricEvent::RequestCompleted {
197 service,
198 duration_ms,
199 success,
200 } => {
201 if !success {
202 self.errors_total.fetch_add(1, Ordering::Relaxed);
203 self.service_counters(&service)
204 .errors
205 .fetch_add(1, Ordering::Relaxed);
206 }
207 self.service_counters(&service)
208 .total_duration
209 .fetch_add(duration_ms, Ordering::Relaxed);
210 let idx = bucket_index(duration_ms);
211 for bucket in &self.request_duration_buckets[idx..] {
212 bucket.fetch_add(1, Ordering::Relaxed);
213 }
214 self.request_duration_sum
215 .fetch_add(duration_ms, Ordering::Relaxed);
216 }
217 MetricEvent::TokenUsage {
218 input_tokens,
219 output_tokens,
220 ..
221 } => {
222 self.input_tokens_total
223 .fetch_add(input_tokens, Ordering::Relaxed);
224 self.output_tokens_total
225 .fetch_add(output_tokens, Ordering::Relaxed);
226 }
227 MetricEvent::CacheAccess { hit } => {
228 if hit {
229 self.cache_hits_total.fetch_add(1, Ordering::Relaxed);
230 } else {
231 self.cache_misses_total.fetch_add(1, Ordering::Relaxed);
232 }
233 }
234 MetricEvent::WorkerCountChanged { count } => {
235 self.active_workers.store(count, Ordering::Relaxed);
236 }
237 MetricEvent::QueueDepthChanged { depth } => {
238 self.queue_depth.store(depth, Ordering::Relaxed);
239 }
240 MetricEvent::PipelineExecuted {
241 duration_ms,
242 success,
243 ..
244 } => {
245 self.pipelines_total.fetch_add(1, Ordering::Relaxed);
246 if !success {
247 self.pipeline_errors_total.fetch_add(1, Ordering::Relaxed);
248 }
249 let idx = bucket_index(duration_ms);
250 for bucket in &self.pipeline_duration_buckets[idx..] {
251 bucket.fetch_add(1, Ordering::Relaxed);
252 }
253 self.pipeline_duration_sum
254 .fetch_add(duration_ms, Ordering::Relaxed);
255 }
256 MetricEvent::CircuitBreakerStateChanged { .. } => {
257 }
259 }
260 }
261
262 pub fn snapshot(&self) -> MetricsSnapshot {
275 MetricsSnapshot {
276 requests_total: self.requests_total.load(Ordering::Relaxed),
277 errors_total: self.errors_total.load(Ordering::Relaxed),
278 cache_hits_total: self.cache_hits_total.load(Ordering::Relaxed),
279 cache_misses_total: self.cache_misses_total.load(Ordering::Relaxed),
280 pipelines_total: self.pipelines_total.load(Ordering::Relaxed),
281 pipeline_errors_total: self.pipeline_errors_total.load(Ordering::Relaxed),
282 input_tokens_total: self.input_tokens_total.load(Ordering::Relaxed),
283 output_tokens_total: self.output_tokens_total.load(Ordering::Relaxed),
284 active_workers: self.active_workers.load(Ordering::Relaxed),
285 queue_depth: self.queue_depth.load(Ordering::Relaxed),
286 }
287 }
288
289 #[allow(
304 clippy::too_many_lines,
305 clippy::indexing_slicing,
306 clippy::format_push_string
307 )]
308 pub fn render_prometheus(&self) -> String {
309 let snap = self.snapshot();
310 let mut out = String::with_capacity(2048);
311
312 macro_rules! counter {
313 ($name:expr, $help:expr, $val:expr) => {
314 out.push_str(&format!(
315 "# HELP {name} {help}\n# TYPE {name} counter\n{name} {val}\n",
316 name = $name,
317 help = $help,
318 val = $val
319 ));
320 };
321 }
322 macro_rules! gauge {
323 ($name:expr, $help:expr, $val:expr) => {
324 out.push_str(&format!(
325 "# HELP {name} {help}\n# TYPE {name} gauge\n{name} {val}\n",
326 name = $name,
327 help = $help,
328 val = $val
329 ));
330 };
331 }
332
333 counter!(
334 "stygian_requests_total",
335 "Total scraping requests initiated",
336 snap.requests_total
337 );
338 counter!(
339 "stygian_errors_total",
340 "Total scraping request failures",
341 snap.errors_total
342 );
343 counter!(
344 "stygian_cache_hits_total",
345 "Total cache hits",
346 snap.cache_hits_total
347 );
348 counter!(
349 "stygian_cache_misses_total",
350 "Total cache misses",
351 snap.cache_misses_total
352 );
353 counter!(
354 "stygian_pipelines_total",
355 "Total pipeline executions",
356 snap.pipelines_total
357 );
358 counter!(
359 "stygian_pipeline_errors_total",
360 "Total pipeline execution failures",
361 snap.pipeline_errors_total
362 );
363 counter!(
364 "stygian_input_tokens_total",
365 "Total AI input/prompt tokens consumed",
366 snap.input_tokens_total
367 );
368 counter!(
369 "stygian_output_tokens_total",
370 "Total AI output/completion tokens generated",
371 snap.output_tokens_total
372 );
373 gauge!(
374 "stygian_active_workers",
375 "Current number of active worker goroutines",
376 snap.active_workers
377 );
378 gauge!(
379 "stygian_queue_depth",
380 "Current worker queue depth",
381 snap.queue_depth
382 );
383
384 out.push_str("# HELP stygian_request_duration_ms Request duration distribution (ms)\n");
386 out.push_str("# TYPE stygian_request_duration_ms histogram\n");
387 let labels = [10, 50, 100, 250, 500, 1000, 2500, 5000, 10000];
388 for (i, bound) in labels.iter().enumerate() {
389 out.push_str(&format!(
390 "stygian_request_duration_ms_bucket{{le=\"{bound}\"}} {}\n",
391 self.request_duration_buckets[i].load(Ordering::Relaxed)
392 ));
393 }
394 out.push_str(&format!(
395 "stygian_request_duration_ms_bucket{{le=\"+Inf\"}} {}\n",
396 self.request_duration_buckets[9].load(Ordering::Relaxed)
397 ));
398 out.push_str(&format!(
399 "stygian_request_duration_ms_sum {}\n",
400 self.request_duration_sum.load(Ordering::Relaxed)
401 ));
402 out.push_str(&format!(
403 "stygian_request_duration_ms_count {}\n",
404 snap.requests_total
405 ));
406
407 out.push_str("# HELP stygian_pipeline_duration_ms Pipeline execution duration (ms)\n");
409 out.push_str("# TYPE stygian_pipeline_duration_ms histogram\n");
410 for (i, bound) in labels.iter().enumerate() {
411 out.push_str(&format!(
412 "stygian_pipeline_duration_ms_bucket{{le=\"{bound}\"}} {}\n",
413 self.pipeline_duration_buckets[i].load(Ordering::Relaxed)
414 ));
415 }
416 out.push_str(&format!(
417 "stygian_pipeline_duration_ms_bucket{{le=\"+Inf\"}} {}\n",
418 self.pipeline_duration_buckets[9].load(Ordering::Relaxed)
419 ));
420 let _ = writeln!(
421 &mut out,
422 "stygian_pipeline_duration_ms_sum {}",
423 self.pipeline_duration_sum.load(Ordering::Relaxed)
424 );
425 let _ = writeln!(
426 &mut out,
427 "stygian_pipeline_duration_ms_count {}",
428 snap.pipelines_total
429 );
430
431 out
432 }
433
434 fn service_counters(&self, name: &str) -> Arc<ServiceCounters> {
435 {
436 let read = self.services.read().unwrap();
437 if let Some(c) = read.get(name) {
438 return Arc::clone(c);
439 }
440 }
441 let mut write = self.services.write().unwrap();
442 write
443 .entry(name.to_string())
444 .or_insert_with(|| Arc::new(ServiceCounters::default()))
445 .clone()
446 }
447}
448
449impl Default for MetricsRegistry {
450 fn default() -> Self {
451 Self::new()
452 }
453}
454
/// Point-in-time copy of a registry's scalar counters and gauges.
///
/// Produced by [`MetricsRegistry::snapshot`], which loads each field independently, so the
/// values are not guaranteed to be mutually consistent under concurrent recording.
/// Histogram data is not included. Derives serde `Serialize`/`Deserialize`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Requests started (`RequestStarted` events).
    pub requests_total: u64,
    /// Requests that completed with `success == false`.
    pub errors_total: u64,
    /// Cache lookups that hit.
    pub cache_hits_total: u64,
    /// Cache lookups that missed.
    pub cache_misses_total: u64,
    /// Pipeline runs recorded (`PipelineExecuted` events).
    pub pipelines_total: u64,
    /// Pipeline runs that finished with `success == false`.
    pub pipeline_errors_total: u64,
    /// Input/prompt tokens summed across all providers.
    pub input_tokens_total: u64,
    /// Output/completion tokens summed across all providers.
    pub output_tokens_total: u64,
    /// Last worker count reported (gauge).
    pub active_workers: i64,
    /// Last queue depth reported (gauge).
    pub queue_depth: i64,
}
483
484impl MetricsSnapshot {
485 #[allow(clippy::cast_precision_loss)]
500 #[must_use]
501 pub fn cache_hit_rate(&self) -> f64 {
502 let total = self.cache_hits_total + self.cache_misses_total;
503 if total == 0 {
504 0.0
505 } else {
506 self.cache_hits_total as f64 / total as f64
507 }
508 }
509
510 #[allow(clippy::cast_precision_loss)]
524 #[must_use]
525 pub fn error_rate(&self) -> f64 {
526 if self.requests_total == 0 {
527 0.0
528 } else {
529 self.errors_total as f64 / self.requests_total as f64
530 }
531 }
532}
533
534#[must_use]
549pub fn global_metrics() -> &'static MetricsRegistry {
550 static INSTANCE: LazyLock<MetricsRegistry> = LazyLock::new(MetricsRegistry::new);
551 &INSTANCE
552}
553
/// Output format applied by [`TracingInit::init`] to the `tracing_subscriber` fmt subscriber.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogFormat {
    /// `pretty()` formatter with event targets shown. This is the default.
    #[default]
    Pretty,
    /// `json()` formatter with event targets shown.
    Json,
    /// `compact()` formatter with event targets hidden.
    Compact,
}
567
/// Settings for installing the global tracing subscriber; apply them with [`TracingInit::init`].
///
/// `Default` uses [`LogFormat::Pretty`] and reads the filter from `RUST_LOG`, falling back to
/// `"info"`.
#[derive(Debug, Clone)]
pub struct TracingInit {
    /// Output format of the subscriber.
    pub format: LogFormat,
    /// `EnvFilter` directive string (e.g. `"info"`). If it fails to parse, `init` falls back
    /// to `"info"`.
    pub env_filter: String,
}
591
592impl Default for TracingInit {
593 fn default() -> Self {
594 Self {
595 format: LogFormat::Pretty,
596 env_filter: std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()),
597 }
598 }
599}
600
601impl TracingInit {
602 pub fn init(self) {
619 use tracing_subscriber::EnvFilter;
620
621 let filter =
622 EnvFilter::try_new(&self.env_filter).unwrap_or_else(|_| EnvFilter::new("info"));
623
624 match self.format {
625 LogFormat::Pretty => {
626 let _ = tracing_subscriber::fmt()
627 .with_env_filter(filter)
628 .with_target(true)
629 .pretty()
630 .try_init();
631 }
632 LogFormat::Json => {
633 let _ = tracing_subscriber::fmt()
634 .with_env_filter(filter)
635 .with_target(true)
636 .json()
637 .try_init();
638 }
639 LogFormat::Compact => {
640 let _ = tracing_subscriber::fmt()
641 .with_env_filter(filter)
642 .with_target(false)
643 .compact()
644 .try_init();
645 }
646 }
647 }
648}
649
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::float_cmp, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Fresh registry per test so counts never leak between tests (unlike `global_metrics`).
    fn registry() -> MetricsRegistry {
        MetricsRegistry::new()
    }

    /// Assert that every expected exposition line appears verbatim in `text`.
    fn assert_lines(text: &str, lines: &[&str]) {
        for line in lines {
            assert!(text.contains(line), "missing {line:?} in:\n{text}");
        }
    }

    #[test]
    fn new_registry_starts_at_zero() {
        // Check every snapshot field, not just a sample.
        let snap = registry().snapshot();
        assert_eq!(snap.requests_total, 0);
        assert_eq!(snap.errors_total, 0);
        assert_eq!(snap.cache_hits_total, 0);
        assert_eq!(snap.cache_misses_total, 0);
        assert_eq!(snap.pipelines_total, 0);
        assert_eq!(snap.pipeline_errors_total, 0);
        assert_eq!(snap.input_tokens_total, 0);
        assert_eq!(snap.output_tokens_total, 0);
        assert_eq!(snap.active_workers, 0);
        assert_eq!(snap.queue_depth, 0);
    }

    #[test]
    fn request_started_increments_counter() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestStarted {
            service: "claude".into(),
        });
        assert_eq!(r.snapshot().requests_total, 2);
    }

    #[test]
    fn request_completed_failure_increments_errors() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestCompleted {
            service: "http".into(),
            duration_ms: 500,
            success: false,
        });
        let snap = r.snapshot();
        assert_eq!(snap.errors_total, 1);
        assert!((snap.error_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn request_completed_success_does_not_increment_errors() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestCompleted {
            service: "http".into(),
            duration_ms: 100,
            success: true,
        });
        assert_eq!(r.snapshot().errors_total, 0);
    }

    #[test]
    fn cache_hit_rate_calculation() {
        let r = registry();
        r.record(MetricEvent::CacheAccess { hit: true });
        r.record(MetricEvent::CacheAccess { hit: true });
        r.record(MetricEvent::CacheAccess { hit: false });
        let snap = r.snapshot();
        assert_eq!(snap.cache_hits_total, 2);
        assert_eq!(snap.cache_misses_total, 1);
        let rate = snap.cache_hit_rate();
        assert!((rate - 2.0 / 3.0).abs() < 1e-10);
    }

    #[test]
    fn cache_hit_rate_zero_when_no_accesses() {
        let snap = registry().snapshot();
        assert_eq!(snap.cache_hit_rate(), 0.0);
    }

    #[test]
    fn token_usage_accumulates() {
        let r = registry();
        r.record(MetricEvent::TokenUsage {
            provider: "claude".into(),
            input_tokens: 1000,
            output_tokens: 500,
        });
        r.record(MetricEvent::TokenUsage {
            provider: "openai".into(),
            input_tokens: 200,
            output_tokens: 100,
        });
        let snap = r.snapshot();
        assert_eq!(snap.input_tokens_total, 1200);
        assert_eq!(snap.output_tokens_total, 600);
    }

    #[test]
    fn worker_gauge_reflects_changes() {
        let r = registry();
        r.record(MetricEvent::WorkerCountChanged { count: 4 });
        assert_eq!(r.snapshot().active_workers, 4);
        r.record(MetricEvent::WorkerCountChanged { count: 2 });
        assert_eq!(r.snapshot().active_workers, 2);
    }

    #[test]
    fn queue_depth_gauge_reflects_changes() {
        let r = registry();
        r.record(MetricEvent::QueueDepthChanged { depth: 10 });
        assert_eq!(r.snapshot().queue_depth, 10);
    }

    #[test]
    fn pipeline_executed_increments_pipelines_counter() {
        let r = registry();
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "test".into(),
            duration_ms: 250,
            success: true,
        });
        assert_eq!(r.snapshot().pipelines_total, 1);
        assert_eq!(r.snapshot().pipeline_errors_total, 0);
    }

    #[test]
    fn pipeline_failure_increments_errors() {
        let r = registry();
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "test".into(),
            duration_ms: 100,
            success: false,
        });
        assert_eq!(r.snapshot().pipeline_errors_total, 1);
    }

    #[test]
    fn render_prometheus_contains_required_metric_names() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        let text = r.render_prometheus();
        assert!(text.contains("stygian_requests_total"));
        assert!(text.contains("stygian_errors_total"));
        assert!(text.contains("stygian_cache_hits_total"));
        assert!(text.contains("stygian_active_workers"));
        assert!(text.contains("stygian_request_duration_ms_bucket"));
        assert!(text.contains("stygian_pipeline_duration_ms_bucket"));
    }

    #[test]
    fn render_prometheus_request_histogram_is_cumulative() {
        let r = registry();
        r.record(MetricEvent::RequestStarted {
            service: "http".into(),
        });
        r.record(MetricEvent::RequestCompleted {
            service: "http".into(),
            duration_ms: 75,
            success: true,
        });
        let text = r.render_prometheus();
        // 75 ms falls in the `le="100"` bucket and every wider one.
        assert_lines(
            &text,
            &[
                "stygian_request_duration_ms_bucket{le=\"50\"} 0\n",
                "stygian_request_duration_ms_bucket{le=\"100\"} 1\n",
                "stygian_request_duration_ms_bucket{le=\"10000\"} 1\n",
                "stygian_request_duration_ms_bucket{le=\"+Inf\"} 1\n",
                "stygian_request_duration_ms_sum 75\n",
                "stygian_request_duration_ms_count 1\n",
            ],
        );
    }

    #[test]
    fn render_prometheus_bucket_bounds_are_inclusive() {
        let r = registry();
        // Exactly on the first bound, and just past the last finite bound.
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "p".into(),
            duration_ms: 10,
            success: true,
        });
        r.record(MetricEvent::PipelineExecuted {
            pipeline_id: "p".into(),
            duration_ms: 10_001,
            success: true,
        });
        let text = r.render_prometheus();
        assert_lines(
            &text,
            &[
                "stygian_pipeline_duration_ms_bucket{le=\"10\"} 1\n",
                "stygian_pipeline_duration_ms_bucket{le=\"50\"} 1\n",
                "stygian_pipeline_duration_ms_bucket{le=\"10000\"} 1\n",
                "stygian_pipeline_duration_ms_bucket{le=\"+Inf\"} 2\n",
                "stygian_pipeline_duration_ms_sum 10011\n",
                "stygian_pipeline_duration_ms_count 2\n",
            ],
        );
    }

    #[test]
    fn render_prometheus_reports_current_gauge_values() {
        let r = registry();
        r.record(MetricEvent::WorkerCountChanged { count: 3 });
        r.record(MetricEvent::QueueDepthChanged { depth: 7 });
        let text = r.render_prometheus();
        assert_lines(
            &text,
            &[
                "# TYPE stygian_active_workers gauge\n",
                "stygian_active_workers 3\n",
                "# TYPE stygian_queue_depth gauge\n",
                "stygian_queue_depth 7\n",
            ],
        );
    }

    #[test]
    fn tracing_init_default_does_not_panic() {
        // `init` ignores the error from installing a second global subscriber, so this is
        // safe even if another test already initialised tracing.
        TracingInit::default().init();
    }
}