mistralrs_core/engine/
logger.rs1#![allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
2
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::thread;
6use std::time::Duration;
7
8use tracing::info;
9
/// Shared counters behind the engine's periodic statistics log line.
///
/// Every field is an atomic behind an [`Arc`], so the counters can be updated
/// through `&self` while the reporting thread spawned by
/// [`IntervalLogger::new`] reads them concurrently.
#[derive(Debug)]
pub struct IntervalLogger {
    /// Gate for the reporting thread: nothing is logged while this is `false`.
    enable_logging: Arc<AtomicBool>,
    /// Prefix cache hits; numerator of the logged prefix cache hit rate.
    prefix_cache_hits: Arc<AtomicUsize>,
    /// Tokens processed since the counter was last zeroed (by a report or by `reset`).
    tokens_processed: Arc<AtomicUsize>,
    /// New sequences seen; denominator of the logged prefix cache hit rate.
    total_new_seqs: Arc<AtomicUsize>,
    /// Latest value stored by `set_num_running`.
    num_running: Arc<AtomicUsize>,
    /// Latest value stored by `set_num_waiting`.
    num_waiting: Arc<AtomicUsize>,
    /// Encoder cache hit counter supplied by the caller at construction, if any.
    encoder_cache_hits: Option<Arc<AtomicUsize>>,
    /// Encoder cache miss counter supplied by the caller at construction, if any.
    encoder_cache_misses: Option<Arc<AtomicUsize>>,
}
20
21impl IntervalLogger {
22 pub fn new(
24 interval: Duration,
25 encoder_cache_counters: Option<(Arc<AtomicUsize>, Arc<AtomicUsize>)>,
26 ) -> Self {
27 let prefix_cache_hits = Arc::new(AtomicUsize::new(0));
28 let tokens_processed = Arc::new(AtomicUsize::new(0));
29 let total_new_seqs = Arc::new(AtomicUsize::new(0));
30 let enable_logging = Arc::new(AtomicBool::new(false));
31 let num_running = Arc::new(AtomicUsize::new(0));
32 let num_waiting = Arc::new(AtomicUsize::new(0));
33
34 let t_prefix_cache_hits = prefix_cache_hits.clone();
35 let t_tokens_processed = tokens_processed.clone();
36 let t_total_new_seqs = total_new_seqs.clone();
37 let t_enable_logging = enable_logging.clone();
38 let t_num_running = num_running.clone();
39 let t_num_waiting = num_waiting.clone();
40 let (encoder_cache_hits, encoder_cache_misses) = match encoder_cache_counters {
41 Some((h, m)) => (Some(h), Some(m)),
42 None => (None, None),
43 };
44 let t_enc_hits = encoder_cache_hits.clone();
45 let t_enc_misses = encoder_cache_misses.clone();
46 thread::spawn(move || {
47 loop {
49 thread::sleep(interval);
50 if !t_enable_logging.load(Ordering::Relaxed) {
51 continue;
52 }
53
54 let total_new_seqs = t_total_new_seqs.load(Ordering::Relaxed);
55 let prefix_cache_hits = t_prefix_cache_hits.load(Ordering::Relaxed);
56 let tokens_processed = t_tokens_processed.swap(0, Ordering::Relaxed);
57 let num_running = t_num_running.load(Ordering::Relaxed);
58 let num_waiting = t_num_waiting.load(Ordering::Relaxed);
59
60 if total_new_seqs != 0 && tokens_processed != 0 {
61 let enc_cache_info =
62 if let (Some(ref hits), Some(ref misses)) = (&t_enc_hits, &t_enc_misses) {
63 let h = hits.load(Ordering::Relaxed);
64 let m = misses.load(Ordering::Relaxed);
65 let total = h + m;
66 if total > 0 {
67 format!(
68 ", Encoder cache hitrate {:.2}%",
69 100. * h as f64 / total as f64
70 )
71 } else {
72 String::new()
73 }
74 } else {
75 String::new()
76 };
77
78 info!(
83 "Throughput (T/s) {:.2}, Prefix cache hitrate {:.2}%{enc_cache_info}, {num_running} running, {num_waiting} waiting",
84 tokens_processed as f64 / interval.as_secs_f64(),
85 100. * prefix_cache_hits as f64 / total_new_seqs as f64,
86 );
87 }
88 }
89 });
90
91 Self {
92 prefix_cache_hits,
93 tokens_processed,
94 total_new_seqs,
95 enable_logging,
96 num_running,
97 num_waiting,
98 encoder_cache_hits,
99 encoder_cache_misses,
100 }
101 }
102
103 pub fn enable_logging(&self) {
104 self.enable_logging.store(true, Ordering::Relaxed);
105 }
106
107 pub fn reset(&self) {
109 self.prefix_cache_hits.store(0, Ordering::Relaxed);
110 self.tokens_processed.store(0, Ordering::Relaxed);
111 self.total_new_seqs.store(0, Ordering::Relaxed);
112 self.num_running.store(0, Ordering::Relaxed);
113 self.num_waiting.store(0, Ordering::Relaxed);
114 if let Some(ref hits) = self.encoder_cache_hits {
115 hits.store(0, Ordering::Relaxed);
116 }
117 if let Some(ref misses) = self.encoder_cache_misses {
118 misses.store(0, Ordering::Relaxed);
119 }
120 }
121
122 pub fn add_tokens_processed(&self, num_tokens: usize) {
123 self.tokens_processed
124 .fetch_add(num_tokens, Ordering::Relaxed);
125 }
126
127 pub fn add_new_sequence(&self) {
128 self.total_new_seqs.fetch_add(1, Ordering::Relaxed);
129 }
130
131 pub fn add_prefix_cache_hit(&self) {
132 self.prefix_cache_hits.fetch_add(1, Ordering::Relaxed);
133 }
134
135 pub fn set_num_running(&self, running: usize) {
136 self.num_running.store(running, Ordering::Relaxed);
137 }
138
139 pub fn set_num_waiting(&self, waiting: usize) {
140 self.num_waiting.store(waiting, Ordering::Relaxed);
141 }
142
143 pub fn prefix_cache_stats(&self) -> (usize, usize) {
145 (
146 self.prefix_cache_hits.load(Ordering::Relaxed),
147 self.total_new_seqs.load(Ordering::Relaxed),
148 )
149 }
150
151 pub fn encoder_cache_stats(&self) -> Option<(usize, usize)> {
153 match (&self.encoder_cache_hits, &self.encoder_cache_misses) {
154 (Some(h), Some(m)) => Some((h.load(Ordering::Relaxed), m.load(Ordering::Relaxed))),
155 _ => None,
156 }
157 }
158}