Skip to main content

ember_protocol/command/
parse.rs

1//! Command parsing from RESP3 frames.
2//!
3//! Contains `Command::from_frame()` and all individual command parsers.
4//! Helper functions for frame extraction and numeric parsing live here too.
5
6use bytes::Bytes;
7
8use crate::error::ProtocolError;
9use crate::types::Frame;
10
11use super::{Command, ScoreBound, SetExpire, ZAddFlags};
12
/// Upper bound on the number of dimensions a single vector may carry.
///
/// Real embedding models sit far below this (OpenAI: 1536, Cohere: 4096), so
/// 65,536 leaves ample headroom while still rejecting absurdly large vectors
/// that would only serve to abuse memory.
const MAX_VECTOR_DIMS: usize = 65_536;

/// Upper bound for the HNSW connectivity (M) and expansion parameters.
///
/// Beyond 1024 there is no practical benefit, only wasted memory.
const MAX_HNSW_PARAM: u64 = 1024;

/// Upper bound on the number of results a VSIM query may request.
///
/// 10,000 is generous for any practical similarity search and keeps result
/// allocation bounded so a single query cannot cause an OOM.
const MAX_VSIM_COUNT: u64 = 10_000;

/// Upper bound on the VSIM search beam width; shares the cap of
/// [`MAX_HNSW_PARAM`].
///
/// Larger values degrade to worst-case O(n) graph traversal without any
/// accuracy gain.
const MAX_VSIM_EF: u64 = MAX_HNSW_PARAM;

/// Upper bound on the number of vectors in one VADD_BATCH command.
///
/// 10,000 bounds per-command latency while remaining large enough to
/// amortize round-trip overhead for bulk inserts.
const MAX_VADD_BATCH_SIZE: usize = 10_000;

