1use std::fmt;
10use std::io::{self, Read, Write};
11use std::net::TcpStream;
12use std::time::Duration;
13
14use bytes::BytesMut;
15use skeg_resp3::{Frame, ProtoVersion, encode_frame, parse_frame};
16
/// Initial capacity, in bytes, of the buffer in which `read_one_frame`
/// accumulates a reply. It is a starting size, not a limit on reply length.
const READ_CAP: usize = 64 * 1024;
/// How long `open` waits for the TCP connection to be established.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout `open` installs for both reads and writes on the connected socket.
const RW_TIMEOUT: Duration = Duration::from_secs(10);
20
/// Identity of the server as reported in its `HELLO 3` reply.
///
/// Both fields are filled from the reply map by `do_hello`; an entry the
/// server does not send leaves the corresponding field as an empty string.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerInfo {
    /// Value of the `version` entry of the `HELLO` reply.
    pub version: String,
    /// Value of the `mode` entry of the `HELLO` reply.
    pub mode: String,
}
27
/// Server-wide counters parsed from the `SKEG.STATS` reply.
///
/// Each field holds the value of the `key=value` token of the same name; a
/// key that is missing or not a valid `u64` leaves the field at `0`.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AggregateStats {
    /// Value reported under `cache_bytes`.
    pub cache_bytes: u64,
    /// Value reported under `evictions`.
    pub evictions: u64,
    /// Value reported under `n_keys`.
    pub n_keys: u64,
    /// Value reported under `budget`.
    // NOTE(review): presumably the configured cache size limit in bytes — confirm.
    pub budget: u64,
}
40
/// Counters for a single shard, parsed from one line of the `SKEG.SHARDS` reply.
///
/// Each field holds the value of the matching `key=value` token on that line;
/// a key that is missing or unparsable leaves the field at `0`.
///
/// Implements `PartialEq`/`Eq` so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ShardStats {
    /// Value reported under `shard`, kept only when it fits in a `u32`.
    pub shard_id: u32,
    /// Value reported under `cache_bytes`.
    pub cache_bytes: u64,
    /// Value reported under `evictions`.
    pub evictions: u64,
    /// Value reported under `n_keys`.
    pub n_keys: u64,
}
51
/// One complete statistics snapshot, as assembled by `fetch`.
///
/// The `Display` implementation renders it as a multi-line text report.
#[derive(Debug, Clone)]
pub struct ServerStats {
    /// Server identity taken from the `HELLO` handshake.
    pub info: ServerInfo,
    /// Server-wide counters parsed from the `SKEG.STATS` reply.
    pub aggregate: AggregateStats,
    /// Per-shard counters, one entry per non-empty line of the `SKEG.SHARDS` reply.
    pub shards: Vec<ShardStats>,
    /// `(name, dim)` pairs, one per line of the `SKEG.VINDEX.LIST` reply that
    /// carries a non-empty `name`.
    pub vindexes: Vec<(String, u32)>,
}
62
63pub fn fetch(addr: &str) -> io::Result<ServerStats> {
72 let mut sock = open(addr)?;
73 let info = do_hello(&mut sock)?;
74 let aggregate = parse_aggregate(&do_call(&mut sock, &["SKEG.STATS"])?);
75 let shards = parse_shards(&do_call(&mut sock, &["SKEG.SHARDS"])?);
76 let vindexes = parse_vindexes(&do_call(&mut sock, &["SKEG.VINDEX.LIST"])?);
77 Ok(ServerStats {
78 info,
79 aggregate,
80 shards,
81 vindexes,
82 })
83}
84
85fn open(addr: &str) -> io::Result<TcpStream> {
86 let socket_addr = addr.parse().map_err(|e| {
87 io::Error::new(
88 io::ErrorKind::InvalidInput,
89 format!("bad address {addr:?}: {e}"),
90 )
91 })?;
92 let sock = TcpStream::connect_timeout(&socket_addr, CONNECT_TIMEOUT)?;
93 sock.set_read_timeout(Some(RW_TIMEOUT))?;
94 sock.set_write_timeout(Some(RW_TIMEOUT))?;
95 Ok(sock)
96}
97
98fn do_hello(sock: &mut TcpStream) -> io::Result<ServerInfo> {
99 let frame = do_call(sock, &["HELLO", "3"])?;
100 let map = match &frame {
101 Frame::Map(m) => m,
102 Frame::Error(e) => {
103 return Err(io::Error::other(format!("HELLO failed: {e}")));
104 }
105 _ => return Err(io::Error::other("HELLO returned a non-map frame")),
106 };
107 let mut version = String::new();
108 let mut mode = String::new();
109 for (k, v) in map {
110 let key = frame_as_string(k);
111 let val = frame_as_string(v);
112 match key.as_str() {
113 "version" => version = val,
114 "mode" => mode = val,
115 _ => {}
116 }
117 }
118 Ok(ServerInfo { version, mode })
119}
120
121fn do_call(sock: &mut TcpStream, args: &[&str]) -> io::Result<Frame> {
125 let req = Frame::Array(
126 args.iter()
127 .map(|a| Frame::Bulk(a.as_bytes().to_vec().into()))
128 .collect(),
129 );
130 let mut buf = BytesMut::new();
131 encode_frame(&req, ProtoVersion::Resp3, &mut buf);
132 sock.write_all(&buf)?;
133 read_one_frame(sock)
134}
135
136fn read_one_frame(sock: &mut TcpStream) -> io::Result<Frame> {
137 let mut buf = BytesMut::with_capacity(READ_CAP);
138 let mut tmp = [0u8; 4096];
139 loop {
140 match parse_frame(&buf) {
141 Ok(Some((frame, _consumed))) => return Ok(frame),
142 Ok(None) => {
143 let n = sock.read(&mut tmp)?;
144 if n == 0 {
145 return Err(io::Error::new(
146 io::ErrorKind::UnexpectedEof,
147 "server closed the connection mid-frame",
148 ));
149 }
150 buf.extend_from_slice(&tmp[..n]);
151 }
152 Err(e) => return Err(io::Error::other(format!("RESP3 parse error: {e}"))),
153 }
154 }
155}
156
157fn frame_as_string(f: &Frame) -> String {
161 match f {
162 Frame::Simple(s) => s.clone(),
163 Frame::Bulk(b) => String::from_utf8_lossy(b).into_owned(),
164 Frame::Verbatim { data, .. } => String::from_utf8_lossy(data).into_owned(),
165 Frame::Integer(i) => i.to_string(),
166 Frame::Double(d) => d.to_string(),
167 Frame::Boolean(b) => b.to_string(),
168 _ => String::new(),
169 }
170}
171
172fn parse_aggregate(frame: &Frame) -> AggregateStats {
173 let s = frame_as_string(frame);
174 let mut out = AggregateStats::default();
175 apply_kv_pairs(&s, |key, value| match key {
176 "cache_bytes" => out.cache_bytes = value,
177 "evictions" => out.evictions = value,
178 "n_keys" => out.n_keys = value,
179 "budget" => out.budget = value,
180 _ => {}
181 });
182 out
183}
184
185fn parse_shards(frame: &Frame) -> Vec<ShardStats> {
187 let s = frame_as_string(frame);
188 let mut out = Vec::new();
189 for line in s.lines() {
190 if line.is_empty() {
191 continue;
192 }
193 let mut shard = ShardStats::default();
194 apply_kv_pairs(line, |key, value| match key {
195 "shard" => {
196 if let Ok(v) = u32::try_from(value) {
197 shard.shard_id = v;
198 }
199 }
200 "cache_bytes" => shard.cache_bytes = value,
201 "evictions" => shard.evictions = value,
202 "n_keys" => shard.n_keys = value,
203 _ => {}
204 });
205 out.push(shard);
206 }
207 out
208}
209
210fn parse_vindexes(frame: &Frame) -> Vec<(String, u32)> {
214 let s = frame_as_string(frame);
215 let mut out = Vec::new();
216 for line in s.lines() {
217 if line.is_empty() {
218 continue;
219 }
220 let mut name = String::new();
221 let mut dim = 0u32;
222 for tok in line.split_whitespace() {
223 let Some((k, v)) = tok.split_once('=') else {
224 continue;
225 };
226 match k {
227 "name" => name = v.to_string(),
228 "dim" => {
229 if let Ok(d) = v.parse() {
230 dim = d;
231 }
232 }
233 _ => {}
234 }
235 }
236 if !name.is_empty() {
237 out.push((name, dim));
238 }
239 }
240 out
241}
242
/// Calls `f(key, value)` for every well-formed `key=value` token in `s`.
///
/// Tokens are separated by whitespace and split at their first `=`. A token
/// is skipped when it has no `=` or when the part after it is not a valid
/// `u64`. Callbacks run in the order the tokens appear.
fn apply_kv_pairs<F: FnMut(&str, u64)>(s: &str, mut f: F) {
    s.split_whitespace()
        .filter_map(|tok| tok.split_once('='))
        .filter_map(|(key, raw)| raw.parse::<u64>().ok().map(|number| (key, number)))
        .for_each(|(key, number)| f(key, number));
}
255
256impl fmt::Display for ServerStats {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 writeln!(
259 f,
260 "server version={} mode={}",
261 self.info.version, self.info.mode
262 )?;
263 writeln!(
264 f,
265 "aggregate cache_bytes={} evictions={} n_keys={} budget={}",
266 self.aggregate.cache_bytes,
267 self.aggregate.evictions,
268 self.aggregate.n_keys,
269 self.aggregate.budget,
270 )?;
271 if !self.shards.is_empty() {
272 writeln!(f, "shards {}", self.shards.len())?;
273 for s in &self.shards {
274 writeln!(
275 f,
276 " shard-{}: cache_bytes={} evictions={} n_keys={}",
277 s.shard_id, s.cache_bytes, s.evictions, s.n_keys
278 )?;
279 }
280 }
281 if self.vindexes.is_empty() {
282 writeln!(f, "vindexes (none)")?;
283 } else {
284 writeln!(f, "vindexes {}", self.vindexes.len())?;
285 for (name, dim) in &self.vindexes {
286 writeln!(f, " {name} (dim={dim})")?;
287 }
288 }
289 Ok(())
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `text` in the simple-string frame the parsers are fed in these tests.
    fn simple(text: &str) -> Frame {
        Frame::Simple(text.into())
    }

    /// Known keys land in their fields; the unknown `future` key is ignored.
    #[test]
    fn aggregate_parses_known_keys_ignores_unknown() {
        let parsed = parse_aggregate(&simple(
            "cache_bytes=128 evictions=2 n_keys=4 budget=1024 future=99",
        ));
        assert_eq!(
            (
                parsed.cache_bytes,
                parsed.evictions,
                parsed.n_keys,
                parsed.budget
            ),
            (128, 2, 4, 1024)
        );
    }

    /// Each line of the reply becomes its own shard entry.
    #[test]
    fn shards_parses_one_line_per_shard() {
        let parsed = parse_shards(&simple(
            "shard=0 cache_bytes=10 evictions=0 n_keys=1\nshard=1 cache_bytes=20 evictions=0 n_keys=2",
        ));
        assert_eq!(parsed.len(), 2);
        let (first, second) = (&parsed[0], &parsed[1]);
        assert_eq!((first.shard_id, first.cache_bytes), (0, 10));
        assert_eq!((second.shard_id, second.n_keys), (1, 2));
    }

    /// Only `name` and `dim` are extracted; the remaining tokens are ignored.
    #[test]
    fn vindexes_parses_name_and_dim() {
        let expected: Vec<(String, u32)> = vec![("docs".into(), 4), ("other".into(), 128)];
        let parsed = parse_vindexes(&simple(
            "name=docs dim=4 kind=int8 backend=flat n_vectors=0\nname=other dim=128 kind=f32 backend=disk n_vectors=10",
        ));
        assert_eq!(parsed, expected);
    }

    /// An empty reply yields no indexes.
    #[test]
    fn vindexes_empty_on_no_output() {
        assert!(parse_vindexes(&simple("")).is_empty());
    }
}