1use std::collections::VecDeque;
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use crate::Commands;
16use crate::message::{Agg, Op, Part};
17use crate::shard::Shard;
18use kevy_resp::{ArgvView, encode_array_len, encode_bulk, encode_integer};
19
/// One recorded slow command, as stored in a shard's slowlog buffer and
/// emitted by the `SLOWLOG GET` reply encoder.
#[derive(Debug, Clone)]
pub struct SlowlogEntry {
    /// Entry identifier. `slowlog_record` builds it from the shard id shifted
    /// left by 48 bits, OR-ed with the low 48 bits of that shard's local
    /// sequence counter.
    pub id: u64,
    /// Wall-clock time of the recording, in whole seconds since the Unix
    /// epoch (0 if the system clock reads earlier than the epoch).
    pub timestamp_secs: i64,
    /// Execution time of the command in microseconds, as reported by the
    /// caller of `slowlog_record`.
    pub micros: u64,
    /// Command arguments as recorded. `slowlog_record` keeps only the first
    /// `MAX_ARGV_RECORDED` arguments and the first `MAX_ARG_BYTES_RECORDED`
    /// bytes of each one.
    pub argv: Vec<Vec<u8>>,
    /// Client address field of the reply; emitted as a bulk string.
    /// `slowlog_record` currently stores an empty vector here.
    pub client_addr: Vec<u8>,
    /// Client name field of the reply; emitted as a bulk string.
    /// `slowlog_record` currently stores an empty vector here.
    pub client_name: Vec<u8>,
}
42
/// Per-shard slowlog storage together with the settings that govern it.
pub(crate) struct SlowlogState {
    /// Recorded entries. New entries are pushed at the back and the oldest
    /// are popped from the front once `max_len` is exceeded.
    pub(crate) buf: VecDeque<SlowlogEntry>,
    /// Recording threshold in microseconds. A negative value disables
    /// recording; otherwise commands taking at least this long are recorded.
    pub(crate) slower_than_micros: i64,
    /// Maximum number of entries retained in `buf`.
    pub(crate) max_len: u32,
    /// Sequence number handed to the next recorded entry; advanced with
    /// wrapping arithmetic on every recording.
    pub(crate) next_local_seq: u64,
}
58
59impl SlowlogState {
60 pub(crate) fn new(slower_than_micros: i64, max_len: u32) -> Self {
61 Self {
62 buf: VecDeque::with_capacity(max_len.min(1024) as usize),
63 slower_than_micros,
64 max_len,
65 next_local_seq: 0,
66 }
67 }
68}
69
/// Maximum number of command arguments copied into a slowlog entry;
/// arguments beyond this count are not recorded.
const MAX_ARGV_RECORDED: usize = 32;

