Skip to main content

fastmcp_console/stats/
mod.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
3use std::time::{Duration, Instant};
4
5mod renderer;
6
7pub use renderer::StatsRenderer;
8
9/// Thread-safe server statistics collector.
10#[derive(Debug, Clone)]
11pub struct ServerStats {
12    inner: Arc<ServerStatsInner>,
13}
14
15#[derive(Debug)]
16struct ServerStatsInner {
17    start_time: Instant,
18    total_requests: AtomicU64,
19    successful_requests: AtomicU64,
20    failed_requests: AtomicU64,
21    cancelled_requests: AtomicU64,
22    tool_calls: AtomicU64,
23    resource_reads: AtomicU64,
24    prompt_gets: AtomicU64,
25    list_operations: AtomicU64,
26    total_latency_micros: AtomicU64,
27    max_latency_micros: AtomicU64,
28    min_latency_micros: AtomicU64,
29    active_connections: AtomicUsize,
30    total_connections: AtomicU64,
31    bytes_received: AtomicU64,
32    bytes_sent: AtomicU64,
33}
34
35impl Default for ServerStats {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl ServerStats {
42    /// Create a new metrics collector with zeroed counters.
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            inner: Arc::new(ServerStatsInner {
47                start_time: Instant::now(),
48                total_requests: AtomicU64::new(0),
49                successful_requests: AtomicU64::new(0),
50                failed_requests: AtomicU64::new(0),
51                cancelled_requests: AtomicU64::new(0),
52                tool_calls: AtomicU64::new(0),
53                resource_reads: AtomicU64::new(0),
54                prompt_gets: AtomicU64::new(0),
55                list_operations: AtomicU64::new(0),
56                total_latency_micros: AtomicU64::new(0),
57                max_latency_micros: AtomicU64::new(0),
58                min_latency_micros: AtomicU64::new(u64::MAX),
59                active_connections: AtomicUsize::new(0),
60                total_connections: AtomicU64::new(0),
61                bytes_received: AtomicU64::new(0),
62                bytes_sent: AtomicU64::new(0),
63            }),
64        }
65    }
66
67    /// Record a completed request.
68    pub fn record_request(&self, method: &str, latency: Duration, success: bool) {
69        self.record_request_base(method, latency);
70        if success {
71            self.inner
72                .successful_requests
73                .fetch_add(1, Ordering::Relaxed);
74        } else {
75            self.inner.failed_requests.fetch_add(1, Ordering::Relaxed);
76        }
77    }
78
79    /// Record a cancelled request.
80    pub fn record_cancelled(&self, method: &str, latency: Duration) {
81        self.record_request_base(method, latency);
82        self.inner
83            .cancelled_requests
84            .fetch_add(1, Ordering::Relaxed);
85    }
86
87    /// Record a new client connection.
88    pub fn connection_opened(&self) {
89        self.inner
90            .active_connections
91            .fetch_add(1, Ordering::Relaxed);
92        self.inner.total_connections.fetch_add(1, Ordering::Relaxed);
93    }
94
95    /// Record a closed client connection.
96    pub fn connection_closed(&self) {
97        let mut current = self.inner.active_connections.load(Ordering::Relaxed);
98        loop {
99            if current == 0 {
100                return;
101            }
102            match self.inner.active_connections.compare_exchange_weak(
103                current,
104                current - 1,
105                Ordering::Relaxed,
106                Ordering::Relaxed,
107            ) {
108                Ok(_) => return,
109                Err(next) => current = next,
110            }
111        }
112    }
113
114    /// Add to the received byte counter.
115    pub fn add_bytes_received(&self, bytes: u64) {
116        self.inner
117            .bytes_received
118            .fetch_add(bytes, Ordering::Relaxed);
119    }
120
121    /// Add to the sent byte counter.
122    pub fn add_bytes_sent(&self, bytes: u64) {
123        self.inner.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
124    }
125
126    /// Get a point-in-time snapshot of all counters.
127    #[must_use]
128    pub fn snapshot(&self) -> StatsSnapshot {
129        let total = self.inner.total_requests.load(Ordering::Relaxed);
130        let total_latency = self.inner.total_latency_micros.load(Ordering::Relaxed);
131        let min_latency = self.inner.min_latency_micros.load(Ordering::Relaxed);
132        let max_latency = self.inner.max_latency_micros.load(Ordering::Relaxed);
133
134        StatsSnapshot {
135            uptime: self.inner.start_time.elapsed(),
136            total_requests: total,
137            successful_requests: self.inner.successful_requests.load(Ordering::Relaxed),
138            failed_requests: self.inner.failed_requests.load(Ordering::Relaxed),
139            cancelled_requests: self.inner.cancelled_requests.load(Ordering::Relaxed),
140            tool_calls: self.inner.tool_calls.load(Ordering::Relaxed),
141            resource_reads: self.inner.resource_reads.load(Ordering::Relaxed),
142            prompt_gets: self.inner.prompt_gets.load(Ordering::Relaxed),
143            list_operations: self.inner.list_operations.load(Ordering::Relaxed),
144            avg_latency: total_latency
145                .checked_div(total)
146                .map(Duration::from_micros)
147                .unwrap_or(Duration::ZERO),
148            max_latency: Duration::from_micros(max_latency),
149            min_latency: if min_latency == u64::MAX {
150                Duration::ZERO
151            } else {
152                Duration::from_micros(min_latency)
153            },
154            active_connections: self.inner.active_connections.load(Ordering::Relaxed),
155            total_connections: self.inner.total_connections.load(Ordering::Relaxed),
156            bytes_received: self.inner.bytes_received.load(Ordering::Relaxed),
157            bytes_sent: self.inner.bytes_sent.load(Ordering::Relaxed),
158        }
159    }
160
161    fn record_request_base(&self, method: &str, latency: Duration) {
162        self.inner.total_requests.fetch_add(1, Ordering::Relaxed);
163
164        if method.starts_with("tools/") {
165            self.inner.tool_calls.fetch_add(1, Ordering::Relaxed);
166        } else if method.starts_with("resources/") {
167            self.inner.resource_reads.fetch_add(1, Ordering::Relaxed);
168        } else if method.starts_with("prompts/") {
169            self.inner.prompt_gets.fetch_add(1, Ordering::Relaxed);
170        }
171
172        if method.contains("list") {
173            self.inner.list_operations.fetch_add(1, Ordering::Relaxed);
174        }
175
176        let micros = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
177        self.inner
178            .total_latency_micros
179            .fetch_add(micros, Ordering::Relaxed);
180        self.inner
181            .max_latency_micros
182            .fetch_max(micros, Ordering::Relaxed);
183        self.inner
184            .min_latency_micros
185            .fetch_min(micros, Ordering::Relaxed);
186    }
187}
188
189/// Point-in-time snapshot of server statistics.
190#[derive(Debug, Clone)]
191pub struct StatsSnapshot {
192    pub uptime: Duration,
193    pub total_requests: u64,
194    pub successful_requests: u64,
195    pub failed_requests: u64,
196    pub cancelled_requests: u64,
197    pub tool_calls: u64,
198    pub resource_reads: u64,
199    pub prompt_gets: u64,
200    pub list_operations: u64,
201    pub avg_latency: Duration,
202    pub max_latency: Duration,
203    pub min_latency: Duration,
204    pub active_connections: usize,
205    pub total_connections: u64,
206    pub bytes_received: u64,
207    pub bytes_sent: u64,
208}
209
210#[cfg(test)]
211mod tests {
212    use super::*;
213    use std::thread;
214
215    #[test]
216    fn test_record_request_counts() {
217        let stats = ServerStats::new();
218        stats.record_request("tools/call", Duration::from_millis(10), true);
219        stats.record_request("resources/read", Duration::from_millis(20), false);
220        stats.record_request("prompts/get", Duration::from_millis(5), true);
221        stats.record_request("tools/list", Duration::from_millis(7), true);
222
223        let snap = stats.snapshot();
224        assert_eq!(snap.total_requests, 4);
225        assert_eq!(snap.successful_requests, 3);
226        assert_eq!(snap.failed_requests, 1);
227        assert_eq!(snap.cancelled_requests, 0);
228        assert_eq!(snap.tool_calls, 2);
229        assert_eq!(snap.resource_reads, 1);
230        assert_eq!(snap.prompt_gets, 1);
231        assert_eq!(snap.list_operations, 1);
232        assert_eq!(snap.max_latency, Duration::from_millis(20));
233        assert_eq!(snap.min_latency, Duration::from_millis(5));
234    }
235
236    #[test]
237    fn test_snapshot_latency() {
238        let stats = ServerStats::new();
239        stats.record_request("tools/call", Duration::from_millis(10), true);
240        stats.record_request("tools/call", Duration::from_millis(20), true);
241
242        let snap = stats.snapshot();
243        assert_eq!(snap.avg_latency, Duration::from_millis(15));
244        assert_eq!(snap.max_latency, Duration::from_millis(20));
245        assert_eq!(snap.min_latency, Duration::from_millis(10));
246    }
247
248    #[test]
249    fn test_concurrent_updates() {
250        let stats = ServerStats::new();
251        let mut handles = Vec::new();
252
253        for _ in 0..4 {
254            let stats = stats.clone();
255            handles.push(thread::spawn(move || {
256                for _ in 0..1_000 {
257                    stats.record_request("tools/call", Duration::from_millis(1), true);
258                }
259            }));
260        }
261
262        for _ in 0..500 {
263            stats.record_request("resources/read", Duration::from_millis(2), false);
264        }
265
266        for handle in handles {
267            handle.join().expect("thread panicked");
268        }
269
270        let snap = stats.snapshot();
271        assert_eq!(snap.total_requests, 4_500);
272        assert_eq!(snap.successful_requests, 4_000);
273        assert_eq!(snap.failed_requests, 500);
274        assert_eq!(snap.tool_calls, 4_000);
275        assert_eq!(snap.resource_reads, 500);
276    }
277
278    #[test]
279    fn test_default_and_zero_snapshot_values() {
280        let stats = ServerStats::default();
281        let snap = stats.snapshot();
282
283        assert_eq!(snap.total_requests, 0);
284        assert_eq!(snap.successful_requests, 0);
285        assert_eq!(snap.failed_requests, 0);
286        assert_eq!(snap.cancelled_requests, 0);
287        assert_eq!(snap.tool_calls, 0);
288        assert_eq!(snap.resource_reads, 0);
289        assert_eq!(snap.prompt_gets, 0);
290        assert_eq!(snap.list_operations, 0);
291        assert_eq!(snap.avg_latency, Duration::ZERO);
292        assert_eq!(snap.max_latency, Duration::ZERO);
293        assert_eq!(snap.min_latency, Duration::ZERO);
294        assert_eq!(snap.active_connections, 0);
295        assert_eq!(snap.total_connections, 0);
296        assert_eq!(snap.bytes_received, 0);
297        assert_eq!(snap.bytes_sent, 0);
298        assert!(snap.uptime <= Duration::from_secs(1));
299    }
300
301    #[test]
302    fn test_cancelled_and_method_buckets() {
303        let stats = ServerStats::new();
304        stats.record_cancelled("resources/list", Duration::from_millis(11));
305        stats.record_request("other/method", Duration::from_millis(3), true);
306
307        let snap = stats.snapshot();
308        assert_eq!(snap.total_requests, 2);
309        assert_eq!(snap.cancelled_requests, 1);
310        assert_eq!(snap.successful_requests, 1);
311        assert_eq!(snap.failed_requests, 0);
312        assert_eq!(snap.resource_reads, 1);
313        assert_eq!(snap.tool_calls, 0);
314        assert_eq!(snap.prompt_gets, 0);
315        assert_eq!(snap.list_operations, 1);
316        assert_eq!(snap.max_latency, Duration::from_millis(11));
317        assert_eq!(snap.min_latency, Duration::from_millis(3));
318    }
319
320    #[test]
321    fn test_connection_and_byte_counters() {
322        let stats = ServerStats::new();
323
324        stats.connection_opened();
325        stats.connection_opened();
326        stats.connection_closed();
327        stats.connection_closed();
328        // Should stay at zero when already zero.
329        stats.connection_closed();
330
331        stats.add_bytes_received(128);
332        stats.add_bytes_received(72);
333        stats.add_bytes_sent(64);
334
335        let snap = stats.snapshot();
336        assert_eq!(snap.active_connections, 0);
337        assert_eq!(snap.total_connections, 2);
338        assert_eq!(snap.bytes_received, 200);
339        assert_eq!(snap.bytes_sent, 64);
340    }
341
342    // =========================================================================
343    // Additional coverage tests (bd-33a2)
344    // =========================================================================
345
346    #[test]
347    fn server_stats_debug_output() {
348        let stats = ServerStats::new();
349        let debug = format!("{stats:?}");
350        assert!(debug.contains("ServerStats"));
351    }
352
353    #[test]
354    fn server_stats_clone_shares_underlying_data() {
355        let stats = ServerStats::new();
356        let clone = stats.clone();
357
358        clone.record_request("tools/call", Duration::from_millis(5), true);
359
360        // Original should see the update because they share the same Arc
361        let snap = stats.snapshot();
362        assert_eq!(snap.total_requests, 1);
363        assert_eq!(snap.tool_calls, 1);
364    }
365
366    #[test]
367    fn stats_snapshot_debug_and_clone() {
368        let stats = ServerStats::new();
369        stats.record_request("tools/call", Duration::from_millis(7), true);
370        let snap = stats.snapshot();
371
372        let debug = format!("{snap:?}");
373        assert!(debug.contains("StatsSnapshot"));
374        assert!(debug.contains("total_requests: 1"));
375
376        let cloned = snap.clone();
377        assert_eq!(cloned.total_requests, snap.total_requests);
378        assert_eq!(cloned.tool_calls, snap.tool_calls);
379        assert_eq!(cloned.max_latency, snap.max_latency);
380    }
381
382    #[test]
383    fn list_operations_detected_by_contains() {
384        let stats = ServerStats::new();
385
386        // "notifications/list" contains "list" but is not tools/resources/prompts
387        stats.record_request("notifications/list", Duration::from_millis(1), true);
388
389        let snap = stats.snapshot();
390        assert_eq!(snap.list_operations, 1);
391        assert_eq!(snap.tool_calls, 0);
392        assert_eq!(snap.resource_reads, 0);
393        assert_eq!(snap.prompt_gets, 0);
394    }
395
396    #[test]
397    fn connection_closed_concurrent_cas_loop() {
398        let stats = ServerStats::new();
399        stats.connection_opened();
400        stats.connection_opened();
401        stats.connection_opened();
402
403        // Close from multiple threads to exercise the CAS retry loop
404        let mut handles = Vec::new();
405        for _ in 0..3 {
406            let s = stats.clone();
407            handles.push(thread::spawn(move || {
408                s.connection_closed();
409            }));
410        }
411        for h in handles {
412            h.join().expect("thread panicked");
413        }
414
415        let snap = stats.snapshot();
416        assert_eq!(snap.active_connections, 0);
417        assert_eq!(snap.total_connections, 3);
418    }
419}