/// Upper bound for the COUNT hint accepted by SCAN.
///
/// Stops clients from asking for a hint so large that it causes
/// pre-allocation issues.
const MAX_SCAN_COUNT: u64 = 10_000_000;
38
39impl Command {
40    /// Parses a [`Frame`] into a [`Command`].
41    ///
42    /// Expects an array frame where the first element is the command name
43    /// (as a bulk or simple string) and the rest are arguments.
44    pub fn from_frame(frame: Frame) -> Result<Command, ProtocolError> {
45        let frames = match frame {
46            Frame::Array(frames) => frames,
47            _ => {
48                return Err(ProtocolError::InvalidCommandFrame(
49                    "expected array frame".into(),
50                ));
51            }
52        };
53
54        if frames.is_empty() {
55            return Err(ProtocolError::InvalidCommandFrame(
56                "empty command array".into(),
57            ));
58        }
59
60        // command names are short ASCII keywords — uppercase on the stack to
61        // avoid two heap allocations (extract_string + to_ascii_uppercase).
62        let name_bytes = extract_raw_bytes(&frames[0])?;
63        let mut upper = [0u8; MAX_KEYWORD_LEN];
64        let len = name_bytes.len();
65        if len > MAX_KEYWORD_LEN {
66            let name = extract_string(&frames[0])?;
67            return Ok(Command::Unknown(name));
68        }
69        upper[..len].copy_from_slice(name_bytes);
70        upper[..len].make_ascii_uppercase();
71        let name_upper = std::str::from_utf8(&upper[..len]).map_err(|_| {
72            ProtocolError::InvalidCommandFrame("command name is not valid utf-8".into())
73        })?;
74
75        match name_upper {
76            "PING" => parse_ping(&frames[1..]),
77            "ECHO" => parse_echo(&frames[1..]),
78            "GET" => parse_get(&frames[1..]),
79            "SET" => parse_set(&frames[1..]),
80            "INCR" => parse_incr(&frames[1..]),
81            "DECR" => parse_decr(&frames[1..]),
82            "INCRBY" => parse_incrby(&frames[1..]),
83            "DECRBY" => parse_decrby(&frames[1..]),
84            "INCRBYFLOAT" => parse_incrbyfloat(&frames[1..]),
85            "APPEND" => parse_append(&frames[1..]),
86            "STRLEN" => parse_strlen(&frames[1..]),
87            "SETNX" => parse_setnx(&frames[1..]),
88            "SETEX" => parse_setex(&frames[1..]),
89            "PSETEX" => parse_psetex(&frames[1..]),
90            "GETRANGE" | "SUBSTR" => parse_getrange(&frames[1..]),
91            "SETRANGE" => parse_setrange(&frames[1..]),
92            "KEYS" => parse_keys(&frames[1..]),
93            "RENAME" => parse_rename(&frames[1..]),
94            "DEL" => parse_del(&frames[1..]),
95            "UNLINK" => parse_unlink(&frames[1..]),
96            "EXISTS" => parse_exists(&frames[1..]),
97            "MGET" => parse_mget(&frames[1..]),
98            "MSET" => parse_mset(&frames[1..]),
99            "EXPIRE" => parse_expire(&frames[1..]),
100            "TTL" => parse_ttl(&frames[1..]),
101            "PERSIST" => parse_persist(&frames[1..]),
102            "PTTL" => parse_pttl(&frames[1..]),
103            "PEXPIRE" => parse_pexpire(&frames[1..]),
104            "DBSIZE" => parse_dbsize(&frames[1..]),
105            "INFO" => parse_info(&frames[1..]),
106            "BGSAVE" => parse_bgsave(&frames[1..]),
107            "BGREWRITEAOF" => parse_bgrewriteaof(&frames[1..]),
108            "FLUSHDB" => parse_flushdb(&frames[1..]),
109            "SCAN" => parse_scan(&frames[1..]),
110            "SSCAN" => parse_key_scan(&frames[1..], "SSCAN"),
111            "HSCAN" => parse_key_scan(&frames[1..], "HSCAN"),
112            "ZSCAN" => parse_key_scan(&frames[1..], "ZSCAN"),
113            "LPUSH" => parse_lpush(&frames[1..]),
114            "RPUSH" => parse_rpush(&frames[1..]),
115            "LPOP" => parse_lpop(&frames[1..]),
116            "RPOP" => parse_rpop(&frames[1..]),
117            "LRANGE" => parse_lrange(&frames[1..]),
118            "LLEN" => parse_llen(&frames[1..]),
119            "BLPOP" => parse_blpop(&frames[1..]),
120            "BRPOP" => parse_brpop(&frames[1..]),
121            "LINDEX" => parse_lindex(&frames[1..]),
122            "LSET" => parse_lset(&frames[1..]),
123            "LTRIM" => parse_ltrim(&frames[1..]),
124            "LINSERT" => parse_linsert(&frames[1..]),
125            "LREM" => parse_lrem(&frames[1..]),
126            "LPOS" => parse_lpos(&frames[1..]),
127            "TYPE" => parse_type(&frames[1..]),
128            "ZADD" => parse_zadd(&frames[1..]),
129            "ZREM" => parse_zrem(&frames[1..]),
130            "ZSCORE" => parse_zscore(&frames[1..]),
131            "ZRANK" => parse_zrank(&frames[1..]),
132            "ZREVRANK" => parse_zrevrank(&frames[1..]),
133            "ZCARD" => parse_zcard(&frames[1..]),
134            "ZRANGE" => parse_zrange(&frames[1..]),
135            "ZREVRANGE" => parse_zrevrange(&frames[1..]),
136            "ZCOUNT" => parse_zcount(&frames[1..]),
137            "ZINCRBY" => parse_zincrby(&frames[1..]),
138            "ZRANGEBYSCORE" => parse_zrangebyscore(&frames[1..]),
139            "ZREVRANGEBYSCORE" => parse_zrevrangebyscore(&frames[1..]),
140            "ZPOPMIN" => {
141                let (key, count) = parse_zpop_args(&frames[1..], "ZPOPMIN")?;
142                Ok(Command::ZPopMin { key, count })
143            }
144            "ZPOPMAX" => {
145                let (key, count) = parse_zpop_args(&frames[1..], "ZPOPMAX")?;
146                Ok(Command::ZPopMax { key, count })
147            }
148            "HSET" => parse_hset(&frames[1..]),
149            "HGET" => parse_hget(&frames[1..]),
150            "HGETALL" => parse_hgetall(&frames[1..]),
151            "HDEL" => parse_hdel(&frames[1..]),
152            "HEXISTS" => parse_hexists(&frames[1..]),
153            "HLEN" => parse_hlen(&frames[1..]),
154            "HINCRBY" => parse_hincrby(&frames[1..]),
155            "HKEYS" => parse_hkeys(&frames[1..]),
156            "HVALS" => parse_hvals(&frames[1..]),
157            "HMGET" => parse_hmget(&frames[1..]),
158            "SADD" => parse_sadd(&frames[1..]),
159            "SREM" => parse_srem(&frames[1..]),
160            "SMEMBERS" => parse_smembers(&frames[1..]),
161            "SISMEMBER" => parse_sismember(&frames[1..]),
162            "SCARD" => parse_scard(&frames[1..]),
163            "SUNION" => parse_multi_key_set("SUNION", &frames[1..]),
164            "SINTER" => parse_multi_key_set("SINTER", &frames[1..]),
165            "SDIFF" => parse_multi_key_set("SDIFF", &frames[1..]),
166            "SUNIONSTORE" => parse_store_set("SUNIONSTORE", &frames[1..]),
167            "SINTERSTORE" => parse_store_set("SINTERSTORE", &frames[1..]),
168            "SDIFFSTORE" => parse_store_set("SDIFFSTORE", &frames[1..]),
169            "SRANDMEMBER" => parse_srandmember(&frames[1..]),
170            "SPOP" => parse_spop(&frames[1..]),
171            "SMISMEMBER" => parse_smismember(&frames[1..]),
172            "CLUSTER" => parse_cluster(&frames[1..]),
173            "ASKING" => parse_asking(&frames[1..]),
174            "MIGRATE" => parse_migrate(&frames[1..]),
175            "RESTORE" => parse_restore(&frames[1..]),
176            "CONFIG" => parse_config(&frames[1..]),
177            "MULTI" => parse_no_args("MULTI", &frames[1..], Command::Multi),
178            "EXEC" => parse_no_args("EXEC", &frames[1..], Command::Exec),
179            "DISCARD" => parse_no_args("DISCARD", &frames[1..], Command::Discard),
180            "WATCH" => parse_watch(&frames[1..]),
181            "UNWATCH" => parse_no_args("UNWATCH", &frames[1..], Command::Unwatch),
182            "SLOWLOG" => parse_slowlog(&frames[1..]),
183            "SUBSCRIBE" => parse_subscribe(&frames[1..]),
184            "UNSUBSCRIBE" => parse_unsubscribe(&frames[1..]),
185            "PSUBSCRIBE" => parse_psubscribe(&frames[1..]),
186            "PUNSUBSCRIBE" => parse_punsubscribe(&frames[1..]),
187            "PUBLISH" => parse_publish(&frames[1..]),
188            "PUBSUB" => parse_pubsub(&frames[1..]),
189            "VADD" => parse_vadd(&frames[1..]),
190            "VADD_BATCH" => parse_vadd_batch(&frames[1..]),
191            "VSIM" => parse_vsim(&frames[1..]),
192            "VREM" => parse_vrem(&frames[1..]),
193            "VGET" => parse_vget(&frames[1..]),
194            "VCARD" => parse_vcard(&frames[1..]),
195            "VDIM" => parse_vdim(&frames[1..]),
196            "VINFO" => parse_vinfo(&frames[1..]),
197            "PROTO.REGISTER" => parse_proto_register(&frames[1..]),
198            "PROTO.SET" => parse_proto_set(&frames[1..]),
199            "PROTO.GET" => parse_proto_get(&frames[1..]),
200            "PROTO.TYPE" => parse_proto_type(&frames[1..]),
201            "PROTO.SCHEMAS" => parse_proto_schemas(&frames[1..]),
202            "PROTO.DESCRIBE" => parse_proto_describe(&frames[1..]),
203            "PROTO.GETFIELD" => parse_proto_getfield(&frames[1..]),
204            "PROTO.SETFIELD" => parse_proto_setfield(&frames[1..]),
205            "PROTO.DELFIELD" => parse_proto_delfield(&frames[1..]),
206            "TIME" => parse_no_args("TIME", &frames[1..], Command::Time),
207            "LASTSAVE" => parse_no_args("LASTSAVE", &frames[1..], Command::LastSave),
208            "ROLE" => parse_no_args("ROLE", &frames[1..], Command::Role),
209            "OBJECT" => parse_object(&frames[1..]),
210            "COPY" => parse_copy(&frames[1..]),
211            "CLIENT" => parse_client(&frames[1..]),
212            "ACL" => parse_acl(&frames[1..]),
213            "AUTH" => parse_auth(&frames[1..]),
214            "QUIT" => parse_quit(&frames[1..]),
215            "MONITOR" => parse_monitor(&frames[1..]),
216            "RANDOMKEY" => parse_no_args("RANDOMKEY", &frames[1..], Command::RandomKey),
217            "TOUCH" => parse_touch(&frames[1..]),
218            "SORT" => parse_sort(&frames[1..]),
219            _ => {
220                // only allocate for truly unknown commands
221                let name = extract_string(&frames[0])?;
222                Ok(Command::Unknown(name))
223            }
224        }
225    }
226}
227
228/// Extracts a UTF-8 string from a Bulk or Simple frame.
229///
230/// Validates UTF-8 in-place on the Bytes buffer to avoid an
231/// intermediate Vec<u8> allocation from `to_vec()`.
232fn extract_string(frame: &Frame) -> Result<String, ProtocolError> {
233    match frame {
234        Frame::Bulk(data) => {
235            let s = std::str::from_utf8(data).map_err(|_| {
236                ProtocolError::InvalidCommandFrame("command name is not valid utf-8".into())
237            })?;
238            Ok(s.to_owned())
239        }
240        Frame::Simple(s) => Ok(s.clone()),
241        _ => Err(ProtocolError::InvalidCommandFrame(
242            "expected bulk or simple string for command name".into(),
243        )),
244    }
245}
246
247/// Extracts raw bytes from a Bulk or Simple frame.
248fn extract_bytes(frame: &Frame) -> Result<Bytes, ProtocolError> {
249    match frame {
250        Frame::Bulk(data) => Ok(data.clone()),
251        Frame::Simple(s) => Ok(Bytes::copy_from_slice(s.as_bytes())),
252        _ => Err(ProtocolError::InvalidCommandFrame(
253            "expected bulk or simple string argument".into(),
254        )),
255    }
256}
257
258/// Extracts all frames in a slice as UTF-8 strings.
259fn extract_strings(frames: &[Frame]) -> Result<Vec<String>, ProtocolError> {
260    frames.iter().map(extract_string).collect()
261}
262
263/// Extracts all frames in a slice as raw byte buffers.
264fn extract_bytes_vec(frames: &[Frame]) -> Result<Vec<Bytes>, ProtocolError> {
265    frames.iter().map(extract_bytes).collect()
266}
267
/// Size, in bytes, of the stack buffer used to uppercase command names and
/// keywords.
///
/// Every Redis/Ember command and subcommand is well under this length.
const MAX_KEYWORD_LEN: usize = 32;
271
272/// Returns the raw bytes of a Bulk or Simple frame without allocating.
273fn extract_raw_bytes(frame: &Frame) -> Result<&[u8], ProtocolError> {
274    match frame {
275        Frame::Bulk(data) => Ok(data.as_ref()),
276        Frame::Simple(s) => Ok(s.as_bytes()),
277        _ => Err(ProtocolError::InvalidCommandFrame(
278            "expected bulk or simple string".into(),
279        )),
280    }
281}
282
283/// Uppercases a frame's bytes into a stack buffer and returns the result as `&str`.
284///
285/// Used for command names, subcommands, and option flags where the value is always
286/// a short ASCII keyword. Avoids the two heap allocations that
287/// `extract_string()?.to_ascii_uppercase()` would require.
288fn uppercase_arg<'b>(
289    frame: &Frame,
290    buf: &'b mut [u8; MAX_KEYWORD_LEN],
291) -> Result<&'b str, ProtocolError> {
292    let bytes = extract_raw_bytes(frame)?;
293    let len = bytes.len();
294    if len > MAX_KEYWORD_LEN {
295        return Err(ProtocolError::InvalidCommandFrame(
296            "keyword too long".into(),
297        ));
298    }
299    buf[..len].copy_from_slice(bytes);
300    buf[..len].make_ascii_uppercase();
301    std::str::from_utf8(&buf[..len])
302        .map_err(|_| ProtocolError::InvalidCommandFrame("keyword is not valid utf-8".into()))
303}
304
305/// Parses a frame's bytes directly as a positive u64 without allocating a String.
306fn parse_u64(frame: &Frame, cmd: &str) -> Result<u64, ProtocolError> {
307    let bytes = extract_raw_bytes(frame)?;
308    parse_u64_bytes(bytes).ok_or_else(|| {
309        ProtocolError::InvalidCommandFrame(format!("value is not a valid integer for '{cmd}'"))
310    })
311}
312
/// Parses a byte slice made up solely of ASCII digits into a `u64`.
///
/// Returns `None` for an empty slice, for any non-digit byte (including a
/// leading `+` or `-`), and when the value does not fit in a `u64`.
fn parse_u64_bytes(buf: &[u8]) -> Option<u64> {
    if buf.is_empty() {
        return None;
    }
    buf.iter().try_fold(0u64, |acc, &byte| {
        let digit = match byte {
            b'0'..=b'9' => u64::from(byte - b'0'),
            _ => return None,
        };
        // Checked arithmetic turns overflow into `None` instead of wrapping.
        acc.checked_mul(10)?.checked_add(digit)
    })
}
327
328/// Shorthand for the wrong-arity error returned by every parser.
329fn wrong_arity(cmd: &'static str) -> ProtocolError {
330    ProtocolError::WrongArity(cmd.into())
331}
332
333fn parse_ping(args: &[Frame]) -> Result<Command, ProtocolError> {
334    match args.len() {
335        0 => Ok(Command::Ping(None)),
336        1 => {
337            let msg = extract_bytes(&args[0])?;
338            Ok(Command::Ping(Some(msg)))
339        }
340        _ => Err(wrong_arity("PING")),
341    }
342}
343
344fn parse_echo(args: &[Frame]) -> Result<Command, ProtocolError> {
345    if args.len() != 1 {
346        return Err(wrong_arity("ECHO"));
347    }
348    let msg = extract_bytes(&args[0])?;
349    Ok(Command::Echo(msg))
350}
351
352fn parse_get(args: &[Frame]) -> Result<Command, ProtocolError> {
353    if args.len() != 1 {
354        return Err(wrong_arity("GET"));
355    }
356    let key = extract_string(&args[0])?;
357    Ok(Command::Get { key })
358}
359
360/// Parses NX / XX / EX / PX options from a slice of command arguments.
361///
362/// Returns `(expire, nx, xx)`. `cmd` is used in error messages.
363fn parse_set_options(
364    args: &[Frame],
365    cmd: &'static str,
366) -> Result<(Option<SetExpire>, bool, bool), ProtocolError> {
367    let mut expire = None;
368    let mut nx = false;
369    let mut xx = false;
370    let mut idx = 0;
371
372    while idx < args.len() {
373        let mut kw = [0u8; MAX_KEYWORD_LEN];
374        let flag = uppercase_arg(&args[idx], &mut kw)?;
375        match flag {
376            "NX" => {
377                nx = true;
378                idx += 1;
379            }
380            "XX" => {
381                xx = true;
382                idx += 1;
383            }
384            "EX" => {
385                idx += 1;
386                if idx >= args.len() {
387                    return Err(wrong_arity(cmd));
388                }
389                let amount = parse_u64(&args[idx], cmd)?;
390                if amount == 0 {
391                    return Err(ProtocolError::InvalidCommandFrame(format!(
392                        "invalid expire time in '{cmd}' command"
393                    )));
394                }
395                expire = Some(SetExpire::Ex(amount));
396                idx += 1;
397            }
398            "PX" => {
399                idx += 1;
400                if idx >= args.len() {
401                    return Err(wrong_arity(cmd));
402                }
403                let amount = parse_u64(&args[idx], cmd)?;
404                if amount == 0 {
405                    return Err(ProtocolError::InvalidCommandFrame(format!(
406                        "invalid expire time in '{cmd}' command"
407                    )));
408                }
409                expire = Some(SetExpire::Px(amount));
410                idx += 1;
411            }
412            _ => {
413                return Err(ProtocolError::InvalidCommandFrame(format!(
414                    "unsupported {cmd} option '{flag}'"
415                )));
416            }
417        }
418    }
419
420    if nx && xx {
421        return Err(ProtocolError::InvalidCommandFrame(
422            "XX and NX options at the same time are not compatible".into(),
423        ));
424    }
425
426    Ok((expire, nx, xx))
427}
428
429fn parse_set(args: &[Frame]) -> Result<Command, ProtocolError> {
430    if args.len() < 2 {
431        return Err(wrong_arity("SET"));
432    }
433
434    let key = extract_string(&args[0])?;
435    let value = extract_bytes(&args[1])?;
436    let (expire, nx, xx) = parse_set_options(&args[2..], "SET")?;
437
438    Ok(Command::Set {
439        key,
440        value,
441        expire,
442        nx,
443        xx,
444    })
445}
446
447fn parse_incr(args: &[Frame]) -> Result<Command, ProtocolError> {
448    if args.len() != 1 {
449        return Err(wrong_arity("INCR"));
450    }
451    let key = extract_string(&args[0])?;
452    Ok(Command::Incr { key })
453}
454
455fn parse_decr(args: &[Frame]) -> Result<Command, ProtocolError> {
456    if args.len() != 1 {
457        return Err(wrong_arity("DECR"));
458    }
459    let key = extract_string(&args[0])?;
460    Ok(Command::Decr { key })
461}
462
463fn parse_incrby(args: &[Frame]) -> Result<Command, ProtocolError> {
464    if args.len() != 2 {
465        return Err(wrong_arity("INCRBY"));
466    }
467    let key = extract_string(&args[0])?;
468    let delta = parse_i64(&args[1], "INCRBY")?;
469    Ok(Command::IncrBy { key, delta })
470}
471
472fn parse_decrby(args: &[Frame]) -> Result<Command, ProtocolError> {
473    if args.len() != 2 {
474        return Err(wrong_arity("DECRBY"));
475    }
476    let key = extract_string(&args[0])?;
477    let delta = parse_i64(&args[1], "DECRBY")?;
478    Ok(Command::DecrBy { key, delta })
479}
480
481fn parse_incrbyfloat(args: &[Frame]) -> Result<Command, ProtocolError> {
482    if args.len() != 2 {
483        return Err(wrong_arity("INCRBYFLOAT"));
484    }
485    let key = extract_string(&args[0])?;
486    let s = extract_string(&args[1])?;
487    let delta: f64 = s.parse().map_err(|_| {
488        ProtocolError::InvalidCommandFrame("value is not a valid float for 'INCRBYFLOAT'".into())
489    })?;
490    if delta.is_nan() || delta.is_infinite() {
491        return Err(ProtocolError::InvalidCommandFrame(
492            "increment would produce NaN or Infinity".into(),
493        ));
494    }
495    Ok(Command::IncrByFloat { key, delta })
496}
497
498fn parse_append(args: &[Frame]) -> Result<Command, ProtocolError> {
499    if args.len() != 2 {
500        return Err(wrong_arity("APPEND"));
501    }
502    let key = extract_string(&args[0])?;
503    let value = extract_bytes(&args[1])?;
504    Ok(Command::Append { key, value })
505}
506
507fn parse_strlen(args: &[Frame]) -> Result<Command, ProtocolError> {
508    if args.len() != 1 {
509        return Err(wrong_arity("STRLEN"));
510    }
511    let key = extract_string(&args[0])?;
512    Ok(Command::Strlen { key })
513}
514
515/// SETNX key value — set key only if it does not exist.
516/// Equivalent to `SET key value NX`.
517fn parse_setnx(args: &[Frame]) -> Result<Command, ProtocolError> {
518    if args.len() != 2 {
519        return Err(wrong_arity("SETNX"));
520    }
521    let key = extract_string(&args[0])?;
522    let value = extract_bytes(&args[1])?;
523    Ok(Command::Set {
524        key,
525        value,
526        expire: None,
527        nx: true,
528        xx: false,
529    })
530}
531
532/// SETEX key seconds value — set key with an expiration in seconds.
533/// Equivalent to `SET key value EX seconds`.
534fn parse_setex(args: &[Frame]) -> Result<Command, ProtocolError> {
535    if args.len() != 3 {
536        return Err(wrong_arity("SETEX"));
537    }
538    let key = extract_string(&args[0])?;
539    let seconds = parse_u64(&args[1], "SETEX")?;
540    if seconds == 0 {
541        return Err(ProtocolError::InvalidCommandFrame(
542            "invalid expire time in 'SETEX' command".into(),
543        ));
544    }
545    let value = extract_bytes(&args[2])?;
546    Ok(Command::Set {
547        key,
548        value,
549        expire: Some(SetExpire::Ex(seconds)),
550        nx: false,
551        xx: false,
552    })
553}
554
555/// PSETEX key milliseconds value — set key with an expiration in milliseconds.
556/// Equivalent to `SET key value PX milliseconds`.
557fn parse_psetex(args: &[Frame]) -> Result<Command, ProtocolError> {
558    if args.len() != 3 {
559        return Err(wrong_arity("PSETEX"));
560    }
561    let key = extract_string(&args[0])?;
562    let ms = parse_u64(&args[1], "PSETEX")?;
563    if ms == 0 {
564        return Err(ProtocolError::InvalidCommandFrame(
565            "invalid expire time in 'PSETEX' command".into(),
566        ));
567    }
568    let value = extract_bytes(&args[2])?;
569    Ok(Command::Set {
570        key,
571        value,
572        expire: Some(SetExpire::Px(ms)),
573        nx: false,
574        xx: false,
575    })
576}
577
578/// GETRANGE key start end (also aliases SUBSTR).
579fn parse_getrange(args: &[Frame]) -> Result<Command, ProtocolError> {
580    if args.len() != 3 {
581        return Err(wrong_arity("GETRANGE"));
582    }
583    let key = extract_string(&args[0])?;
584    let start = parse_i64(&args[1], "GETRANGE")?;
585    let end = parse_i64(&args[2], "GETRANGE")?;
586    Ok(Command::GetRange { key, start, end })
587}
588
589/// SETRANGE key offset value.
590fn parse_setrange(args: &[Frame]) -> Result<Command, ProtocolError> {
591    if args.len() != 3 {
592        return Err(wrong_arity("SETRANGE"));
593    }
594    let key = extract_string(&args[0])?;
595    let offset = parse_u64(&args[1], "SETRANGE")? as usize;
596    let value = extract_bytes(&args[2])?;
597    Ok(Command::SetRange { key, offset, value })
598}
599
600fn parse_keys(args: &[Frame]) -> Result<Command, ProtocolError> {
601    if args.len() != 1 {
602        return Err(wrong_arity("KEYS"));
603    }
604    let pattern = extract_string(&args[0])?;
605    Ok(Command::Keys { pattern })
606}
607
608fn parse_rename(args: &[Frame]) -> Result<Command, ProtocolError> {
609    if args.len() != 2 {
610        return Err(wrong_arity("RENAME"));
611    }
612    let key = extract_string(&args[0])?;
613    let newkey = extract_string(&args[1])?;
614    Ok(Command::Rename { key, newkey })
615}
616
617fn parse_del(args: &[Frame]) -> Result<Command, ProtocolError> {
618    if args.is_empty() {
619        return Err(wrong_arity("DEL"));
620    }
621    let keys = extract_strings(args)?;
622    Ok(Command::Del { keys })
623}
624
625fn parse_exists(args: &[Frame]) -> Result<Command, ProtocolError> {
626    if args.is_empty() {
627        return Err(wrong_arity("EXISTS"));
628    }
629    let keys = extract_strings(args)?;
630    Ok(Command::Exists { keys })
631}
632
633fn parse_mget(args: &[Frame]) -> Result<Command, ProtocolError> {
634    if args.is_empty() {
635        return Err(wrong_arity("MGET"));
636    }
637    let keys = extract_strings(args)?;
638    Ok(Command::MGet { keys })
639}
640
641fn parse_mset(args: &[Frame]) -> Result<Command, ProtocolError> {
642    if args.is_empty() || !args.len().is_multiple_of(2) {
643        return Err(wrong_arity("MSET"));
644    }
645    let mut pairs = Vec::with_capacity(args.len() / 2);
646    for chunk in args.chunks(2) {
647        let key = extract_string(&chunk[0])?;
648        let value = extract_bytes(&chunk[1])?;
649        pairs.push((key, value));
650    }
651    Ok(Command::MSet { pairs })
652}
653
654fn parse_expire(args: &[Frame]) -> Result<Command, ProtocolError> {
655    if args.len() != 2 {
656        return Err(wrong_arity("EXPIRE"));
657    }
658    let key = extract_string(&args[0])?;
659    let seconds = parse_u64(&args[1], "EXPIRE")?;
660
661    if seconds == 0 {
662        return Err(ProtocolError::InvalidCommandFrame(
663            "invalid expire time in 'EXPIRE' command".into(),
664        ));
665    }
666
667    Ok(Command::Expire { key, seconds })
668}
669
670fn parse_ttl(args: &[Frame]) -> Result<Command, ProtocolError> {
671    if args.len() != 1 {
672        return Err(wrong_arity("TTL"));
673    }
674    let key = extract_string(&args[0])?;
675    Ok(Command::Ttl { key })
676}
677
678fn parse_persist(args: &[Frame]) -> Result<Command, ProtocolError> {
679    if args.len() != 1 {
680        return Err(wrong_arity("PERSIST"));
681    }
682    let key = extract_string(&args[0])?;
683    Ok(Command::Persist { key })
684}
685
686fn parse_pttl(args: &[Frame]) -> Result<Command, ProtocolError> {
687    if args.len() != 1 {
688        return Err(wrong_arity("PTTL"));
689    }
690    let key = extract_string(&args[0])?;
691    Ok(Command::Pttl { key })
692}
693
694fn parse_pexpire(args: &[Frame]) -> Result<Command, ProtocolError> {
695    if args.len() != 2 {
696        return Err(wrong_arity("PEXPIRE"));
697    }
698    let key = extract_string(&args[0])?;
699    let milliseconds = parse_u64(&args[1], "PEXPIRE")?;
700
701    if milliseconds == 0 {
702        return Err(ProtocolError::InvalidCommandFrame(
703            "invalid expire time in 'PEXPIRE' command".into(),
704        ));
705    }
706
707    Ok(Command::Pexpire { key, milliseconds })
708}
709
710fn parse_dbsize(args: &[Frame]) -> Result<Command, ProtocolError> {
711    if !args.is_empty() {
712        return Err(wrong_arity("DBSIZE"));
713    }
714    Ok(Command::DbSize)
715}
716
717fn parse_info(args: &[Frame]) -> Result<Command, ProtocolError> {
718    match args.len() {
719        0 => Ok(Command::Info { section: None }),
720        1 => {
721            let section = extract_string(&args[0])?;
722            Ok(Command::Info {
723                section: Some(section),
724            })
725        }
726        _ => Err(wrong_arity("INFO")),
727    }
728}
729
730fn parse_bgsave(args: &[Frame]) -> Result<Command, ProtocolError> {
731    if !args.is_empty() {
732        return Err(wrong_arity("BGSAVE"));
733    }
734    Ok(Command::BgSave)
735}
736
737fn parse_bgrewriteaof(args: &[Frame]) -> Result<Command, ProtocolError> {
738    if !args.is_empty() {
739        return Err(wrong_arity("BGREWRITEAOF"));
740    }
741    Ok(Command::BgRewriteAof)
742}
743
744fn parse_flushdb(args: &[Frame]) -> Result<Command, ProtocolError> {
745    if args.is_empty() {
746        return Ok(Command::FlushDb { async_mode: false });
747    }
748    if args.len() == 1 {
749        let arg = extract_string(&args[0])?;
750        if arg.eq_ignore_ascii_case("ASYNC") {
751            return Ok(Command::FlushDb { async_mode: true });
752        }
753    }
754    Err(wrong_arity("FLUSHDB"))
755}
756
757fn parse_unlink(args: &[Frame]) -> Result<Command, ProtocolError> {
758    if args.is_empty() {
759        return Err(wrong_arity("UNLINK"));
760    }
761    let keys = extract_strings(args)?;
762    Ok(Command::Unlink { keys })
763}
764
765fn parse_scan(args: &[Frame]) -> Result<Command, ProtocolError> {
766    if args.is_empty() {
767        return Err(wrong_arity("SCAN"));
768    }
769
770    let cursor = parse_u64(&args[0], "SCAN")?;
771    let mut pattern = None;
772    let mut count = None;
773    let mut idx = 1;
774
775    while idx < args.len() {
776        let mut kw = [0u8; MAX_KEYWORD_LEN];
777        let flag = uppercase_arg(&args[idx], &mut kw)?;
778        match flag {
779            "MATCH" => {
780                idx += 1;
781                if idx >= args.len() {
782                    return Err(wrong_arity("SCAN"));
783                }
784                pattern = Some(extract_string(&args[idx])?);
785                idx += 1;
786            }
787            "COUNT" => {
788                idx += 1;
789                if idx >= args.len() {
790                    return Err(wrong_arity("SCAN"));
791                }
792                let n = parse_u64(&args[idx], "SCAN")?;
793                if n > MAX_SCAN_COUNT {
794                    return Err(ProtocolError::InvalidCommandFrame(format!(
795                        "SCAN COUNT {n} exceeds max {MAX_SCAN_COUNT}"
796                    )));
797                }
798                count = Some(n as usize);
799                idx += 1;
800            }
801            _ => {
802                return Err(ProtocolError::InvalidCommandFrame(format!(
803                    "unsupported SCAN option '{flag}'"
804                )));
805            }
806        }
807    }
808
809    Ok(Command::Scan {
810        cursor,
811        pattern,
812        count,
813    })
814}
815
816/// Shared parser for SSCAN, HSCAN, ZSCAN.
817///
818/// All three share the same shape: `key cursor [MATCH pattern] [COUNT count]`.
819fn parse_key_scan(args: &[Frame], cmd: &'static str) -> Result<Command, ProtocolError> {
820    if args.len() < 2 {
821        return Err(wrong_arity(cmd));
822    }
823
824    let key = extract_string(&args[0])?;
825    let cursor = parse_u64(&args[1], cmd)?;
826    let mut pattern = None;
827    let mut count = None;
828    let mut idx = 2;
829
830    while idx < args.len() {
831        let mut kw = [0u8; MAX_KEYWORD_LEN];
832        let flag = uppercase_arg(&args[idx], &mut kw)?;
833        match flag {
834            "MATCH" => {
835                idx += 1;
836                if idx >= args.len() {
837                    return Err(wrong_arity(cmd));
838                }
839                pattern = Some(extract_string(&args[idx])?);
840                idx += 1;
841            }
842            "COUNT" => {
843                idx += 1;
844                if idx >= args.len() {
845                    return Err(wrong_arity(cmd));
846                }
847                let n = parse_u64(&args[idx], cmd)?;
848                if n > MAX_SCAN_COUNT {
849                    return Err(ProtocolError::InvalidCommandFrame(format!(
850                        "{cmd} COUNT {n} exceeds max {MAX_SCAN_COUNT}"
851                    )));
852                }
853                count = Some(n as usize);
854                idx += 1;
855            }
856            _ => {
857                return Err(ProtocolError::InvalidCommandFrame(format!(
858                    "unsupported {cmd} option '{flag}'"
859                )));
860            }
861        }
862    }
863
864    match cmd {
865        "SSCAN" => Ok(Command::SScan {
866            key,
867            cursor,
868            pattern,
869            count,
870        }),
871        "HSCAN" => Ok(Command::HScan {
872            key,
873            cursor,
874            pattern,
875            count,
876        }),
877        "ZSCAN" => Ok(Command::ZScan {
878            key,
879            cursor,
880            pattern,
881            count,
882        }),
883        _ => Err(ProtocolError::InvalidCommandFrame(format!(
884            "unknown scan command '{cmd}'"
885        ))),
886    }
887}
888
889/// Parses a frame's bytes directly as an i64 without allocating a String.
890fn parse_i64(frame: &Frame, cmd: &str) -> Result<i64, ProtocolError> {
891    let bytes = extract_raw_bytes(frame)?;
892    parse_i64_bytes(bytes).ok_or_else(|| {
893        ProtocolError::InvalidCommandFrame(format!("value is not a valid integer for '{cmd}'"))
894    })
895}
896
/// Parses a base-10 signed integer from raw bytes.
///
/// Accepts an optional leading `-` followed by one or more ASCII digits.
/// Returns `None` for empty input, a lone `-`, any non-digit byte, or a value
/// outside the `i64` range. Negative inputs are accumulated downwards so that
/// `i64::MIN` parses without overflowing.
fn parse_i64_bytes(buf: &[u8]) -> Option<i64> {
    let (negative, digits) = match buf {
        [b'-', rest @ ..] => (true, rest),
        _ => (false, buf),
    };
    if digits.is_empty() {
        return None;
    }

    digits.iter().try_fold(0i64, |acc, &byte| {
        if !byte.is_ascii_digit() {
            return None;
        }
        let digit = (byte - b'0') as i64;
        let shifted = acc.checked_mul(10)?;
        if negative {
            shifted.checked_sub(digit)
        } else {
            shifted.checked_add(digit)
        }
    })
}
931
932fn parse_lpush(args: &[Frame]) -> Result<Command, ProtocolError> {
933    if args.len() < 2 {
934        return Err(wrong_arity("LPUSH"));
935    }
936    let key = extract_string(&args[0])?;
937    let values = extract_bytes_vec(&args[1..])?;
938    Ok(Command::LPush { key, values })
939}
940
941fn parse_rpush(args: &[Frame]) -> Result<Command, ProtocolError> {
942    if args.len() < 2 {
943        return Err(wrong_arity("RPUSH"));
944    }
945    let key = extract_string(&args[0])?;
946    let values = extract_bytes_vec(&args[1..])?;
947    Ok(Command::RPush { key, values })
948}
949
950fn parse_lpop(args: &[Frame]) -> Result<Command, ProtocolError> {
951    if args.len() != 1 {
952        return Err(wrong_arity("LPOP"));
953    }
954    let key = extract_string(&args[0])?;
955    Ok(Command::LPop { key })
956}
957
958fn parse_rpop(args: &[Frame]) -> Result<Command, ProtocolError> {
959    if args.len() != 1 {
960        return Err(wrong_arity("RPOP"));
961    }
962    let key = extract_string(&args[0])?;
963    Ok(Command::RPop { key })
964}
965
966fn parse_lrange(args: &[Frame]) -> Result<Command, ProtocolError> {
967    if args.len() != 3 {
968        return Err(wrong_arity("LRANGE"));
969    }
970    let key = extract_string(&args[0])?;
971    let start = parse_i64(&args[1], "LRANGE")?;
972    let stop = parse_i64(&args[2], "LRANGE")?;
973    Ok(Command::LRange { key, start, stop })
974}
975
976fn parse_llen(args: &[Frame]) -> Result<Command, ProtocolError> {
977    if args.len() != 1 {
978        return Err(wrong_arity("LLEN"));
979    }
980    let key = extract_string(&args[0])?;
981    Ok(Command::LLen { key })
982}
983
984/// Parses BLPOP/BRPOP: all args except the last are keys, the last is the
985/// timeout in seconds (float). At least one key is required.
986fn parse_blpop(args: &[Frame]) -> Result<Command, ProtocolError> {
987    if args.len() < 2 {
988        return Err(wrong_arity("BLPOP"));
989    }
990    let timeout_secs = parse_timeout(&args[args.len() - 1], "BLPOP")?;
991    let keys = extract_strings(&args[..args.len() - 1])?;
992    Ok(Command::BLPop { keys, timeout_secs })
993}
994
995fn parse_brpop(args: &[Frame]) -> Result<Command, ProtocolError> {
996    if args.len() < 2 {
997        return Err(wrong_arity("BRPOP"));
998    }
999    let timeout_secs = parse_timeout(&args[args.len() - 1], "BRPOP")?;
1000    let keys = extract_strings(&args[..args.len() - 1])?;
1001    Ok(Command::BRPop { keys, timeout_secs })
1002}
1003
1004fn parse_lindex(args: &[Frame]) -> Result<Command, ProtocolError> {
1005    if args.len() != 2 {
1006        return Err(wrong_arity("LINDEX"));
1007    }
1008    let key = extract_string(&args[0])?;
1009    let index = parse_i64(&args[1], "LINDEX")?;
1010    Ok(Command::LIndex { key, index })
1011}
1012
1013fn parse_lset(args: &[Frame]) -> Result<Command, ProtocolError> {
1014    if args.len() != 3 {
1015        return Err(wrong_arity("LSET"));
1016    }
1017    let key = extract_string(&args[0])?;
1018    let index = parse_i64(&args[1], "LSET")?;
1019    let value = extract_bytes(&args[2])?;
1020    Ok(Command::LSet { key, index, value })
1021}
1022
1023fn parse_ltrim(args: &[Frame]) -> Result<Command, ProtocolError> {
1024    if args.len() != 3 {
1025        return Err(wrong_arity("LTRIM"));
1026    }
1027    let key = extract_string(&args[0])?;
1028    let start = parse_i64(&args[1], "LTRIM")?;
1029    let stop = parse_i64(&args[2], "LTRIM")?;
1030    Ok(Command::LTrim { key, start, stop })
1031}
1032
1033fn parse_linsert(args: &[Frame]) -> Result<Command, ProtocolError> {
1034    if args.len() != 4 {
1035        return Err(wrong_arity("LINSERT"));
1036    }
1037    let key = extract_string(&args[0])?;
1038    let direction = extract_string(&args[1])?;
1039    let before = match direction.to_ascii_uppercase().as_str() {
1040        "BEFORE" => true,
1041        "AFTER" => false,
1042        _ => {
1043            return Err(ProtocolError::InvalidCommandFrame(
1044                "ERR syntax error".into(),
1045            ))
1046        }
1047    };
1048    let pivot = extract_bytes(&args[2])?;
1049    let value = extract_bytes(&args[3])?;
1050    Ok(Command::LInsert {
1051        key,
1052        before,
1053        pivot,
1054        value,
1055    })
1056}
1057
1058fn parse_lrem(args: &[Frame]) -> Result<Command, ProtocolError> {
1059    if args.len() != 3 {
1060        return Err(wrong_arity("LREM"));
1061    }
1062    let key = extract_string(&args[0])?;
1063    let count = parse_i64(&args[1], "LREM")?;
1064    let value = extract_bytes(&args[2])?;
1065    Ok(Command::LRem { key, count, value })
1066}
1067
1068fn parse_lpos(args: &[Frame]) -> Result<Command, ProtocolError> {
1069    if args.is_empty() {
1070        return Err(wrong_arity("LPOS"));
1071    }
1072    let key = extract_string(&args[0])?;
1073    if args.len() < 2 {
1074        return Err(wrong_arity("LPOS"));
1075    }
1076    let element = extract_bytes(&args[1])?;
1077
1078    let mut rank: i64 = 1;
1079    let mut count: Option<usize> = None;
1080    let mut maxlen: usize = 0;
1081
1082    let mut i = 2;
1083    while i < args.len() {
1084        let opt = extract_string(&args[i])?.to_ascii_uppercase();
1085        match opt.as_str() {
1086            "RANK" => {
1087                i += 1;
1088                if i >= args.len() {
1089                    return Err(ProtocolError::InvalidCommandFrame(
1090                        "ERR syntax error".into(),
1091                    ));
1092                }
1093                rank = parse_i64(&args[i], "LPOS")?;
1094                if rank == 0 {
1095                    return Err(ProtocolError::InvalidCommandFrame(
1096                        "ERR RANK can't be zero: use 1 to start from the first match, 2 from the second ... or use negative values for starting from the end of the list".into(),
1097                    ));
1098                }
1099            }
1100            "COUNT" => {
1101                i += 1;
1102                if i >= args.len() {
1103                    return Err(ProtocolError::InvalidCommandFrame(
1104                        "ERR syntax error".into(),
1105                    ));
1106                }
1107                let n = parse_i64(&args[i], "LPOS")?;
1108                if n < 0 {
1109                    return Err(ProtocolError::InvalidCommandFrame(
1110                        "ERR COUNT can't be negative".into(),
1111                    ));
1112                }
1113                count = Some(n as usize);
1114            }
1115            "MAXLEN" => {
1116                i += 1;
1117                if i >= args.len() {
1118                    return Err(ProtocolError::InvalidCommandFrame(
1119                        "ERR syntax error".into(),
1120                    ));
1121                }
1122                let n = parse_i64(&args[i], "LPOS")?;
1123                if n < 0 {
1124                    return Err(ProtocolError::InvalidCommandFrame(
1125                        "ERR MAXLEN can't be negative".into(),
1126                    ));
1127                }
1128                maxlen = n as usize;
1129            }
1130            _ => {
1131                return Err(ProtocolError::InvalidCommandFrame(
1132                    "ERR syntax error".into(),
1133                ))
1134            }
1135        }
1136        i += 1;
1137    }
1138
1139    Ok(Command::LPos {
1140        key,
1141        element,
1142        rank,
1143        count,
1144        maxlen,
1145    })
1146}
1147
1148/// Extracts the timeout argument for BLPOP/BRPOP. Redis accepts integer or
1149/// float seconds; negative values are an error.
1150fn parse_timeout(frame: &Frame, cmd: &str) -> Result<f64, ProtocolError> {
1151    let bytes = extract_raw_bytes(frame)?;
1152    let s = std::str::from_utf8(bytes).map_err(|_| {
1153        ProtocolError::InvalidCommandFrame(format!(
1154            "timeout is not a float or out of range for '{cmd}'"
1155        ))
1156    })?;
1157    let val: f64 = s.parse().map_err(|_| {
1158        ProtocolError::InvalidCommandFrame(format!(
1159            "timeout is not a float or out of range for '{cmd}'"
1160        ))
1161    })?;
1162    if val < 0.0 {
1163        return Err(ProtocolError::InvalidCommandFrame(format!(
1164            "timeout is negative for '{cmd}'"
1165        )));
1166    }
1167    Ok(val)
1168}
1169
1170fn parse_type(args: &[Frame]) -> Result<Command, ProtocolError> {
1171    if args.len() != 1 {
1172        return Err(wrong_arity("TYPE"));
1173    }
1174    let key = extract_string(&args[0])?;
1175    Ok(Command::Type { key })
1176}
1177
1178/// Parses a string argument as an f64 score.
1179fn parse_f64(frame: &Frame, cmd: &str) -> Result<f64, ProtocolError> {
1180    let bytes = extract_raw_bytes(frame)?;
1181    let s = std::str::from_utf8(bytes).map_err(|_| {
1182        ProtocolError::InvalidCommandFrame(format!("value is not a valid float for '{cmd}'"))
1183    })?;
1184    let v = s.parse::<f64>().map_err(|_| {
1185        ProtocolError::InvalidCommandFrame(format!("value is not a valid float for '{cmd}'"))
1186    })?;
1187    if v.is_nan() || v.is_infinite() {
1188        return Err(ProtocolError::InvalidCommandFrame(format!(
1189            "value is not a valid finite float for '{cmd}'"
1190        )));
1191    }
1192    Ok(v)
1193}
1194
1195fn parse_zadd(args: &[Frame]) -> Result<Command, ProtocolError> {
1196    // ZADD key [NX|XX] [GT|LT] [CH] score member [score member ...]
1197    if args.len() < 3 {
1198        return Err(wrong_arity("ZADD"));
1199    }
1200
1201    let key = extract_string(&args[0])?;
1202    let mut flags = ZAddFlags::default();
1203    let mut idx = 1;
1204
1205    // parse optional flags before score/member pairs
1206    while idx < args.len() {
1207        let mut kw = [0u8; MAX_KEYWORD_LEN];
1208        let s = uppercase_arg(&args[idx], &mut kw)?;
1209        match s {
1210            "NX" => {
1211                flags.nx = true;
1212                idx += 1;
1213            }
1214            "XX" => {
1215                flags.xx = true;
1216                idx += 1;
1217            }
1218            "GT" => {
1219                flags.gt = true;
1220                idx += 1;
1221            }
1222            "LT" => {
1223                flags.lt = true;
1224                idx += 1;
1225            }
1226            "CH" => {
1227                flags.ch = true;
1228                idx += 1;
1229            }
1230            _ => break,
1231        }
1232    }
1233
1234    // NX and XX are mutually exclusive
1235    if flags.nx && flags.xx {
1236        return Err(ProtocolError::InvalidCommandFrame(
1237            "XX and NX options at the same time are not compatible".into(),
1238        ));
1239    }
1240    // GT and LT are mutually exclusive
1241    if flags.gt && flags.lt {
1242        return Err(ProtocolError::InvalidCommandFrame(
1243            "GT and LT options at the same time are not compatible".into(),
1244        ));
1245    }
1246
1247    // remaining args must be score/member pairs
1248    let remaining = &args[idx..];
1249    if remaining.is_empty() || !remaining.len().is_multiple_of(2) {
1250        return Err(wrong_arity("ZADD"));
1251    }
1252
1253    let mut members = Vec::with_capacity(remaining.len() / 2);
1254    for pair in remaining.chunks(2) {
1255        let score = parse_f64(&pair[0], "ZADD")?;
1256        let member = extract_string(&pair[1])?;
1257        members.push((score, member));
1258    }
1259
1260    Ok(Command::ZAdd {
1261        key,
1262        flags,
1263        members,
1264    })
1265}
1266
1267fn parse_zcard(args: &[Frame]) -> Result<Command, ProtocolError> {
1268    if args.len() != 1 {
1269        return Err(wrong_arity("ZCARD"));
1270    }
1271    let key = extract_string(&args[0])?;
1272    Ok(Command::ZCard { key })
1273}
1274
1275fn parse_zrem(args: &[Frame]) -> Result<Command, ProtocolError> {
1276    if args.len() < 2 {
1277        return Err(wrong_arity("ZREM"));
1278    }
1279    let key = extract_string(&args[0])?;
1280    let members = extract_strings(&args[1..])?;
1281    Ok(Command::ZRem { key, members })
1282}
1283
1284fn parse_zscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1285    if args.len() != 2 {
1286        return Err(wrong_arity("ZSCORE"));
1287    }
1288    let key = extract_string(&args[0])?;
1289    let member = extract_string(&args[1])?;
1290    Ok(Command::ZScore { key, member })
1291}
1292
1293fn parse_zrank(args: &[Frame]) -> Result<Command, ProtocolError> {
1294    if args.len() != 2 {
1295        return Err(wrong_arity("ZRANK"));
1296    }
1297    let key = extract_string(&args[0])?;
1298    let member = extract_string(&args[1])?;
1299    Ok(Command::ZRank { key, member })
1300}
1301
1302fn parse_zrange(args: &[Frame]) -> Result<Command, ProtocolError> {
1303    if args.len() < 3 || args.len() > 4 {
1304        return Err(wrong_arity("ZRANGE"));
1305    }
1306    let key = extract_string(&args[0])?;
1307    let start = parse_i64(&args[1], "ZRANGE")?;
1308    let stop = parse_i64(&args[2], "ZRANGE")?;
1309
1310    let with_scores = if args.len() == 4 {
1311        let mut kw = [0u8; MAX_KEYWORD_LEN];
1312        let opt = uppercase_arg(&args[3], &mut kw)?;
1313        if opt != "WITHSCORES" {
1314            return Err(ProtocolError::InvalidCommandFrame(format!(
1315                "unsupported ZRANGE option '{opt}'"
1316            )));
1317        }
1318        true
1319    } else {
1320        false
1321    };
1322
1323    Ok(Command::ZRange {
1324        key,
1325        start,
1326        stop,
1327        with_scores,
1328    })
1329}
1330
1331fn parse_zrevrange(args: &[Frame]) -> Result<Command, ProtocolError> {
1332    if args.len() < 3 || args.len() > 4 {
1333        return Err(wrong_arity("ZREVRANGE"));
1334    }
1335    let key = extract_string(&args[0])?;
1336    let start = parse_i64(&args[1], "ZREVRANGE")?;
1337    let stop = parse_i64(&args[2], "ZREVRANGE")?;
1338
1339    let with_scores = if args.len() == 4 {
1340        let mut kw = [0u8; MAX_KEYWORD_LEN];
1341        let opt = uppercase_arg(&args[3], &mut kw)?;
1342        if opt != "WITHSCORES" {
1343            return Err(ProtocolError::InvalidCommandFrame(format!(
1344                "unsupported ZREVRANGE option '{opt}'"
1345            )));
1346        }
1347        true
1348    } else {
1349        false
1350    };
1351
1352    Ok(Command::ZRevRange {
1353        key,
1354        start,
1355        stop,
1356        with_scores,
1357    })
1358}
1359
1360fn parse_zrevrank(args: &[Frame]) -> Result<Command, ProtocolError> {
1361    if args.len() != 2 {
1362        return Err(wrong_arity("ZREVRANK"));
1363    }
1364    let key = extract_string(&args[0])?;
1365    let member = extract_string(&args[1])?;
1366    Ok(Command::ZRevRank { key, member })
1367}
1368
1369/// Parses a Redis score bound string.
1370///
1371/// Supports `-inf`, `+inf`, `inf`, exclusive `(value`, and plain inclusive values.
1372fn parse_score_bound(frame: &Frame, cmd: &str) -> Result<ScoreBound, ProtocolError> {
1373    let bytes = extract_raw_bytes(frame)?;
1374    let s = std::str::from_utf8(bytes).map_err(|_| {
1375        ProtocolError::InvalidCommandFrame(format!("invalid score bound for '{cmd}'"))
1376    })?;
1377
1378    match s {
1379        "-inf" => Ok(ScoreBound::NegInf),
1380        "+inf" | "inf" => Ok(ScoreBound::PosInf),
1381        _ if s.starts_with('(') => {
1382            let val = s[1..].parse::<f64>().map_err(|_| {
1383                ProtocolError::InvalidCommandFrame(format!("min or max is not a float for '{cmd}'"))
1384            })?;
1385            Ok(ScoreBound::Exclusive(val))
1386        }
1387        _ => {
1388            let val = s.parse::<f64>().map_err(|_| {
1389                ProtocolError::InvalidCommandFrame(format!("min or max is not a float for '{cmd}'"))
1390            })?;
1391            Ok(ScoreBound::Inclusive(val))
1392        }
1393    }
1394}
1395
1396fn parse_zcount(args: &[Frame]) -> Result<Command, ProtocolError> {
1397    if args.len() != 3 {
1398        return Err(wrong_arity("ZCOUNT"));
1399    }
1400    let key = extract_string(&args[0])?;
1401    let min = parse_score_bound(&args[1], "ZCOUNT")?;
1402    let max = parse_score_bound(&args[2], "ZCOUNT")?;
1403    Ok(Command::ZCount { key, min, max })
1404}
1405
1406fn parse_zincrby(args: &[Frame]) -> Result<Command, ProtocolError> {
1407    if args.len() != 3 {
1408        return Err(wrong_arity("ZINCRBY"));
1409    }
1410    let key = extract_string(&args[0])?;
1411    let increment = parse_f64(&args[1], "ZINCRBY")?;
1412    let member = extract_string(&args[2])?;
1413    Ok(Command::ZIncrBy {
1414        key,
1415        increment,
1416        member,
1417    })
1418}
1419
1420/// Parses ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
1421fn parse_zrangebyscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1422    if args.len() < 3 {
1423        return Err(wrong_arity("ZRANGEBYSCORE"));
1424    }
1425    let key = extract_string(&args[0])?;
1426    let min = parse_score_bound(&args[1], "ZRANGEBYSCORE")?;
1427    let max = parse_score_bound(&args[2], "ZRANGEBYSCORE")?;
1428
1429    let mut with_scores = false;
1430    let mut offset = 0usize;
1431    let mut count = None;
1432    let mut idx = 3;
1433
1434    while idx < args.len() {
1435        let mut kw = [0u8; MAX_KEYWORD_LEN];
1436        let opt = uppercase_arg(&args[idx], &mut kw)?;
1437        match opt {
1438            "WITHSCORES" => {
1439                with_scores = true;
1440                idx += 1;
1441            }
1442            "LIMIT" => {
1443                if idx + 2 >= args.len() {
1444                    return Err(wrong_arity("ZRANGEBYSCORE"));
1445                }
1446                offset = parse_i64(&args[idx + 1], "ZRANGEBYSCORE")? as usize;
1447                count = Some(parse_i64(&args[idx + 2], "ZRANGEBYSCORE")? as usize);
1448                idx += 3;
1449            }
1450            _ => {
1451                return Err(ProtocolError::InvalidCommandFrame(format!(
1452                    "unsupported ZRANGEBYSCORE option '{opt}'"
1453                )));
1454            }
1455        }
1456    }
1457
1458    Ok(Command::ZRangeByScore {
1459        key,
1460        min,
1461        max,
1462        with_scores,
1463        offset,
1464        count,
1465    })
1466}
1467
1468/// Parses ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
1469///
1470/// Note: Redis reverses min/max argument order for this command.
1471fn parse_zrevrangebyscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1472    if args.len() < 3 {
1473        return Err(wrong_arity("ZREVRANGEBYSCORE"));
1474    }
1475    let key = extract_string(&args[0])?;
1476    // Redis: ZREVRANGEBYSCORE key max min — the order is reversed
1477    let max = parse_score_bound(&args[1], "ZREVRANGEBYSCORE")?;
1478    let min = parse_score_bound(&args[2], "ZREVRANGEBYSCORE")?;
1479
1480    let mut with_scores = false;
1481    let mut offset = 0usize;
1482    let mut count = None;
1483    let mut idx = 3;
1484
1485    while idx < args.len() {
1486        let mut kw = [0u8; MAX_KEYWORD_LEN];
1487        let opt = uppercase_arg(&args[idx], &mut kw)?;
1488        match opt {
1489            "WITHSCORES" => {
1490                with_scores = true;
1491                idx += 1;
1492            }
1493            "LIMIT" => {
1494                if idx + 2 >= args.len() {
1495                    return Err(wrong_arity("ZREVRANGEBYSCORE"));
1496                }
1497                offset = parse_i64(&args[idx + 1], "ZREVRANGEBYSCORE")? as usize;
1498                count = Some(parse_i64(&args[idx + 2], "ZREVRANGEBYSCORE")? as usize);
1499                idx += 3;
1500            }
1501            _ => {
1502                return Err(ProtocolError::InvalidCommandFrame(format!(
1503                    "unsupported ZREVRANGEBYSCORE option '{opt}'"
1504                )));
1505            }
1506        }
1507    }
1508
1509    Ok(Command::ZRevRangeByScore {
1510        key,
1511        min,
1512        max,
1513        with_scores,
1514        offset,
1515        count,
1516    })
1517}
1518
1519/// Shared argument parsing for ZPOPMIN/ZPOPMAX: key [count]
1520fn parse_zpop_args(args: &[Frame], cmd: &'static str) -> Result<(String, usize), ProtocolError> {
1521    if args.is_empty() || args.len() > 2 {
1522        return Err(wrong_arity(cmd));
1523    }
1524    let key = extract_string(&args[0])?;
1525    let count = if args.len() == 2 {
1526        let c = parse_i64(&args[1], cmd)?;
1527        if c < 0 {
1528            return Err(ProtocolError::InvalidCommandFrame(format!(
1529                "value is out of range for '{cmd}'"
1530            )));
1531        }
1532        c as usize
1533    } else {
1534        1
1535    };
1536    Ok((key, count))
1537}
1538
1539// --- hash commands ---
1540
1541fn parse_hset(args: &[Frame]) -> Result<Command, ProtocolError> {
1542    // HSET key field value [field value ...]
1543    // args = [key, field, value, ...]
1544    // Need at least 3 args, and after key we need pairs (so remaining count must be even)
1545    if args.len() < 3 || !(args.len() - 1).is_multiple_of(2) {
1546        return Err(wrong_arity("HSET"));
1547    }
1548
1549    let key = extract_string(&args[0])?;
1550    let mut fields = Vec::with_capacity((args.len() - 1) / 2);
1551
1552    for chunk in args[1..].chunks(2) {
1553        let field = extract_string(&chunk[0])?;
1554        let value = extract_bytes(&chunk[1])?;
1555        fields.push((field, value));
1556    }
1557
1558    Ok(Command::HSet { key, fields })
1559}
1560
1561fn parse_hget(args: &[Frame]) -> Result<Command, ProtocolError> {
1562    if args.len() != 2 {
1563        return Err(wrong_arity("HGET"));
1564    }
1565    let key = extract_string(&args[0])?;
1566    let field = extract_string(&args[1])?;
1567    Ok(Command::HGet { key, field })
1568}
1569
1570fn parse_hgetall(args: &[Frame]) -> Result<Command, ProtocolError> {
1571    if args.len() != 1 {
1572        return Err(wrong_arity("HGETALL"));
1573    }
1574    let key = extract_string(&args[0])?;
1575    Ok(Command::HGetAll { key })
1576}
1577
1578fn parse_hdel(args: &[Frame]) -> Result<Command, ProtocolError> {
1579    if args.len() < 2 {
1580        return Err(wrong_arity("HDEL"));
1581    }
1582    let key = extract_string(&args[0])?;
1583    let fields = extract_strings(&args[1..])?;
1584    Ok(Command::HDel { key, fields })
1585}
1586
1587fn parse_hexists(args: &[Frame]) -> Result<Command, ProtocolError> {
1588    if args.len() != 2 {
1589        return Err(wrong_arity("HEXISTS"));
1590    }
1591    let key = extract_string(&args[0])?;
1592    let field = extract_string(&args[1])?;
1593    Ok(Command::HExists { key, field })
1594}
1595
1596fn parse_hlen(args: &[Frame]) -> Result<Command, ProtocolError> {
1597    if args.len() != 1 {
1598        return Err(wrong_arity("HLEN"));
1599    }
1600    let key = extract_string(&args[0])?;
1601    Ok(Command::HLen { key })
1602}
1603
1604fn parse_hincrby(args: &[Frame]) -> Result<Command, ProtocolError> {
1605    if args.len() != 3 {
1606        return Err(wrong_arity("HINCRBY"));
1607    }
1608    let key = extract_string(&args[0])?;
1609    let field = extract_string(&args[1])?;
1610    let delta = parse_i64(&args[2], "HINCRBY")?;
1611    Ok(Command::HIncrBy { key, field, delta })
1612}
1613
1614fn parse_hkeys(args: &[Frame]) -> Result<Command, ProtocolError> {
1615    if args.len() != 1 {
1616        return Err(wrong_arity("HKEYS"));
1617    }
1618    let key = extract_string(&args[0])?;
1619    Ok(Command::HKeys { key })
1620}
1621
1622fn parse_hvals(args: &[Frame]) -> Result<Command, ProtocolError> {
1623    if args.len() != 1 {
1624        return Err(wrong_arity("HVALS"));
1625    }
1626    let key = extract_string(&args[0])?;
1627    Ok(Command::HVals { key })
1628}
1629
1630fn parse_hmget(args: &[Frame]) -> Result<Command, ProtocolError> {
1631    if args.len() < 2 {
1632        return Err(wrong_arity("HMGET"));
1633    }
1634    let key = extract_string(&args[0])?;
1635    let fields = extract_strings(&args[1..])?;
1636    Ok(Command::HMGet { key, fields })
1637}
1638
1639// --- set commands ---
1640
1641fn parse_sadd(args: &[Frame]) -> Result<Command, ProtocolError> {
1642    if args.len() < 2 {
1643        return Err(wrong_arity("SADD"));
1644    }
1645    let key = extract_string(&args[0])?;
1646    let members = extract_strings(&args[1..])?;
1647    Ok(Command::SAdd { key, members })
1648}
1649
1650fn parse_srem(args: &[Frame]) -> Result<Command, ProtocolError> {
1651    if args.len() < 2 {
1652        return Err(wrong_arity("SREM"));
1653    }
1654    let key = extract_string(&args[0])?;
1655    let members = extract_strings(&args[1..])?;
1656    Ok(Command::SRem { key, members })
1657}
1658
1659fn parse_smembers(args: &[Frame]) -> Result<Command, ProtocolError> {
1660    if args.len() != 1 {
1661        return Err(wrong_arity("SMEMBERS"));
1662    }
1663    let key = extract_string(&args[0])?;
1664    Ok(Command::SMembers { key })
1665}
1666
1667fn parse_sismember(args: &[Frame]) -> Result<Command, ProtocolError> {
1668    if args.len() != 2 {
1669        return Err(wrong_arity("SISMEMBER"));
1670    }
1671    let key = extract_string(&args[0])?;
1672    let member = extract_string(&args[1])?;
1673    Ok(Command::SIsMember { key, member })
1674}
1675
1676fn parse_scard(args: &[Frame]) -> Result<Command, ProtocolError> {
1677    if args.len() != 1 {
1678        return Err(wrong_arity("SCARD"));
1679    }
1680    let key = extract_string(&args[0])?;
1681    Ok(Command::SCard { key })
1682}
1683
1684fn parse_multi_key_set(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
1685    if args.is_empty() {
1686        return Err(wrong_arity(cmd));
1687    }
1688    let keys = extract_strings(args)?;
1689    match cmd {
1690        "SUNION" => Ok(Command::SUnion { keys }),
1691        "SINTER" => Ok(Command::SInter { keys }),
1692        "SDIFF" => Ok(Command::SDiff { keys }),
1693        _ => Err(wrong_arity(cmd)),
1694    }
1695}
1696
1697fn parse_store_set(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
1698    if args.len() < 2 {
1699        return Err(wrong_arity(cmd));
1700    }
1701    let dest = extract_string(&args[0])?;
1702    let keys = extract_strings(&args[1..])?;
1703    match cmd {
1704        "SUNIONSTORE" => Ok(Command::SUnionStore { dest, keys }),
1705        "SINTERSTORE" => Ok(Command::SInterStore { dest, keys }),
1706        "SDIFFSTORE" => Ok(Command::SDiffStore { dest, keys }),
1707        _ => Err(wrong_arity(cmd)),
1708    }
1709}
1710
1711fn parse_srandmember(args: &[Frame]) -> Result<Command, ProtocolError> {
1712    if args.is_empty() || args.len() > 2 {
1713        return Err(wrong_arity("SRANDMEMBER"));
1714    }
1715    let key = extract_string(&args[0])?;
1716    let count = if args.len() == 2 {
1717        let s = extract_string(&args[1])?;
1718        let n: i64 = s.parse().map_err(|_| {
1719            ProtocolError::InvalidCommandFrame("ERR value is not an integer or out of range".into())
1720        })?;
1721        Some(n)
1722    } else {
1723        None
1724    };
1725    Ok(Command::SRandMember { key, count })
1726}
1727
1728fn parse_spop(args: &[Frame]) -> Result<Command, ProtocolError> {
1729    if args.is_empty() || args.len() > 2 {
1730        return Err(wrong_arity("SPOP"));
1731    }
1732    let key = extract_string(&args[0])?;
1733    let count = if args.len() == 2 {
1734        let s = extract_string(&args[1])?;
1735        let n: i64 = s.parse().map_err(|_| {
1736            ProtocolError::InvalidCommandFrame("ERR value is not an integer or out of range".into())
1737        })?;
1738        if n < 0 {
1739            return Err(ProtocolError::InvalidCommandFrame(
1740                "ERR value is not an integer or out of range".into(),
1741            ));
1742        }
1743        n as usize
1744    } else {
1745        1
1746    };
1747    Ok(Command::SPop { key, count })
1748}
1749
1750fn parse_smismember(args: &[Frame]) -> Result<Command, ProtocolError> {
1751    if args.len() < 2 {
1752        return Err(wrong_arity("SMISMEMBER"));
1753    }
1754    let key = extract_string(&args[0])?;
1755    let members = extract_strings(&args[1..])?;
1756    Ok(Command::SMisMember { key, members })
1757}
1758
1759// --- cluster commands ---
1760
1761fn parse_cluster(args: &[Frame]) -> Result<Command, ProtocolError> {
1762    if args.is_empty() {
1763        return Err(wrong_arity("CLUSTER"));
1764    }
1765
1766    let mut kw = [0u8; MAX_KEYWORD_LEN];
1767    let subcommand = uppercase_arg(&args[0], &mut kw)?;
1768    match subcommand {
1769        "INFO" => {
1770            if args.len() != 1 {
1771                return Err(wrong_arity("CLUSTER INFO"));
1772            }
1773            Ok(Command::ClusterInfo)
1774        }
1775        "NODES" => {
1776            if args.len() != 1 {
1777                return Err(wrong_arity("CLUSTER NODES"));
1778            }
1779            Ok(Command::ClusterNodes)
1780        }
1781        "SLOTS" => {
1782            if args.len() != 1 {
1783                return Err(wrong_arity("CLUSTER SLOTS"));
1784            }
1785            Ok(Command::ClusterSlots)
1786        }
1787        "KEYSLOT" => {
1788            if args.len() != 2 {
1789                return Err(wrong_arity("CLUSTER KEYSLOT"));
1790            }
1791            let key = extract_string(&args[1])?;
1792            Ok(Command::ClusterKeySlot { key })
1793        }
1794        "MYID" => {
1795            if args.len() != 1 {
1796                return Err(wrong_arity("CLUSTER MYID"));
1797            }
1798            Ok(Command::ClusterMyId)
1799        }
1800        "SETSLOT" => parse_cluster_setslot(&args[1..]),
1801        "MEET" => {
1802            if args.len() != 3 {
1803                return Err(wrong_arity("CLUSTER MEET"));
1804            }
1805            let ip = extract_string(&args[1])?;
1806            let p = parse_u64(&args[2], "CLUSTER MEET")?;
1807            let port = u16::try_from(p)
1808                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid port number".into()))?;
1809            Ok(Command::ClusterMeet { ip, port })
1810        }
1811        "ADDSLOTS" => {
1812            if args.len() < 2 {
1813                return Err(wrong_arity("CLUSTER ADDSLOTS"));
1814            }
1815            let slots = parse_slot_list(&args[1..])?;
1816            Ok(Command::ClusterAddSlots { slots })
1817        }
1818        "ADDSLOTSRANGE" => {
1819            // arguments are pairs: start1 end1 [start2 end2 ...]
1820            if args.len() < 3 || !(args.len() - 1).is_multiple_of(2) {
1821                return Err(wrong_arity("CLUSTER ADDSLOTSRANGE"));
1822            }
1823            let mut ranges = Vec::new();
1824            for pair in args[1..].chunks(2) {
1825                let s = parse_u64(&pair[0], "CLUSTER ADDSLOTSRANGE")?;
1826                let start = u16::try_from(s)
1827                    .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot".into()))?;
1828                let e = parse_u64(&pair[1], "CLUSTER ADDSLOTSRANGE")?;
1829                let end = u16::try_from(e)
1830                    .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot".into()))?;
1831                if start > end || end >= 16384 {
1832                    return Err(ProtocolError::InvalidCommandFrame(
1833                        "invalid slot range: start must be <= end and both must be 0-16383".into(),
1834                    ));
1835                }
1836                ranges.push((start, end));
1837            }
1838            Ok(Command::ClusterAddSlotsRange { ranges })
1839        }
1840        "DELSLOTS" => {
1841            if args.len() < 2 {
1842                return Err(wrong_arity("CLUSTER DELSLOTS"));
1843            }
1844            let slots = parse_slot_list(&args[1..])?;
1845            Ok(Command::ClusterDelSlots { slots })
1846        }
1847        "FORGET" => {
1848            if args.len() != 2 {
1849                return Err(wrong_arity("CLUSTER FORGET"));
1850            }
1851            let node_id = extract_string(&args[1])?;
1852            Ok(Command::ClusterForget { node_id })
1853        }
1854        "REPLICATE" => {
1855            if args.len() != 2 {
1856                return Err(wrong_arity("CLUSTER REPLICATE"));
1857            }
1858            let node_id = extract_string(&args[1])?;
1859            Ok(Command::ClusterReplicate { node_id })
1860        }
1861        "FAILOVER" => {
1862            let mut force = false;
1863            let mut takeover = false;
1864            for arg in &args[1..] {
1865                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
1866                let opt = uppercase_arg(arg, &mut kw2)?;
1867                match opt {
1868                    "FORCE" => force = true,
1869                    "TAKEOVER" => takeover = true,
1870                    _ => {
1871                        return Err(ProtocolError::InvalidCommandFrame(format!(
1872                            "unknown CLUSTER FAILOVER option '{opt}'"
1873                        )))
1874                    }
1875                }
1876            }
1877            Ok(Command::ClusterFailover { force, takeover })
1878        }
1879        "COUNTKEYSINSLOT" => {
1880            if args.len() != 2 {
1881                return Err(wrong_arity("CLUSTER COUNTKEYSINSLOT"));
1882            }
1883            let n = parse_u64(&args[1], "CLUSTER COUNTKEYSINSLOT")?;
1884            let slot = u16::try_from(n)
1885                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
1886            Ok(Command::ClusterCountKeysInSlot { slot })
1887        }
1888        "GETKEYSINSLOT" => {
1889            if args.len() != 3 {
1890                return Err(wrong_arity("CLUSTER GETKEYSINSLOT"));
1891            }
1892            let n = parse_u64(&args[1], "CLUSTER GETKEYSINSLOT")?;
1893            let slot = u16::try_from(n)
1894                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
1895            let c = parse_u64(&args[2], "CLUSTER GETKEYSINSLOT")?;
1896            let count = u32::try_from(c)
1897                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid count".into()))?;
1898            Ok(Command::ClusterGetKeysInSlot { slot, count })
1899        }
1900        _ => Err(ProtocolError::InvalidCommandFrame(format!(
1901            "unknown CLUSTER subcommand '{subcommand}'"
1902        ))),
1903    }
1904}
1905
1906fn parse_asking(args: &[Frame]) -> Result<Command, ProtocolError> {
1907    if !args.is_empty() {
1908        return Err(wrong_arity("ASKING"));
1909    }
1910    Ok(Command::Asking)
1911}
1912
1913fn parse_watch(args: &[Frame]) -> Result<Command, ProtocolError> {
1914    if args.is_empty() {
1915        return Err(wrong_arity("WATCH"));
1916    }
1917    let keys = extract_strings(args)?;
1918    Ok(Command::Watch { keys })
1919}
1920
1921fn parse_no_args(
1922    name: &'static str,
1923    args: &[Frame],
1924    cmd: Command,
1925) -> Result<Command, ProtocolError> {
1926    if !args.is_empty() {
1927        return Err(wrong_arity(name));
1928    }
1929    Ok(cmd)
1930}
1931
1932fn parse_acl(args: &[Frame]) -> Result<Command, ProtocolError> {
1933    if args.is_empty() {
1934        return Err(wrong_arity("ACL"));
1935    }
1936
1937    let mut kw = [0u8; MAX_KEYWORD_LEN];
1938    let subcmd = uppercase_arg(&args[0], &mut kw)?;
1939    match subcmd {
1940        "WHOAMI" => {
1941            if args.len() != 1 {
1942                return Err(wrong_arity("ACL|WHOAMI"));
1943            }
1944            Ok(Command::AclWhoAmI)
1945        }
1946        "LIST" => {
1947            if args.len() != 1 {
1948                return Err(wrong_arity("ACL|LIST"));
1949            }
1950            Ok(Command::AclList)
1951        }
1952        "USERS" => {
1953            if args.len() != 1 {
1954                return Err(wrong_arity("ACL|USERS"));
1955            }
1956            Ok(Command::AclUsers)
1957        }
1958        "GETUSER" => {
1959            if args.len() != 2 {
1960                return Err(wrong_arity("ACL|GETUSER"));
1961            }
1962            let username = extract_string(&args[1])?;
1963            Ok(Command::AclGetUser { username })
1964        }
1965        "DELUSER" => {
1966            if args.len() < 2 {
1967                return Err(wrong_arity("ACL|DELUSER"));
1968            }
1969            let usernames = extract_strings(&args[1..])?;
1970            Ok(Command::AclDelUser { usernames })
1971        }
1972        "SETUSER" => {
1973            if args.len() < 2 {
1974                return Err(wrong_arity("ACL|SETUSER"));
1975            }
1976            let username = extract_string(&args[1])?;
1977            let rules = if args.len() > 2 {
1978                extract_strings(&args[2..])?
1979            } else {
1980                Vec::new()
1981            };
1982            Ok(Command::AclSetUser { username, rules })
1983        }
1984        "CAT" => {
1985            if args.len() > 2 {
1986                return Err(wrong_arity("ACL|CAT"));
1987            }
1988            let category = if args.len() == 2 {
1989                Some(extract_string(&args[1])?)
1990            } else {
1991                None
1992            };
1993            Ok(Command::AclCat { category })
1994        }
1995        other => Err(ProtocolError::InvalidCommandFrame(format!(
1996            "unknown ACL subcommand '{other}'"
1997        ))),
1998    }
1999}
2000
2001fn parse_config(args: &[Frame]) -> Result<Command, ProtocolError> {
2002    if args.is_empty() {
2003        return Err(wrong_arity("CONFIG"));
2004    }
2005
2006    let mut kw = [0u8; MAX_KEYWORD_LEN];
2007    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2008    match subcmd {
2009        "GET" => {
2010            if args.len() != 2 {
2011                return Err(wrong_arity("CONFIG|GET"));
2012            }
2013            let pattern = extract_string(&args[1])?;
2014            Ok(Command::ConfigGet { pattern })
2015        }
2016        "SET" => {
2017            if args.len() != 3 {
2018                return Err(wrong_arity("CONFIG|SET"));
2019            }
2020            let param = extract_string(&args[1])?;
2021            let value = extract_string(&args[2])?;
2022            Ok(Command::ConfigSet { param, value })
2023        }
2024        "REWRITE" => {
2025            if args.len() != 1 {
2026                return Err(wrong_arity("CONFIG|REWRITE"));
2027            }
2028            Ok(Command::ConfigRewrite)
2029        }
2030        other => Err(ProtocolError::InvalidCommandFrame(format!(
2031            "unknown CONFIG subcommand '{other}'"
2032        ))),
2033    }
2034}
2035
2036fn parse_slowlog(args: &[Frame]) -> Result<Command, ProtocolError> {
2037    if args.is_empty() {
2038        return Err(wrong_arity("SLOWLOG"));
2039    }
2040
2041    let mut kw = [0u8; MAX_KEYWORD_LEN];
2042    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2043    match subcmd {
2044        "GET" => {
2045            let count = if args.len() > 1 {
2046                Some(parse_u64(&args[1], "SLOWLOG")? as usize)
2047            } else {
2048                None
2049            };
2050            Ok(Command::SlowLogGet { count })
2051        }
2052        "LEN" => Ok(Command::SlowLogLen),
2053        "RESET" => Ok(Command::SlowLogReset),
2054        other => Err(ProtocolError::InvalidCommandFrame(format!(
2055            "unknown SLOWLOG subcommand '{other}'"
2056        ))),
2057    }
2058}
2059
2060fn parse_client(args: &[Frame]) -> Result<Command, ProtocolError> {
2061    if args.is_empty() {
2062        return Err(wrong_arity("CLIENT"));
2063    }
2064
2065    let mut kw = [0u8; MAX_KEYWORD_LEN];
2066    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2067    match subcmd {
2068        "ID" => Ok(Command::ClientId),
2069        "GETNAME" => Ok(Command::ClientGetName),
2070        "LIST" => Ok(Command::ClientList),
2071        "SETNAME" => {
2072            if args.len() < 2 {
2073                return Err(wrong_arity("CLIENT SETNAME"));
2074            }
2075            let name = extract_string(&args[1])?;
2076            Ok(Command::ClientSetName { name })
2077        }
2078        other => Err(ProtocolError::InvalidCommandFrame(format!(
2079            "unknown CLIENT subcommand '{other}'"
2080        ))),
2081    }
2082}
2083
2084fn parse_object(args: &[Frame]) -> Result<Command, ProtocolError> {
2085    if args.is_empty() {
2086        return Err(wrong_arity("OBJECT"));
2087    }
2088
2089    let mut kw = [0u8; MAX_KEYWORD_LEN];
2090    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2091    match subcmd {
2092        "ENCODING" => {
2093            if args.len() != 2 {
2094                return Err(wrong_arity("OBJECT|ENCODING"));
2095            }
2096            let key = extract_string(&args[1])?;
2097            Ok(Command::ObjectEncoding { key })
2098        }
2099        "REFCOUNT" => {
2100            if args.len() != 2 {
2101                return Err(wrong_arity("OBJECT|REFCOUNT"));
2102            }
2103            let key = extract_string(&args[1])?;
2104            Ok(Command::ObjectRefcount { key })
2105        }
2106        other => Err(ProtocolError::InvalidCommandFrame(format!(
2107            "unknown OBJECT subcommand '{other}'"
2108        ))),
2109    }
2110}
2111
2112fn parse_copy(args: &[Frame]) -> Result<Command, ProtocolError> {
2113    if args.len() < 2 {
2114        return Err(wrong_arity("COPY"));
2115    }
2116    let source = extract_string(&args[0])?;
2117    let destination = extract_string(&args[1])?;
2118
2119    let mut replace = false;
2120    let mut i = 2;
2121    while i < args.len() {
2122        let mut kw = [0u8; MAX_KEYWORD_LEN];
2123        let arg = uppercase_arg(&args[i], &mut kw)?;
2124        match arg {
2125            "REPLACE" => replace = true,
2126            "DB" => {
2127                // consume and ignore the DB argument (single-db server)
2128                i += 1;
2129                if i >= args.len() {
2130                    return Err(wrong_arity("COPY"));
2131                }
2132            }
2133            _ => {
2134                return Err(ProtocolError::InvalidCommandFrame(format!(
2135                    "unsupported COPY option '{arg}'"
2136                )));
2137            }
2138        }
2139        i += 1;
2140    }
2141
2142    Ok(Command::Copy {
2143        source,
2144        destination,
2145        replace,
2146    })
2147}
2148
2149fn parse_slot_list(args: &[Frame]) -> Result<Vec<u16>, ProtocolError> {
2150    let mut slots = Vec::with_capacity(args.len());
2151    for arg in args {
2152        let n = parse_u64(arg, "CLUSTER")?;
2153        let slot = u16::try_from(n)
2154            .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2155        if slot >= 16384 {
2156            return Err(ProtocolError::InvalidCommandFrame(format!(
2157                "invalid slot {slot}: must be 0-16383"
2158            )));
2159        }
2160        slots.push(slot);
2161    }
2162    Ok(slots)
2163}
2164
2165fn parse_cluster_setslot(args: &[Frame]) -> Result<Command, ProtocolError> {
2166    if args.is_empty() {
2167        return Err(wrong_arity("CLUSTER SETSLOT"));
2168    }
2169
2170    let n = parse_u64(&args[0], "CLUSTER SETSLOT")?;
2171    let slot = u16::try_from(n)
2172        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2173    if slot >= 16384 {
2174        return Err(ProtocolError::InvalidCommandFrame(format!(
2175            "invalid slot {slot}: must be 0-16383"
2176        )));
2177    }
2178
2179    if args.len() < 2 {
2180        return Err(wrong_arity("CLUSTER SETSLOT"));
2181    }
2182
2183    let mut kw = [0u8; MAX_KEYWORD_LEN];
2184    let action = uppercase_arg(&args[1], &mut kw)?;
2185    match action {
2186        "IMPORTING" => {
2187            if args.len() != 3 {
2188                return Err(ProtocolError::WrongArity(
2189                    "CLUSTER SETSLOT IMPORTING".into(),
2190                ));
2191            }
2192            let node_id = extract_string(&args[2])?;
2193            Ok(Command::ClusterSetSlotImporting { slot, node_id })
2194        }
2195        "MIGRATING" => {
2196            if args.len() != 3 {
2197                return Err(ProtocolError::WrongArity(
2198                    "CLUSTER SETSLOT MIGRATING".into(),
2199                ));
2200            }
2201            let node_id = extract_string(&args[2])?;
2202            Ok(Command::ClusterSetSlotMigrating { slot, node_id })
2203        }
2204        "NODE" => {
2205            if args.len() != 3 {
2206                return Err(wrong_arity("CLUSTER SETSLOT NODE"));
2207            }
2208            let node_id = extract_string(&args[2])?;
2209            Ok(Command::ClusterSetSlotNode { slot, node_id })
2210        }
2211        "STABLE" => {
2212            if args.len() != 2 {
2213                return Err(wrong_arity("CLUSTER SETSLOT STABLE"));
2214            }
2215            Ok(Command::ClusterSetSlotStable { slot })
2216        }
2217        _ => Err(ProtocolError::InvalidCommandFrame(format!(
2218            "unknown CLUSTER SETSLOT action '{action}'"
2219        ))),
2220    }
2221}
2222
2223fn parse_migrate(args: &[Frame]) -> Result<Command, ProtocolError> {
2224    // MIGRATE host port key db timeout [COPY] [REPLACE]
2225    if args.len() < 5 {
2226        return Err(wrong_arity("MIGRATE"));
2227    }
2228
2229    let host = extract_string(&args[0])?;
2230    let p = parse_u64(&args[1], "MIGRATE")?;
2231    let port = u16::try_from(p)
2232        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid port number".into()))?;
2233    let key = extract_string(&args[2])?;
2234    let d = parse_u64(&args[3], "MIGRATE")?;
2235    let db = u32::try_from(d)
2236        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid db number".into()))?;
2237    let timeout_ms = parse_u64(&args[4], "MIGRATE")?;
2238
2239    let mut copy = false;
2240    let mut replace = false;
2241
2242    for arg in &args[5..] {
2243        let mut kw = [0u8; MAX_KEYWORD_LEN];
2244        let opt = uppercase_arg(arg, &mut kw)?;
2245        match opt {
2246            "COPY" => copy = true,
2247            "REPLACE" => replace = true,
2248            _ => {
2249                return Err(ProtocolError::InvalidCommandFrame(format!(
2250                    "unknown MIGRATE option '{opt}'"
2251                )))
2252            }
2253        }
2254    }
2255
2256    Ok(Command::Migrate {
2257        host,
2258        port,
2259        key,
2260        db,
2261        timeout_ms,
2262        copy,
2263        replace,
2264    })
2265}
2266
2267fn parse_restore(args: &[Frame]) -> Result<Command, ProtocolError> {
2268    // RESTORE key ttl serialized-value [REPLACE]
2269    if args.len() < 3 {
2270        return Err(wrong_arity("RESTORE"));
2271    }
2272
2273    let key = extract_string(&args[0])?;
2274    let ttl_ms = parse_u64(&args[1], "RESTORE")?;
2275    let data = extract_bytes(&args[2])?;
2276
2277    let mut replace = false;
2278    for arg in &args[3..] {
2279        let mut kw = [0u8; MAX_KEYWORD_LEN];
2280        let opt = uppercase_arg(arg, &mut kw)?;
2281        if opt == "REPLACE" {
2282            replace = true;
2283        } else {
2284            return Err(ProtocolError::InvalidCommandFrame(format!(
2285                "unknown RESTORE option '{opt}'"
2286            )));
2287        }
2288    }
2289
2290    Ok(Command::Restore {
2291        key,
2292        ttl_ms,
2293        data,
2294        replace,
2295    })
2296}
2297
2298fn parse_subscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2299    if args.is_empty() {
2300        return Err(wrong_arity("SUBSCRIBE"));
2301    }
2302    let channels: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2303    Ok(Command::Subscribe { channels })
2304}
2305
2306fn parse_unsubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2307    let channels: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2308    Ok(Command::Unsubscribe { channels })
2309}
2310
2311fn parse_psubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2312    if args.is_empty() {
2313        return Err(wrong_arity("PSUBSCRIBE"));
2314    }
2315    let patterns: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2316    Ok(Command::PSubscribe { patterns })
2317}
2318
2319fn parse_punsubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2320    let patterns: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2321    Ok(Command::PUnsubscribe { patterns })
2322}
2323
2324fn parse_publish(args: &[Frame]) -> Result<Command, ProtocolError> {
2325    if args.len() != 2 {
2326        return Err(wrong_arity("PUBLISH"));
2327    }
2328    let channel = extract_string(&args[0])?;
2329    let message = extract_bytes(&args[1])?;
2330    Ok(Command::Publish { channel, message })
2331}
2332
2333fn parse_pubsub(args: &[Frame]) -> Result<Command, ProtocolError> {
2334    if args.is_empty() {
2335        return Err(wrong_arity("PUBSUB"));
2336    }
2337
2338    let mut kw = [0u8; MAX_KEYWORD_LEN];
2339    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2340    match subcmd {
2341        "CHANNELS" => {
2342            let pattern = if args.len() > 1 {
2343                Some(extract_string(&args[1])?)
2344            } else {
2345                None
2346            };
2347            Ok(Command::PubSubChannels { pattern })
2348        }
2349        "NUMSUB" => {
2350            let channels: Vec<String> = args[1..]
2351                .iter()
2352                .map(extract_string)
2353                .collect::<Result<_, _>>()?;
2354            Ok(Command::PubSubNumSub { channels })
2355        }
2356        "NUMPAT" => Ok(Command::PubSubNumPat),
2357        other => Err(ProtocolError::InvalidCommandFrame(format!(
2358            "unknown PUBSUB subcommand '{other}'"
2359        ))),
2360    }
2361}
2362
2363// --- vector command parsers ---
2364
2365/// Parses METRIC / QUANT / M / EF flags from a slice of command arguments.
2366///
2367/// Returns `(metric, quantization, connectivity, expansion_add)`.
2368/// `cmd` is used in error messages (e.g., "VADD" or "VADD_BATCH").
2369fn parse_vector_flags(
2370    args: &[Frame],
2371    cmd: &'static str,
2372) -> Result<(u8, u8, u32, u32), ProtocolError> {
2373    let mut metric: u8 = 0; // cosine default
2374    let mut quantization: u8 = 0; // f32 default
2375    let mut connectivity: u32 = 16;
2376    let mut expansion_add: u32 = 64;
2377    let mut idx = 0;
2378
2379    while idx < args.len() {
2380        let mut kw = [0u8; MAX_KEYWORD_LEN];
2381        let flag = uppercase_arg(&args[idx], &mut kw)?;
2382        match flag {
2383            "METRIC" => {
2384                idx += 1;
2385                if idx >= args.len() {
2386                    return Err(ProtocolError::InvalidCommandFrame(format!(
2387                        "{cmd}: METRIC requires a value"
2388                    )));
2389                }
2390                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2391                let val = uppercase_arg(&args[idx], &mut kw2)?;
2392                metric = match val {
2393                    "COSINE" => 0,
2394                    "L2" => 1,
2395                    "IP" => 2,
2396                    _ => {
2397                        return Err(ProtocolError::InvalidCommandFrame(format!(
2398                            "{cmd}: unknown metric '{val}'"
2399                        )))
2400                    }
2401                };
2402                idx += 1;
2403            }
2404            "QUANT" => {
2405                idx += 1;
2406                if idx >= args.len() {
2407                    return Err(ProtocolError::InvalidCommandFrame(format!(
2408                        "{cmd}: QUANT requires a value"
2409                    )));
2410                }
2411                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2412                let val = uppercase_arg(&args[idx], &mut kw2)?;
2413                quantization = match val {
2414                    "F32" => 0,
2415                    "F16" => 1,
2416                    "I8" | "Q8" => 2,
2417                    _ => {
2418                        return Err(ProtocolError::InvalidCommandFrame(format!(
2419                            "{cmd}: unknown quantization '{val}'"
2420                        )))
2421                    }
2422                };
2423                idx += 1;
2424            }
2425            "M" => {
2426                idx += 1;
2427                if idx >= args.len() {
2428                    return Err(ProtocolError::InvalidCommandFrame(format!(
2429                        "{cmd}: M requires a value"
2430                    )));
2431                }
2432                let m = parse_u64(&args[idx], cmd)?;
2433                if m > MAX_HNSW_PARAM {
2434                    return Err(ProtocolError::InvalidCommandFrame(format!(
2435                        "{cmd}: M value {m} exceeds max {MAX_HNSW_PARAM}"
2436                    )));
2437                }
2438                connectivity = m as u32;
2439                idx += 1;
2440            }
2441            "EF" => {
2442                idx += 1;
2443                if idx >= args.len() {
2444                    return Err(ProtocolError::InvalidCommandFrame(format!(
2445                        "{cmd}: EF requires a value"
2446                    )));
2447                }
2448                let ef = parse_u64(&args[idx], cmd)?;
2449                if ef > MAX_HNSW_PARAM {
2450                    return Err(ProtocolError::InvalidCommandFrame(format!(
2451                        "{cmd}: EF value {ef} exceeds max {MAX_HNSW_PARAM}"
2452                    )));
2453                }
2454                expansion_add = ef as u32;
2455                idx += 1;
2456            }
2457            _ => {
2458                return Err(ProtocolError::InvalidCommandFrame(format!(
2459                    "{cmd}: unexpected argument '{flag}'"
2460                )));
2461            }
2462        }
2463    }
2464
2465    Ok((metric, quantization, connectivity, expansion_add))
2466}
2467
2468/// VADD key element f32 [f32 ...] [METRIC COSINE|L2|IP] [QUANT F32|F16|I8] [M n] [EF n]
2469fn parse_vadd(args: &[Frame]) -> Result<Command, ProtocolError> {
2470    // minimum: key + element + at least one float
2471    if args.len() < 3 {
2472        return Err(wrong_arity("VADD"));
2473    }
2474
2475    let key = extract_string(&args[0])?;
2476    let element = extract_string(&args[1])?;
2477
2478    // parse vector values until we hit a non-numeric argument, end, or dim limit
2479    let mut idx = 2;
2480    let mut vector = Vec::new();
2481    while idx < args.len() {
2482        if vector.len() >= MAX_VECTOR_DIMS {
2483            return Err(ProtocolError::InvalidCommandFrame(format!(
2484                "VADD: vector exceeds {MAX_VECTOR_DIMS} dimensions"
2485            )));
2486        }
2487        let s = extract_string(&args[idx])?;
2488        if let Ok(v) = s.parse::<f32>() {
2489            if v.is_nan() || v.is_infinite() {
2490                return Err(ProtocolError::InvalidCommandFrame(
2491                    "VADD: vector components must be finite (no NaN/infinity)".into(),
2492                ));
2493            }
2494            vector.push(v);
2495            idx += 1;
2496        } else {
2497            break;
2498        }
2499    }
2500
2501    if vector.is_empty() {
2502        return Err(ProtocolError::InvalidCommandFrame(
2503            "VADD: at least one vector dimension required".into(),
2504        ));
2505    }
2506
2507    // parse optional flags
2508    let (metric, quantization, connectivity, expansion_add) =
2509        parse_vector_flags(&args[idx..], "VADD")?;
2510
2511    Ok(Command::VAdd {
2512        key,
2513        element,
2514        vector,
2515        metric,
2516        quantization,
2517        connectivity,
2518        expansion_add,
2519    })
2520}
2521
2522/// VADD_BATCH key DIM n [BINARY] element1 f32...|<blob> element2 f32...|<blob>
2523/// [METRIC COSINE|L2|IP] [QUANT F32|F16|I8] [M n] [EF n]
2524///
2525/// When BINARY is specified, each vector is a single bulk string of `dim * 4`
2526/// raw little-endian f32 bytes instead of `dim` separate text arguments.
2527/// This eliminates string serialization overhead on both client and server.
2528fn parse_vadd_batch(args: &[Frame]) -> Result<Command, ProtocolError> {
2529    // minimum: key + DIM + n (even an empty batch needs the DIM declaration)
2530    if args.len() < 3 {
2531        return Err(wrong_arity("VADD_BATCH"));
2532    }
2533
2534    let key = extract_string(&args[0])?;
2535
2536    // require DIM keyword
2537    let mut kw = [0u8; MAX_KEYWORD_LEN];
2538    let dim_kw = uppercase_arg(&args[1], &mut kw)?;
2539    if dim_kw != "DIM" {
2540        return Err(ProtocolError::InvalidCommandFrame(
2541            "VADD_BATCH: expected DIM keyword".into(),
2542        ));
2543    }
2544
2545    let dim = parse_u64(&args[2], "VADD_BATCH")? as usize;
2546    if dim == 0 {
2547        return Err(ProtocolError::InvalidCommandFrame(
2548            "VADD_BATCH: DIM must be at least 1".into(),
2549        ));
2550    }
2551    if dim > MAX_VECTOR_DIMS {
2552        return Err(ProtocolError::InvalidCommandFrame(format!(
2553            "VADD_BATCH: DIM {dim} exceeds max {MAX_VECTOR_DIMS}"
2554        )));
2555    }
2556
2557    // check for optional BINARY flag after DIM
2558    let mut idx = 3;
2559    let binary_mode = if idx < args.len() {
2560        let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2561        matches!(uppercase_arg(&args[idx], &mut kw2), Ok("BINARY"))
2562    } else {
2563        false
2564    };
2565    if binary_mode {
2566        idx += 1;
2567    }
2568
2569    let mut entries: Vec<(String, Vec<f32>)> = Vec::new();
2570
2571    if binary_mode {
2572        // binary mode: each entry is element_name + single blob of dim*4 bytes
2573        let blob_len = dim * 4;
2574        let entry_len = 2; // element name + blob
2575
2576        while idx < args.len() {
2577            if idx + entry_len > args.len() {
2578                break;
2579            }
2580
2581            // peek: if the second arg isn't exactly blob_len bytes, we've
2582            // hit the flags section (flags are short text strings)
2583            let blob_bytes = extract_bytes(&args[idx + 1])?;
2584            if blob_bytes.len() != blob_len {
2585                break;
2586            }
2587
2588            let element = extract_string(&args[idx])?;
2589            idx += 1;
2590
2591            // skip extract_bytes again — reuse what we already have
2592            idx += 1;
2593
2594            // reinterpret raw LE bytes as f32 slice
2595            let vector = bytes_to_f32_vec(&blob_bytes, dim)?;
2596
2597            entries.push((element, vector));
2598
2599            if entries.len() >= MAX_VADD_BATCH_SIZE {
2600                return Err(ProtocolError::InvalidCommandFrame(format!(
2601                    "VADD_BATCH: batch size exceeds max {MAX_VADD_BATCH_SIZE}"
2602                )));
2603            }
2604        }
2605    } else {
2606        // text mode: each entry is element_name followed by exactly `dim` floats.
2607        // we detect the end of entries by checking whether enough args remain
2608        // for a full entry (1 name + dim floats). this avoids misinterpreting
2609        // element names like "metric" as flags.
2610        let entry_len = 1 + dim; // element name + dim floats
2611
2612        while idx < args.len() {
2613            // not enough remaining args for a full entry — must be flags
2614            if idx + entry_len > args.len() {
2615                break;
2616            }
2617
2618            // peek: if the token after the element name isn't a valid float,
2619            // we've reached the flags section
2620            if dim > 0 {
2621                let peek = extract_string(&args[idx + 1])?;
2622                if peek.parse::<f32>().is_err() {
2623                    break;
2624                }
2625            }
2626
2627            let element = extract_string(&args[idx])?;
2628            idx += 1;
2629
2630            let mut vector = Vec::with_capacity(dim);
2631            for _ in 0..dim {
2632                let s = extract_string(&args[idx])?;
2633                let v = s.parse::<f32>().map_err(|_| {
2634                    ProtocolError::InvalidCommandFrame(format!(
2635                        "VADD_BATCH: expected float, got '{s}'"
2636                    ))
2637                })?;
2638                if v.is_nan() || v.is_infinite() {
2639                    return Err(ProtocolError::InvalidCommandFrame(
2640                        "VADD_BATCH: vector components must be finite (no NaN/infinity)".into(),
2641                    ));
2642                }
2643                vector.push(v);
2644                idx += 1;
2645            }
2646
2647            entries.push((element, vector));
2648
2649            if entries.len() >= MAX_VADD_BATCH_SIZE {
2650                return Err(ProtocolError::InvalidCommandFrame(format!(
2651                    "VADD_BATCH: batch size exceeds max {MAX_VADD_BATCH_SIZE}"
2652                )));
2653            }
2654        }
2655    }
2656
2657    // parse optional flags (same logic as parse_vadd)
2658    let (metric, quantization, connectivity, expansion_add) =
2659        parse_vector_flags(&args[idx..], "VADD_BATCH")?;
2660
2661    Ok(Command::VAddBatch {
2662        key,
2663        entries,
2664        dim,
2665        metric,
2666        quantization,
2667        connectivity,
2668        expansion_add,
2669    })
2670}
2671
2672/// Converts a raw byte buffer of little-endian f32s into a Vec<f32>.
2673///
2674/// Validates that all values are finite (no NaN/infinity). The buffer
2675/// must be exactly `dim * 4` bytes.
2676fn bytes_to_f32_vec(data: &[u8], dim: usize) -> Result<Vec<f32>, ProtocolError> {
2677    // compile-time endianness check — binary protocol assumes little-endian
2678    #[cfg(not(target_endian = "little"))]
2679    compile_error!("VADD_BATCH BINARY mode requires a little-endian target");
2680
2681    debug_assert_eq!(data.len(), dim * 4);
2682
2683    let mut vector = Vec::with_capacity(dim);
2684    for chunk in data.chunks_exact(4) {
2685        let v = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
2686        if !v.is_finite() {
2687            return Err(ProtocolError::InvalidCommandFrame(
2688                "VADD_BATCH BINARY: vector contains non-finite value (NaN/infinity)".into(),
2689            ));
2690        }
2691        vector.push(v);
2692    }
2693    Ok(vector)
2694}
2695
2696/// VSIM key f32 [f32 ...] COUNT k [EF n] [WITHSCORES]
2697fn parse_vsim(args: &[Frame]) -> Result<Command, ProtocolError> {
2698    // minimum: key + at least one float + COUNT + k
2699    if args.len() < 4 {
2700        return Err(wrong_arity("VSIM"));
2701    }
2702
2703    let key = extract_string(&args[0])?;
2704
2705    // parse query vector until we hit a non-numeric argument, end, or dim limit
2706    let mut idx = 1;
2707    let mut query = Vec::new();
2708    while idx < args.len() {
2709        if query.len() >= MAX_VECTOR_DIMS {
2710            return Err(ProtocolError::InvalidCommandFrame(format!(
2711                "VSIM: query exceeds {MAX_VECTOR_DIMS} dimensions"
2712            )));
2713        }
2714        let s = extract_string(&args[idx])?;
2715        if let Ok(v) = s.parse::<f32>() {
2716            if v.is_nan() || v.is_infinite() {
2717                return Err(ProtocolError::InvalidCommandFrame(
2718                    "VSIM: query components must be finite (no NaN/infinity)".into(),
2719                ));
2720            }
2721            query.push(v);
2722            idx += 1;
2723        } else {
2724            break;
2725        }
2726    }
2727
2728    if query.is_empty() {
2729        return Err(ProtocolError::InvalidCommandFrame(
2730            "VSIM: at least one query dimension required".into(),
2731        ));
2732    }
2733
2734    // COUNT k is required
2735    let mut count: Option<usize> = None;
2736    let mut ef_search: usize = 0;
2737    let mut with_scores = false;
2738
2739    while idx < args.len() {
2740        let mut kw = [0u8; MAX_KEYWORD_LEN];
2741        let flag = uppercase_arg(&args[idx], &mut kw)?;
2742        match flag {
2743            "COUNT" => {
2744                idx += 1;
2745                if idx >= args.len() {
2746                    return Err(ProtocolError::InvalidCommandFrame(
2747                        "VSIM: COUNT requires a value".into(),
2748                    ));
2749                }
2750                let c = parse_u64(&args[idx], "VSIM")?;
2751                if c > MAX_VSIM_COUNT {
2752                    return Err(ProtocolError::InvalidCommandFrame(format!(
2753                        "VSIM: COUNT {c} exceeds max {MAX_VSIM_COUNT}"
2754                    )));
2755                }
2756                count = Some(c as usize);
2757                idx += 1;
2758            }
2759            "EF" => {
2760                idx += 1;
2761                if idx >= args.len() {
2762                    return Err(ProtocolError::InvalidCommandFrame(
2763                        "VSIM: EF requires a value".into(),
2764                    ));
2765                }
2766                let ef = parse_u64(&args[idx], "VSIM")?;
2767                if ef > MAX_VSIM_EF {
2768                    return Err(ProtocolError::InvalidCommandFrame(format!(
2769                        "VSIM: EF {ef} exceeds max {MAX_VSIM_EF}"
2770                    )));
2771                }
2772                ef_search = ef as usize;
2773                idx += 1;
2774            }
2775            "WITHSCORES" => {
2776                with_scores = true;
2777                idx += 1;
2778            }
2779            _ => {
2780                return Err(ProtocolError::InvalidCommandFrame(format!(
2781                    "VSIM: unexpected argument '{flag}'"
2782                )));
2783            }
2784        }
2785    }
2786
2787    let count = count
2788        .ok_or_else(|| ProtocolError::InvalidCommandFrame("VSIM: COUNT is required".into()))?;
2789
2790    Ok(Command::VSim {
2791        key,
2792        query,
2793        count,
2794        ef_search,
2795        with_scores,
2796    })
2797}
2798
2799/// VREM key element
2800fn parse_vrem(args: &[Frame]) -> Result<Command, ProtocolError> {
2801    if args.len() != 2 {
2802        return Err(wrong_arity("VREM"));
2803    }
2804    let key = extract_string(&args[0])?;
2805    let element = extract_string(&args[1])?;
2806    Ok(Command::VRem { key, element })
2807}
2808
2809/// VGET key element
2810fn parse_vget(args: &[Frame]) -> Result<Command, ProtocolError> {
2811    if args.len() != 2 {
2812        return Err(wrong_arity("VGET"));
2813    }
2814    let key = extract_string(&args[0])?;
2815    let element = extract_string(&args[1])?;
2816    Ok(Command::VGet { key, element })
2817}
2818
2819/// VCARD key
2820fn parse_vcard(args: &[Frame]) -> Result<Command, ProtocolError> {
2821    if args.len() != 1 {
2822        return Err(wrong_arity("VCARD"));
2823    }
2824    let key = extract_string(&args[0])?;
2825    Ok(Command::VCard { key })
2826}
2827
2828/// VDIM key
2829fn parse_vdim(args: &[Frame]) -> Result<Command, ProtocolError> {
2830    if args.len() != 1 {
2831        return Err(wrong_arity("VDIM"));
2832    }
2833    let key = extract_string(&args[0])?;
2834    Ok(Command::VDim { key })
2835}
2836
2837/// VINFO key
2838fn parse_vinfo(args: &[Frame]) -> Result<Command, ProtocolError> {
2839    if args.len() != 1 {
2840        return Err(wrong_arity("VINFO"));
2841    }
2842    let key = extract_string(&args[0])?;
2843    Ok(Command::VInfo { key })
2844}
2845
2846// --- proto command parsers ---
2847
2848fn parse_proto_register(args: &[Frame]) -> Result<Command, ProtocolError> {
2849    if args.len() != 2 {
2850        return Err(wrong_arity("PROTO.REGISTER"));
2851    }
2852    let name = extract_string(&args[0])?;
2853    let descriptor = extract_bytes(&args[1])?;
2854    Ok(Command::ProtoRegister { name, descriptor })
2855}
2856
2857fn parse_proto_set(args: &[Frame]) -> Result<Command, ProtocolError> {
2858    if args.len() < 3 {
2859        return Err(wrong_arity("PROTO.SET"));
2860    }
2861
2862    let key = extract_string(&args[0])?;
2863    let type_name = extract_string(&args[1])?;
2864    let data = extract_bytes(&args[2])?;
2865    let (expire, nx, xx) = parse_set_options(&args[3..], "PROTO.SET")?;
2866
2867    Ok(Command::ProtoSet {
2868        key,
2869        type_name,
2870        data,
2871        expire,
2872        nx,
2873        xx,
2874    })
2875}
2876
2877fn parse_proto_get(args: &[Frame]) -> Result<Command, ProtocolError> {
2878    if args.len() != 1 {
2879        return Err(wrong_arity("PROTO.GET"));
2880    }
2881    let key = extract_string(&args[0])?;
2882    Ok(Command::ProtoGet { key })
2883}
2884
2885fn parse_proto_type(args: &[Frame]) -> Result<Command, ProtocolError> {
2886    if args.len() != 1 {
2887        return Err(wrong_arity("PROTO.TYPE"));
2888    }
2889    let key = extract_string(&args[0])?;
2890    Ok(Command::ProtoType { key })
2891}
2892
2893fn parse_proto_schemas(args: &[Frame]) -> Result<Command, ProtocolError> {
2894    if !args.is_empty() {
2895        return Err(wrong_arity("PROTO.SCHEMAS"));
2896    }
2897    Ok(Command::ProtoSchemas)
2898}
2899
2900fn parse_proto_describe(args: &[Frame]) -> Result<Command, ProtocolError> {
2901    if args.len() != 1 {
2902        return Err(wrong_arity("PROTO.DESCRIBE"));
2903    }
2904    let name = extract_string(&args[0])?;
2905    Ok(Command::ProtoDescribe { name })
2906}
2907
2908fn parse_proto_getfield(args: &[Frame]) -> Result<Command, ProtocolError> {
2909    if args.len() != 2 {
2910        return Err(wrong_arity("PROTO.GETFIELD"));
2911    }
2912    let key = extract_string(&args[0])?;
2913    let field_path = extract_string(&args[1])?;
2914    Ok(Command::ProtoGetField { key, field_path })
2915}
2916
2917fn parse_proto_setfield(args: &[Frame]) -> Result<Command, ProtocolError> {
2918    if args.len() != 3 {
2919        return Err(wrong_arity("PROTO.SETFIELD"));
2920    }
2921    let key = extract_string(&args[0])?;
2922    let field_path = extract_string(&args[1])?;
2923    let value = extract_string(&args[2])?;
2924    Ok(Command::ProtoSetField {
2925        key,
2926        field_path,
2927        value,
2928    })
2929}
2930
2931fn parse_proto_delfield(args: &[Frame]) -> Result<Command, ProtocolError> {
2932    if args.len() != 2 {
2933        return Err(wrong_arity("PROTO.DELFIELD"));
2934    }
2935    let key = extract_string(&args[0])?;
2936    let field_path = extract_string(&args[1])?;
2937    Ok(Command::ProtoDelField { key, field_path })
2938}
2939
2940fn parse_auth(args: &[Frame]) -> Result<Command, ProtocolError> {
2941    match args.len() {
2942        1 => {
2943            let password = extract_string(&args[0])?;
2944            Ok(Command::Auth {
2945                username: None,
2946                password,
2947            })
2948        }
2949        2 => {
2950            let username = extract_string(&args[0])?;
2951            let password = extract_string(&args[1])?;
2952            Ok(Command::Auth {
2953                username: Some(username),
2954                password,
2955            })
2956        }
2957        _ => Err(wrong_arity("AUTH")),
2958    }
2959}
2960
2961fn parse_quit(args: &[Frame]) -> Result<Command, ProtocolError> {
2962    if !args.is_empty() {
2963        return Err(wrong_arity("QUIT"));
2964    }
2965    Ok(Command::Quit)
2966}
2967
2968fn parse_monitor(args: &[Frame]) -> Result<Command, ProtocolError> {
2969    if !args.is_empty() {
2970        return Err(wrong_arity("MONITOR"));
2971    }
2972    Ok(Command::Monitor)
2973}
2974
2975fn parse_touch(args: &[Frame]) -> Result<Command, ProtocolError> {
2976    if args.is_empty() {
2977        return Err(wrong_arity("TOUCH"));
2978    }
2979    let keys = extract_strings(args)?;
2980    Ok(Command::Touch { keys })
2981}
2982
2983fn parse_sort(args: &[Frame]) -> Result<Command, ProtocolError> {
2984    if args.is_empty() {
2985        return Err(wrong_arity("SORT"));
2986    }
2987    let key = extract_string(&args[0])?;
2988    let mut desc = false;
2989    let mut alpha = false;
2990    let mut limit = None;
2991    let mut store = None;
2992    let mut i = 1;
2993    while i < args.len() {
2994        let flag = extract_string(&args[i])?.to_uppercase();
2995        match flag.as_str() {
2996            "ASC" => {
2997                desc = false;
2998                i += 1;
2999            }
3000            "DESC" => {
3001                desc = true;
3002                i += 1;
3003            }
3004            "ALPHA" => {
3005                alpha = true;
3006                i += 1;
3007            }
3008            "LIMIT" => {
3009                if i + 2 >= args.len() {
3010                    return Err(ProtocolError::InvalidCommandFrame(
3011                        "SORT LIMIT requires offset and count".into(),
3012                    ));
3013                }
3014                let offset = extract_string(&args[i + 1])?.parse::<i64>().map_err(|_| {
3015                    ProtocolError::InvalidCommandFrame(
3016                        "SORT LIMIT offset is not a valid integer".into(),
3017                    )
3018                })?;
3019                let count = extract_string(&args[i + 2])?.parse::<i64>().map_err(|_| {
3020                    ProtocolError::InvalidCommandFrame(
3021                        "SORT LIMIT count is not a valid integer".into(),
3022                    )
3023                })?;
3024                limit = Some((offset, count));
3025                i += 3;
3026            }
3027            "STORE" => {
3028                if i + 1 >= args.len() {
3029                    return Err(wrong_arity("SORT"));
3030                }
3031                store = Some(extract_string(&args[i + 1])?);
3032                i += 2;
3033            }
3034            _ => {
3035                return Err(ProtocolError::InvalidCommandFrame(format!(
3036                    "SORT: unsupported flag '{flag}'"
3037                )));
3038            }
3039        }
3040    }
3041    Ok(Command::Sort {
3042        key,
3043        desc,
3044        alpha,
3045        limit,
3046        store,
3047    })
3048}