/// Maximum number of bytes kept from each recorded argument; longer
/// arguments are cut to this length.
const MAX_ARG_BYTES_RECORDED: usize = 128;
78
79impl<C: Commands> Shard<C> {
80 #[inline]
86 pub(crate) fn slowlog_record<A: ArgvView + ?Sized>(
87 &mut self,
88 args: &A,
89 elapsed_micros: u64,
90 ) {
91 let threshold = self.slowlog.slower_than_micros;
92 if threshold < 0 {
93 return;
94 }
95 if (elapsed_micros as i64) < threshold {
101 return;
102 }
103 let local_seq = self.slowlog.next_local_seq;
104 self.slowlog.next_local_seq = self.slowlog.next_local_seq.wrapping_add(1);
105 let id = ((self.id as u64) << 48) | (local_seq & 0x0000_FFFF_FFFF_FFFF);
108 let timestamp_secs = SystemTime::now()
109 .duration_since(UNIX_EPOCH)
110 .map_or(0, |d| d.as_secs() as i64);
111 let mut argv: Vec<Vec<u8>> = Vec::with_capacity(args.len().min(MAX_ARGV_RECORDED));
112 for i in 0..args.len().min(MAX_ARGV_RECORDED) {
113 let a = &args[i];
114 if a.len() > MAX_ARG_BYTES_RECORDED {
115 argv.push(a[..MAX_ARG_BYTES_RECORDED].to_vec());
116 } else {
117 argv.push(a.to_vec());
118 }
119 }
120 self.slowlog.buf.push_back(SlowlogEntry {
121 id,
122 timestamp_secs,
123 micros: elapsed_micros,
124 argv,
125 client_addr: Vec::new(),
126 client_name: Vec::new(),
127 });
128 let cap = self.slowlog.max_len as usize;
129 while self.slowlog.buf.len() > cap {
130 self.slowlog.buf.pop_front();
131 }
132 }
133
134 pub(crate) fn start_slowlog(&mut self, conn_id: u64, seq: u64, sub: SlowlogSub) {
138 match sub {
139 SlowlogSub::Help => self.slowlog_immediate(conn_id, seq, slowlog_help_bytes()),
140 SlowlogSub::Err(b) => self.slowlog_immediate(conn_id, seq, b),
141 SlowlogSub::Reset => {
142 self.slowlog_fanout(conn_id, seq, Agg::AllOk, || Op::SlowlogReset);
143 }
144 SlowlogSub::Len => {
145 self.slowlog_fanout(conn_id, seq, Agg::SumInt(0), || Op::SlowlogLen);
146 }
147 SlowlogSub::Get(count) => self.slowlog_fanout(
148 conn_id,
149 seq,
150 Agg::SlowlogGet { count, entries: Vec::new() },
151 || Op::SlowlogGet,
152 ),
153 }
154 }
155
156 fn slowlog_immediate(&mut self, conn_id: u64, seq: u64, bytes: Vec<u8>) {
157 self.push_pending_slot(conn_id, 1, Agg::First(None), false);
158 self.fold(conn_id, seq, Part::Reply(crate::message::SmallReply::from_vec(bytes)));
159 }
160
161 fn slowlog_fanout(
162 &mut self,
163 conn_id: u64,
164 seq: u64,
165 agg: Agg,
166 mk_op: impl Fn() -> Op,
167 ) {
168 let targets: Vec<(usize, Op)> = (0..self.nshards).map(|s| (s, mk_op())).collect();
169 self.push_pending_slot(conn_id, targets.len() as u32, agg, false);
170 self.dispatch_targets(conn_id, seq, targets);
171 }
172}
173
/// Parsed form of a SLOWLOG command, as produced by `parse_slowlog_sub`.
#[derive(Debug, Clone)]
pub enum SlowlogSub {
    /// `SLOWLOG GET [count]`. `None` means no count was given (the encoder
    /// then returns at most 10 entries); a negative count means all entries.
    Get(Option<i64>),
    /// `SLOWLOG LEN`.
    Len,
    /// `SLOWLOG RESET`.
    Reset,
    /// `SLOWLOG HELP`.
    Help,
    /// The command was rejected; holds the already-encoded error reply that
    /// is sent to the client as-is.
    Err(Vec<u8>),
}
192
193pub(crate) fn encode_slowlog_get(count: Option<i64>, mut entries: Vec<SlowlogEntry>) -> Vec<u8> {
199 entries.sort_by(|a, b| {
200 b.timestamp_secs
201 .cmp(&a.timestamp_secs)
202 .then_with(|| b.id.cmp(&a.id))
203 });
204 let limit = match count {
205 None => 10,
206 Some(n) if n < 0 => entries.len(),
207 Some(n) => n as usize,
208 };
209 let n = entries.len().min(limit);
210 let mut out = Vec::with_capacity(64 + n * 64);
211 encode_array_len(&mut out, n as i64);
212 for e in entries.iter().take(n) {
213 encode_array_len(&mut out, 6);
214 encode_integer(&mut out, e.id as i64);
215 encode_integer(&mut out, e.timestamp_secs);
216 encode_integer(&mut out, e.micros as i64);
217 encode_array_len(&mut out, e.argv.len() as i64);
218 for a in &e.argv {
219 encode_bulk(&mut out, a);
220 }
221 encode_bulk(&mut out, &e.client_addr);
222 encode_bulk(&mut out, &e.client_name);
223 }
224 out
225}
226
227pub(crate) fn slowlog_help_bytes() -> Vec<u8> {
229 const LINES: &[&str] = &[
230 "SLOWLOG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
231 "GET [<count>]",
232 " Return top <count> entries from the slowlog (default: 10, -1 mean all).",
233 " Entries are made of:",
234 " id, timestamp, time in microseconds, arguments array, client IP and port,",
235 " client name",
236 "LEN",
237 " Return the length of the slowlog.",
238 "RESET",
239 " Reset the slowlog.",
240 "HELP",
241 " Print this help.",
242 ];
243 let mut out = Vec::with_capacity(512);
244 encode_array_len(&mut out, LINES.len() as i64);
245 for l in LINES {
246 encode_bulk(&mut out, l.as_bytes());
247 }
248 out
249}
250
251pub fn parse_slowlog_sub<A: ArgvView + ?Sized>(args: &A) -> SlowlogSub {
256 let Some(sub) = args.get(1) else {
257 return SlowlogSub::Err(slowlog_err_bytes("wrong number of arguments for 'slowlog'"));
258 };
259 let mut buf = [0u8; 16];
260 let upper = ascii_upper_into(sub, &mut buf);
261 match upper {
262 b"GET" => parse_slowlog_get(args),
263 b"LEN" if args.len() == 2 => SlowlogSub::Len,
264 b"RESET" if args.len() == 2 => SlowlogSub::Reset,
265 b"HELP" => SlowlogSub::Help,
266 b"LEN" | b"RESET" => SlowlogSub::Err(slowlog_arg_count_err(upper)),
267 _ => SlowlogSub::Err(slowlog_unknown_sub_err(sub)),
268 }
269}
270
271fn parse_slowlog_get<A: ArgvView + ?Sized>(args: &A) -> SlowlogSub {
272 if args.len() == 2 {
273 return SlowlogSub::Get(None);
274 }
275 if args.len() != 3 {
276 return SlowlogSub::Err(slowlog_err_bytes(
277 "wrong number of arguments for 'slowlog|get'",
278 ));
279 }
280 match std::str::from_utf8(&args[2]).ok().and_then(|s| s.parse::<i64>().ok()) {
281 Some(n) => SlowlogSub::Get(Some(n)),
282 None => SlowlogSub::Err(slowlog_err_bytes("value is not an integer or out of range")),
283 }
284}
285
286fn slowlog_arg_count_err(sub_upper: &[u8]) -> Vec<u8> {
287 let lower: String = sub_upper.iter().map(|b| b.to_ascii_lowercase() as char).collect();
288 slowlog_err_bytes(&format!("wrong number of arguments for 'slowlog|{lower}'"))
289}
290
/// Builds the encoded error reply for an unrecognised SLOWLOG subcommand.
///
/// `sub` is echoed back in the message exactly as the client sent it, with
/// invalid UTF-8 sequences replaced by U+FFFD. The result is a complete
/// error line: leading `-`, the message, and a trailing CRLF.
fn slowlog_unknown_sub_err(sub: &[u8]) -> Vec<u8> {
    let text = format!(
        "ERR Unknown SLOWLOG subcommand or wrong number of arguments for '{}'",
        String::from_utf8_lossy(sub),
    );
    [&b"-"[..], text.as_bytes(), &b"\r\n"[..]].concat()
}
302
/// Wraps `msg` as an encoded error reply: `-ERR `, the message bytes, then
/// a trailing CRLF.
fn slowlog_err_bytes(msg: &str) -> Vec<u8> {
    b"-ERR "
        .iter()
        .chain(msg.as_bytes())
        .chain(b"\r\n")
        .copied()
        .collect()
}
310
/// Writes the ASCII-uppercased form of `src` into `buf` and returns the
/// written prefix.
///
/// At most `buf.len()` (16) bytes are converted; a longer `src` is cut off
/// there. Bytes of `buf` past the written prefix are left untouched.
fn ascii_upper_into<'a>(src: &[u8], buf: &'a mut [u8; 16]) -> &'a [u8] {
    // `zip` stops at the shorter side, which bounds the copy to the buffer.
    for (slot, byte) in buf.iter_mut().zip(src) {
        *slot = byte.to_ascii_uppercase();
    }
    let written = src.len().min(buf.len());
    &buf[..written]
}