Skip to main content

skeg_cli/
stats.rs

1//! Minimal RESP3 client for the `skeg-cli stats` subcommand.
2//!
3//! Connects to a running `skeg-resp3` server, runs `HELLO 3`,
4//! `SKEG.STATS`, `SKEG.SHARDS`, and `SKEG.VINDEX.LIST`, and parses the
5//! responses into the structs below. No persistent connection, no
6//! pipelining: open, query, close. This is the read-only "what's the
7//! server doing right now" path; for live dashboards use `skeg-top`.
8
9use std::fmt;
10use std::io::{self, Read, Write};
11use std::net::TcpStream;
12use std::time::Duration;
13
14use bytes::BytesMut;
15use skeg_resp3::{Frame, ProtoVersion, encode_frame, parse_frame};
16
/// Initial capacity, in bytes, of the buffer a response is accumulated in.
/// It is a starting size only; the buffer still grows for larger replies.
const READ_CAP: usize = 64 * 1024;
/// Upper bound on how long establishing the TCP connection may take.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout applied to each individual socket read and write.
const RW_TIMEOUT: Duration = Duration::from_secs(10);
20
/// Identity of the server as reported in its `HELLO 3` reply.
#[derive(Clone, Debug)]
pub struct ServerInfo {
    /// Value of the `version` entry; empty when the reply carries none.
    pub version: String,
    /// Value of the `mode` entry; empty when the reply carries none.
    pub mode: String,
}
27
/// Server-wide cache and KV counters taken from the `SKEG.STATS` reply.
///
/// The reply is expected to be one flat status string of the form
/// `cache_bytes=X evictions=Y n_keys=Z budget=B`. Only these four keys
/// are read; any other key is skipped, so a newer server that reports
/// additional counters does not break the CLI. A counter that is absent
/// from the reply keeps its default of zero.
#[derive(Clone, Debug, Default)]
pub struct AggregateStats {
    /// Value of the `cache_bytes` counter.
    pub cache_bytes: u64,
    /// Value of the `evictions` counter.
    pub evictions: u64,
    /// Value of the `n_keys` counter.
    pub n_keys: u64,
    /// Value of the `budget` counter.
    pub budget: u64,
}
40
/// Counters for a single shard, built from one line of the `SKEG.SHARDS`
/// reply. Keys other than the four listed here are skipped, following
/// the same forward-compatibility rule as [`AggregateStats`].
#[derive(Clone, Debug, Default)]
pub struct ShardStats {
    /// Value of the line's `shard` key.
    pub shard_id: u32,
    /// Value of the line's `cache_bytes` key.
    pub cache_bytes: u64,
    /// Value of the line's `evictions` key.
    pub evictions: u64,
    /// Value of the line's `n_keys` key.
    pub n_keys: u64,
}
51
52/// Full report assembled from a single `stats` call.
53#[derive(Debug, Clone)]
54pub struct ServerStats {
55    pub info: ServerInfo,
56    pub aggregate: AggregateStats,
57    pub shards: Vec<ShardStats>,
58    /// `(name, dim)` pairs from `SKEG.VINDEX.LIST`. Empty if the server
59    /// has no indexes or the command returned a non-array response.
60    pub vindexes: Vec<(String, u32)>,
61}
62
63/// Connect to `addr` (e.g. `127.0.0.1:6379`), gather server stats, and
64/// return them. The TCP connection is closed before this returns.
65///
66/// # Errors
67///
68/// Returns an error if the connection fails, a read times out, the
69/// server returns a `-ERR` frame for `HELLO 3`, or a response cannot
70/// be parsed.
71pub fn fetch(addr: &str) -> io::Result<ServerStats> {
72    let mut sock = open(addr)?;
73    let info = do_hello(&mut sock)?;
74    let aggregate = parse_aggregate(&do_call(&mut sock, &["SKEG.STATS"])?);
75    let shards = parse_shards(&do_call(&mut sock, &["SKEG.SHARDS"])?);
76    let vindexes = parse_vindexes(&do_call(&mut sock, &["SKEG.VINDEX.LIST"])?);
77    Ok(ServerStats {
78        info,
79        aggregate,
80        shards,
81        vindexes,
82    })
83}
84
85fn open(addr: &str) -> io::Result<TcpStream> {
86    let socket_addr = addr.parse().map_err(|e| {
87        io::Error::new(
88            io::ErrorKind::InvalidInput,
89            format!("bad address {addr:?}: {e}"),
90        )
91    })?;
92    let sock = TcpStream::connect_timeout(&socket_addr, CONNECT_TIMEOUT)?;
93    sock.set_read_timeout(Some(RW_TIMEOUT))?;
94    sock.set_write_timeout(Some(RW_TIMEOUT))?;
95    Ok(sock)
96}
97
98fn do_hello(sock: &mut TcpStream) -> io::Result<ServerInfo> {
99    let frame = do_call(sock, &["HELLO", "3"])?;
100    let map = match &frame {
101        Frame::Map(m) => m,
102        Frame::Error(e) => {
103            return Err(io::Error::other(format!("HELLO failed: {e}")));
104        }
105        _ => return Err(io::Error::other("HELLO returned a non-map frame")),
106    };
107    let mut version = String::new();
108    let mut mode = String::new();
109    for (k, v) in map {
110        let key = frame_as_string(k);
111        let val = frame_as_string(v);
112        match key.as_str() {
113            "version" => version = val,
114            "mode" => mode = val,
115            _ => {}
116        }
117    }
118    Ok(ServerInfo { version, mode })
119}
120
121/// Send one command (an array of bulk strings) and read one response
122/// frame. The server is expected to answer exactly one frame; extra
123/// bytes left in the socket are discarded by the next `do_call`.
124fn do_call(sock: &mut TcpStream, args: &[&str]) -> io::Result<Frame> {
125    let req = Frame::Array(
126        args.iter()
127            .map(|a| Frame::Bulk(a.as_bytes().to_vec().into()))
128            .collect(),
129    );
130    let mut buf = BytesMut::new();
131    encode_frame(&req, ProtoVersion::Resp3, &mut buf);
132    sock.write_all(&buf)?;
133    read_one_frame(sock)
134}
135
136fn read_one_frame(sock: &mut TcpStream) -> io::Result<Frame> {
137    let mut buf = BytesMut::with_capacity(READ_CAP);
138    let mut tmp = [0u8; 4096];
139    loop {
140        match parse_frame(&buf) {
141            Ok(Some((frame, _consumed))) => return Ok(frame),
142            Ok(None) => {
143                let n = sock.read(&mut tmp)?;
144                if n == 0 {
145                    return Err(io::Error::new(
146                        io::ErrorKind::UnexpectedEof,
147                        "server closed the connection mid-frame",
148                    ));
149                }
150                buf.extend_from_slice(&tmp[..n]);
151            }
152            Err(e) => return Err(io::Error::other(format!("RESP3 parse error: {e}"))),
153        }
154    }
155}
156
157/// Convert a frame into a string, mirroring how `redis-cli` prints
158/// scalars. Frames that are not naturally a string yield an empty
159/// string; the caller decides whether that is an error.
160fn frame_as_string(f: &Frame) -> String {
161    match f {
162        Frame::Simple(s) => s.clone(),
163        Frame::Bulk(b) => String::from_utf8_lossy(b).into_owned(),
164        Frame::Verbatim { data, .. } => String::from_utf8_lossy(data).into_owned(),
165        Frame::Integer(i) => i.to_string(),
166        Frame::Double(d) => d.to_string(),
167        Frame::Boolean(b) => b.to_string(),
168        _ => String::new(),
169    }
170}
171
172fn parse_aggregate(frame: &Frame) -> AggregateStats {
173    let s = frame_as_string(frame);
174    let mut out = AggregateStats::default();
175    apply_kv_pairs(&s, |key, value| match key {
176        "cache_bytes" => out.cache_bytes = value,
177        "evictions" => out.evictions = value,
178        "n_keys" => out.n_keys = value,
179        "budget" => out.budget = value,
180        _ => {}
181    });
182    out
183}
184
185/// Parse the multi-line `SKEG.SHARDS` output, one line per shard.
186fn parse_shards(frame: &Frame) -> Vec<ShardStats> {
187    let s = frame_as_string(frame);
188    let mut out = Vec::new();
189    for line in s.lines() {
190        if line.is_empty() {
191            continue;
192        }
193        let mut shard = ShardStats::default();
194        apply_kv_pairs(line, |key, value| match key {
195            "shard" => {
196                if let Ok(v) = u32::try_from(value) {
197                    shard.shard_id = v;
198                }
199            }
200            "cache_bytes" => shard.cache_bytes = value,
201            "evictions" => shard.evictions = value,
202            "n_keys" => shard.n_keys = value,
203            _ => {}
204        });
205        out.push(shard);
206    }
207    out
208}
209
210/// Parse `SKEG.VINDEX.LIST` output. The server returns one
211/// `name=<n> dim=<d> kind=<k> backend=<b> n_vectors=<v>` line per
212/// index; we keep only `(name, dim)` for the summary.
213fn parse_vindexes(frame: &Frame) -> Vec<(String, u32)> {
214    let s = frame_as_string(frame);
215    let mut out = Vec::new();
216    for line in s.lines() {
217        if line.is_empty() {
218            continue;
219        }
220        let mut name = String::new();
221        let mut dim = 0u32;
222        for tok in line.split_whitespace() {
223            let Some((k, v)) = tok.split_once('=') else {
224                continue;
225            };
226            match k {
227                "name" => name = v.to_string(),
228                "dim" => {
229                    if let Ok(d) = v.parse() {
230                        dim = d;
231                    }
232                }
233                _ => {}
234            }
235        }
236        if !name.is_empty() {
237            out.push((name, dim));
238        }
239    }
240    out
241}
242
/// Call `f(key, value)` once for each whitespace-separated `key=value`
/// token of `s`, in order of appearance.
///
/// A token is split at its first `=`. Tokens without an `=`, and tokens
/// whose value part does not parse as a `u64`, are skipped silently.
fn apply_kv_pairs<F: FnMut(&str, u64)>(s: &str, mut f: F) {
    s.split_whitespace()
        .filter_map(|token| token.split_once('='))
        .filter_map(|(key, raw)| raw.parse::<u64>().ok().map(|number| (key, number)))
        .for_each(|(key, number)| f(key, number));
}
255
256impl fmt::Display for ServerStats {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        writeln!(
259            f,
260            "server     version={} mode={}",
261            self.info.version, self.info.mode
262        )?;
263        writeln!(
264            f,
265            "aggregate  cache_bytes={} evictions={} n_keys={} budget={}",
266            self.aggregate.cache_bytes,
267            self.aggregate.evictions,
268            self.aggregate.n_keys,
269            self.aggregate.budget,
270        )?;
271        if !self.shards.is_empty() {
272            writeln!(f, "shards     {}", self.shards.len())?;
273            for s in &self.shards {
274                writeln!(
275                    f,
276                    "  shard-{}: cache_bytes={} evictions={} n_keys={}",
277                    s.shard_id, s.cache_bytes, s.evictions, s.n_keys
278                )?;
279            }
280        }
281        if self.vindexes.is_empty() {
282            writeln!(f, "vindexes   (none)")?;
283        } else {
284            writeln!(f, "vindexes   {}", self.vindexes.len())?;
285            for (name, dim) in &self.vindexes {
286                writeln!(f, "  {name} (dim={dim})")?;
287            }
288        }
289        Ok(())
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrap `text` in the simple-string frame the parsers are fed here.
    fn simple(text: &str) -> Frame {
        Frame::Simple(text.into())
    }

    /// Known counters are stored and the unknown `future` key is ignored.
    #[test]
    fn aggregate_parses_known_keys_ignores_unknown() {
        let AggregateStats {
            cache_bytes,
            evictions,
            n_keys,
            budget,
        } = parse_aggregate(&simple(
            "cache_bytes=128 evictions=2 n_keys=4 budget=1024 future=99",
        ));
        assert_eq!(cache_bytes, 128);
        assert_eq!(evictions, 2);
        assert_eq!(n_keys, 4);
        assert_eq!(budget, 1024);
    }

    /// Two input lines yield two shards, in input order.
    #[test]
    fn shards_parses_one_line_per_shard() {
        let shards = parse_shards(&simple(
            "shard=0 cache_bytes=10 evictions=0 n_keys=1\nshard=1 cache_bytes=20 evictions=0 n_keys=2",
        ));
        assert_eq!(shards.len(), 2);

        let (first, second) = (&shards[0], &shards[1]);
        assert_eq!(first.shard_id, 0);
        assert_eq!(first.cache_bytes, 10);
        assert_eq!(second.shard_id, 1);
        assert_eq!(second.n_keys, 2);
    }

    /// Only `name` and `dim` survive; the other keys are dropped.
    #[test]
    fn vindexes_parses_name_and_dim() {
        let expected: Vec<(String, u32)> = vec![("docs".into(), 4), ("other".into(), 128)];
        let parsed = parse_vindexes(&simple(
            "name=docs dim=4 kind=int8 backend=flat n_vectors=0\nname=other dim=128 kind=f32 backend=disk n_vectors=10",
        ));
        assert_eq!(parsed, expected);
    }

    /// An empty reply produces an empty list.
    #[test]
    fn vindexes_empty_on_no_output() {
        assert!(parse_vindexes(&simple("")).is_empty());
    }
}