serverless_fn/
telemetry.rs1use std::sync::OnceLock;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Instant;
8
9use crate::error::ServerlessError;
10
/// Counters describing function invocations.
///
/// Both counters are atomics, so they are updated through a shared reference
/// (`&self`) without any additional locking.
#[derive(Debug)]
pub struct MetricsCollector {
    /// Number of invocations recorded so far.
    invocation_count: AtomicU64,
    /// Sum of the durations, in milliseconds, of all recorded invocations.
    total_duration_ms: AtomicU64,
}
16
17impl MetricsCollector {
18 #[must_use]
20 pub fn new() -> Self {
21 Self {
22 invocation_count: AtomicU64::new(0),
23 total_duration_ms: AtomicU64::new(0),
24 }
25 }
26
27 pub fn record_invocation(&self, function_name: &str, duration_ms: u64, success: bool) {
35 if cfg!(feature = "telemetry") {
36 self.invocation_count.fetch_add(1, Ordering::SeqCst);
37 self.total_duration_ms
38 .fetch_add(duration_ms, Ordering::SeqCst);
39
40 tracing::info!(
41 "Function invocation: {} - Success: {}, Duration: {}ms",
42 function_name,
43 success,
44 duration_ms
45 );
46 }
47 }
48
49 #[must_use]
51 pub fn avg_duration_ms(&self) -> f64 {
52 let count = self.invocation_count.load(Ordering::SeqCst);
53 if count == 0 {
54 return 0.0;
55 }
56 let total = self.total_duration_ms.load(Ordering::SeqCst);
57 total as f64 / count as f64
58 }
59
60 #[must_use]
62 pub fn invocation_count(&self) -> u64 {
63 self.invocation_count.load(Ordering::SeqCst)
64 }
65}
66
67impl Default for MetricsCollector {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
/// Timing and identification data for a single function invocation.
#[derive(Debug)]
pub struct TelemetryContext {
    /// Moment the context was created; the reported duration is measured from here.
    start_time: Instant,
    /// Name of the function this context was created for.
    function_name: String,
    /// Identifier attached to the log events emitted for this invocation.
    trace_id: String,
}
79
80impl TelemetryContext {
81 #[must_use]
83 pub fn new(function_name: &str) -> Self {
84 Self {
85 start_time: Instant::now(),
86 function_name: function_name.to_string(),
87 trace_id: uuid::Uuid::new_v4().to_string(),
88 }
89 }
90
91 pub fn record_completion(&self, success: bool, error: Option<&ServerlessError>) {
98 if cfg!(feature = "telemetry") {
99 let duration_ms = self.start_time.elapsed().as_millis() as u64;
100
101 if let Some(err) = error {
102 tracing::error!(
103 trace_id = %self.trace_id,
104 function = %self.function_name,
105 duration_ms = duration_ms,
106 error = %err,
107 "Function execution failed"
108 );
109 } else {
110 tracing::info!(
111 trace_id = %self.trace_id,
112 function = %self.function_name,
113 duration_ms = duration_ms,
114 "Function execution completed"
115 );
116 }
117
118 get_metrics_collector().record_invocation(&self.function_name, duration_ms, success);
119 }
120 }
121}
122
123static METRICS_COLLECTOR: OnceLock<MetricsCollector> = OnceLock::new();
125
126#[must_use]
128pub fn get_metrics_collector() -> &'static MetricsCollector {
129 METRICS_COLLECTOR.get_or_init(MetricsCollector::new)
130}
131
132pub fn init_telemetry() {
134 if cfg!(feature = "telemetry") {
135 tracing::info!("Telemetry system initialized");
136 }
137}
138
139pub async fn telemetry_middleware<F, R>(
145 function_name: &str,
146 operation: F,
147) -> Result<R, ServerlessError>
148where
149 F: std::future::Future<Output = Result<R, ServerlessError>>,
150{
151 if cfg!(feature = "telemetry") {
152 let ctx = TelemetryContext::new(function_name);
153 match operation.await {
154 Ok(result) => {
155 ctx.record_completion(true, None);
156 Ok(result)
157 }
158 Err(err) => {
159 ctx.record_completion(false, Some(&err));
160 Err(err)
161 }
162 }
163 } else {
164 operation.await
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created collector reports no invocations and a zero average.
    #[test]
    fn test_metrics_collector_initial_state() {
        let fresh = MetricsCollector::new();
        assert_eq!(fresh.invocation_count(), 0);
        assert_eq!(fresh.avg_duration_ms(), 0.0);
    }

    /// Two recorded invocations are reflected in the count and average only when
    /// the `telemetry` feature is enabled; otherwise the collector stays at zero.
    #[test]
    fn test_metrics_collector_records_invocation() {
        let collector = MetricsCollector::new();

        for duration_ms in [100, 200] {
            collector.record_invocation("test_func", duration_ms, true);
        }

        let (expected_count, expected_avg) = if cfg!(feature = "telemetry") {
            (2, 150.0)
        } else {
            (0, 0.0)
        };

        let count = collector.invocation_count();
        let avg_duration = collector.avg_duration_ms();

        assert_eq!(count, expected_count);
        assert_eq!(avg_duration, expected_avg);
    }
}