// mistralrs_core/engine/logger.rs

1#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
2
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use tracing::info;
9
/// Periodically logs engine throughput and cache statistics from a detached
/// background thread (spawned in [`IntervalLogger::new`]).
pub struct IntervalLogger {
    /// Gate checked by the logger thread; no log lines are emitted until
    /// `enable_logging` is called.
    enable_logging: Arc<AtomicBool>,
    /// Cumulative number of prefix-cache hits (incremented by `add_prefix_cache_hit`).
    prefix_cache_hits: Arc<AtomicUsize>,
    /// Tokens processed during the current interval; the logger thread swaps
    /// this back to 0 on every tick, so it is per-window, not cumulative.
    tokens_processed: Arc<AtomicUsize>,
    /// Cumulative number of new sequences (incremented by `add_new_sequence`).
    total_new_seqs: Arc<AtomicUsize>,
    /// Latest snapshot of running sequences (overwritten by `set_num_running`).
    num_running: Arc<AtomicUsize>,
    /// Latest snapshot of waiting sequences (overwritten by `set_num_waiting`).
    num_waiting: Arc<AtomicUsize>,
    /// Cumulative encoder-cache hits; `None` when no encoder cache exists.
    encoder_cache_hits: Option<Arc<AtomicUsize>>,
    /// Cumulative encoder-cache misses; `None` when no encoder cache exists.
    encoder_cache_misses: Option<Arc<AtomicUsize>>,
}
20
21impl IntervalLogger {
22    /// Starts an interval logger. Call `begin_logging` to begin the logging process.
23    pub fn new(
24        interval: Duration,
25        encoder_cache_counters: Option<(Arc<AtomicUsize>, Arc<AtomicUsize>)>,
26    ) -> Self {
27        let prefix_cache_hits = Arc::new(AtomicUsize::new(0));
28        let tokens_processed = Arc::new(AtomicUsize::new(0));
29        let total_new_seqs = Arc::new(AtomicUsize::new(0));
30        let enable_logging = Arc::new(AtomicBool::new(false));
31        let num_running = Arc::new(AtomicUsize::new(0));
32        let num_waiting = Arc::new(AtomicUsize::new(0));
33
34        let t_prefix_cache_hits = prefix_cache_hits.clone();
35        let t_tokens_processed = tokens_processed.clone();
36        let t_total_new_seqs = total_new_seqs.clone();
37        let t_enable_logging = enable_logging.clone();
38        let t_num_running = num_running.clone();
39        let t_num_waiting = num_waiting.clone();
40        let (encoder_cache_hits, encoder_cache_misses) = match encoder_cache_counters {
41            Some((h, m)) => (Some(h), Some(m)),
42            None => (None, None),
43        };
44        let t_enc_hits = encoder_cache_hits.clone();
45        let t_enc_misses = encoder_cache_misses.clone();
46        thread::spawn(move || {
47            // Start the actual logging
48            loop {
49                thread::sleep(interval);
50                if !t_enable_logging.load(Ordering::Relaxed) {
51                    continue;
52                }
53
54                let total_new_seqs = t_total_new_seqs.load(Ordering::Relaxed);
55                let prefix_cache_hits = t_prefix_cache_hits.load(Ordering::Relaxed);
56                let tokens_processed = t_tokens_processed.swap(0, Ordering::Relaxed);
57                let num_running = t_num_running.load(Ordering::Relaxed);
58                let num_waiting = t_num_waiting.load(Ordering::Relaxed);
59
60                if total_new_seqs != 0 && tokens_processed != 0 {
61                    let enc_cache_info =
62                        if let (Some(ref hits), Some(ref misses)) = (&t_enc_hits, &t_enc_misses) {
63                            let h = hits.load(Ordering::Relaxed);
64                            let m = misses.load(Ordering::Relaxed);
65                            let total = h + m;
66                            if total > 0 {
67                                format!(
68                                    ", Encoder cache hitrate {:.2}%",
69                                    100. * h as f64 / total as f64
70                                )
71                            } else {
72                                String::new()
73                            }
74                        } else {
75                            String::new()
76                        };
77
78                    // Throughput = tokens processed during this interval / interval duration.
79                    // Combines both prefill and decode tokens. The counter is atomically
80                    // swapped to 0 each interval, so the metric reflects only the current
81                    // window and is not cumulative.
82                    info!(
83                        "Throughput (T/s) {:.2}, Prefix cache hitrate {:.2}%{enc_cache_info}, {num_running} running, {num_waiting} waiting",
84                        tokens_processed as f64 / interval.as_secs_f64(),
85                        100. * prefix_cache_hits as f64 / total_new_seqs as f64,
86                    );
87                }
88            }
89        });
90
91        Self {
92            prefix_cache_hits,
93            tokens_processed,
94            total_new_seqs,
95            enable_logging,
96            num_running,
97            num_waiting,
98            encoder_cache_hits,
99            encoder_cache_misses,
100        }
101    }
102
103    pub fn enable_logging(&self) {
104        self.enable_logging.store(true, Ordering::Relaxed);
105    }
106
107    /// Reset all counters to zero. Call after warmup/dummy runs to get clean stats.
108    pub fn reset(&self) {
109        self.prefix_cache_hits.store(0, Ordering::Relaxed);
110        self.tokens_processed.store(0, Ordering::Relaxed);
111        self.total_new_seqs.store(0, Ordering::Relaxed);
112        self.num_running.store(0, Ordering::Relaxed);
113        self.num_waiting.store(0, Ordering::Relaxed);
114        if let Some(ref hits) = self.encoder_cache_hits {
115            hits.store(0, Ordering::Relaxed);
116        }
117        if let Some(ref misses) = self.encoder_cache_misses {
118            misses.store(0, Ordering::Relaxed);
119        }
120    }
121
122    pub fn add_tokens_processed(&self, num_tokens: usize) {
123        self.tokens_processed
124            .fetch_add(num_tokens, Ordering::Relaxed);
125    }
126
127    pub fn add_new_sequence(&self) {
128        self.total_new_seqs.fetch_add(1, Ordering::Relaxed);
129    }
130
131    pub fn add_prefix_cache_hit(&self) {
132        self.prefix_cache_hits.fetch_add(1, Ordering::Relaxed);
133    }
134
135    pub fn set_num_running(&self, running: usize) {
136        self.num_running.store(running, Ordering::Relaxed);
137    }
138
139    pub fn set_num_waiting(&self, waiting: usize) {
140        self.num_waiting.store(waiting, Ordering::Relaxed);
141    }
142
143    /// Return cumulative prefix cache (hits, total_sequences).
144    pub fn prefix_cache_stats(&self) -> (usize, usize) {
145        (
146            self.prefix_cache_hits.load(Ordering::Relaxed),
147            self.total_new_seqs.load(Ordering::Relaxed),
148        )
149    }
150
151    /// Return cumulative encoder cache (hits, misses), or `None` if no encoder cache exists.
152    pub fn encoder_cache_stats(&self) -> Option<(usize, usize)> {
153        match (&self.encoder_cache_hits, &self.encoder_cache_misses) {
154            (Some(h), Some(m)) => Some((h.load(Ordering::Relaxed), m.load(Ordering::Relaxed))),
155            _ => None,
156        }
157    }
158}