Skip to main content

ember_protocol/command/
parse.rs

1//! Command parsing from RESP3 frames.
2//!
3//! Contains `Command::from_frame()` and all individual command parsers.
4//! Helper functions for frame extraction and numeric parsing live here too.
5
6use bytes::Bytes;
7
8use crate::error::ProtocolError;
9use crate::types::Frame;
10
11use super::{BitOpKind, BitRange, BitRangeUnit, Command, ScoreBound, SetExpire, ZAddFlags};
12
// Parser-side limits on client-supplied sizes and counts.
// NOTE(review): the code that enforces these limits is outside this section
// of the file — presumably the V* and SCAN parsers; confirm there.

/// Maximum number of dimensions in a vector. 65,536 is generous for any
/// real-world embedding model (OpenAI: 1536, Cohere: 4096) while preventing
/// memory abuse from absurdly large vectors.
const MAX_VECTOR_DIMS: usize = 65_536;

/// Maximum value for HNSW connectivity (M) and expansion parameters.
/// Values above 1024 give no practical benefit and waste memory.
const MAX_HNSW_PARAM: u64 = 1024;

/// Maximum number of results for VSIM. 10,000 is generous for any practical
/// similarity search while preventing OOM from unbounded result allocation.
const MAX_VSIM_COUNT: u64 = 10_000;

/// Maximum search beam width for VSIM. Deliberately defined in terms of
/// `MAX_HNSW_PARAM` so the two caps cannot drift apart — larger values cause
/// worst-case O(n) graph traversal with no accuracy gain.
const MAX_VSIM_EF: u64 = MAX_HNSW_PARAM;

/// Maximum number of vectors in a single VADD_BATCH command. 10,000 keeps
/// per-command latency bounded while still being large enough to amortize
/// round-trip overhead for bulk inserts.
const MAX_VADD_BATCH_SIZE: usize = 10_000;

/// Maximum value for SCAN COUNT. Prevents clients from requesting a scan
/// hint so large it causes pre-allocation issues.
const MAX_SCAN_COUNT: u64 = 10_000_000;
38
39impl Command {
40    /// Parses a [`Frame`] into a [`Command`].
41    ///
42    /// Expects an array frame where the first element is the command name
43    /// (as a bulk or simple string) and the rest are arguments.
44    pub fn from_frame(frame: Frame) -> Result<Command, ProtocolError> {
45        let frames = match frame {
46            Frame::Array(frames) => frames,
47            _ => {
48                return Err(ProtocolError::InvalidCommandFrame(
49                    "expected array frame".into(),
50                ));
51            }
52        };
53
54        if frames.is_empty() {
55            return Err(ProtocolError::InvalidCommandFrame(
56                "empty command array".into(),
57            ));
58        }
59
60        // command names are short ASCII keywords — uppercase on the stack to
61        // avoid two heap allocations (extract_string + to_ascii_uppercase).
62        let name_bytes = extract_raw_bytes(&frames[0])?;
63        let mut upper = [0u8; MAX_KEYWORD_LEN];
64        let len = name_bytes.len();
65        if len > MAX_KEYWORD_LEN {
66            let name = extract_string(&frames[0])?;
67            return Ok(Command::Unknown(name));
68        }
69        upper[..len].copy_from_slice(name_bytes);
70        upper[..len].make_ascii_uppercase();
71        let name_upper = std::str::from_utf8(&upper[..len]).map_err(|_| {
72            ProtocolError::InvalidCommandFrame("command name is not valid utf-8".into())
73        })?;
74
75        match name_upper {
76            "PING" => parse_ping(&frames[1..]),
77            "ECHO" => parse_echo(&frames[1..]),
78            "GET" => parse_get(&frames[1..]),
79            "SET" => parse_set(&frames[1..]),
80            "INCR" => parse_incr(&frames[1..]),
81            "DECR" => parse_decr(&frames[1..]),
82            "INCRBY" => parse_incrby(&frames[1..]),
83            "DECRBY" => parse_decrby(&frames[1..]),
84            "INCRBYFLOAT" => parse_incrbyfloat(&frames[1..]),
85            "APPEND" => parse_append(&frames[1..]),
86            "STRLEN" => parse_strlen(&frames[1..]),
87            "SETNX" => parse_setnx(&frames[1..]),
88            "SETEX" => parse_setex(&frames[1..]),
89            "PSETEX" => parse_psetex(&frames[1..]),
90            "GETRANGE" | "SUBSTR" => parse_getrange(&frames[1..]),
91            "SETRANGE" => parse_setrange(&frames[1..]),
92            "GETBIT" => parse_getbit(&frames[1..]),
93            "SETBIT" => parse_setbit(&frames[1..]),
94            "BITCOUNT" => parse_bitcount(&frames[1..]),
95            "BITPOS" => parse_bitpos(&frames[1..]),
96            "BITOP" => parse_bitop(&frames[1..]),
97            "KEYS" => parse_keys(&frames[1..]),
98            "RENAME" => parse_rename(&frames[1..]),
99            "DEL" => parse_del(&frames[1..]),
100            "UNLINK" => parse_unlink(&frames[1..]),
101            "EXISTS" => parse_exists(&frames[1..]),
102            "MGET" => parse_mget(&frames[1..]),
103            "MSET" => parse_mset(&frames[1..]),
104            "MSETNX" => parse_msetnx(&frames[1..]),
105            "GETSET" => parse_getset(&frames[1..]),
106            "EXPIRE" => parse_expire(&frames[1..]),
107            "EXPIREAT" => parse_expireat(&frames[1..]),
108            "TTL" => parse_ttl(&frames[1..]),
109            "PERSIST" => parse_persist(&frames[1..]),
110            "PTTL" => parse_pttl(&frames[1..]),
111            "PEXPIRE" => parse_pexpire(&frames[1..]),
112            "PEXPIREAT" => parse_pexpireat_cmd(&frames[1..]),
113            "DBSIZE" => parse_dbsize(&frames[1..]),
114            "INFO" => parse_info(&frames[1..]),
115            "BGSAVE" => parse_bgsave(&frames[1..]),
116            "BGREWRITEAOF" => parse_bgrewriteaof(&frames[1..]),
117            "FLUSHDB" => parse_flushdb(&frames[1..]),
118            "FLUSHALL" => parse_flushall(&frames[1..]),
119            "MEMORY" => parse_memory_cmd(&frames[1..]),
120            "SCAN" => parse_scan(&frames[1..]),
121            "SSCAN" => parse_key_scan(&frames[1..], "SSCAN"),
122            "HSCAN" => parse_key_scan(&frames[1..], "HSCAN"),
123            "ZSCAN" => parse_key_scan(&frames[1..], "ZSCAN"),
124            "LPUSH" => parse_lpush(&frames[1..]),
125            "RPUSH" => parse_rpush(&frames[1..]),
126            "LPOP" => parse_lpop(&frames[1..]),
127            "RPOP" => parse_rpop(&frames[1..]),
128            "LRANGE" => parse_lrange(&frames[1..]),
129            "LLEN" => parse_llen(&frames[1..]),
130            "BLPOP" => parse_blpop(&frames[1..]),
131            "BRPOP" => parse_brpop(&frames[1..]),
132            "LINDEX" => parse_lindex(&frames[1..]),
133            "LSET" => parse_lset(&frames[1..]),
134            "LTRIM" => parse_ltrim(&frames[1..]),
135            "LINSERT" => parse_linsert(&frames[1..]),
136            "LREM" => parse_lrem(&frames[1..]),
137            "LPOS" => parse_lpos(&frames[1..]),
138            "LMOVE" => parse_lmove(&frames[1..]),
139            "GETDEL" => parse_getdel(&frames[1..]),
140            "GETEX" => parse_getex(&frames[1..]),
141            "TYPE" => parse_type(&frames[1..]),
142            "ZADD" => parse_zadd(&frames[1..]),
143            "ZREM" => parse_zrem(&frames[1..]),
144            "ZSCORE" => parse_zscore(&frames[1..]),
145            "ZRANK" => parse_zrank(&frames[1..]),
146            "ZREVRANK" => parse_zrevrank(&frames[1..]),
147            "ZCARD" => parse_zcard(&frames[1..]),
148            "ZRANGE" => parse_zrange(&frames[1..]),
149            "ZREVRANGE" => parse_zrevrange(&frames[1..]),
150            "ZCOUNT" => parse_zcount(&frames[1..]),
151            "ZINCRBY" => parse_zincrby(&frames[1..]),
152            "ZRANGEBYSCORE" => parse_zrangebyscore(&frames[1..]),
153            "ZREVRANGEBYSCORE" => parse_zrevrangebyscore(&frames[1..]),
154            "ZPOPMIN" => {
155                let (key, count) = parse_zpop_args(&frames[1..], "ZPOPMIN")?;
156                Ok(Command::ZPopMin { key, count })
157            }
158            "ZPOPMAX" => {
159                let (key, count) = parse_zpop_args(&frames[1..], "ZPOPMAX")?;
160                Ok(Command::ZPopMax { key, count })
161            }
162            "LMPOP" => parse_lmpop(&frames[1..]),
163            "ZMPOP" => parse_zmpop(&frames[1..]),
164            "ZDIFF" => parse_zset_multi("ZDIFF", &frames[1..]),
165            "ZINTER" => parse_zset_multi("ZINTER", &frames[1..]),
166            "ZUNION" => parse_zset_multi("ZUNION", &frames[1..]),
167            "ZDIFFSTORE" => parse_zset_store("ZDIFFSTORE", &frames[1..]),
168            "ZINTERSTORE" => parse_zset_store("ZINTERSTORE", &frames[1..]),
169            "ZUNIONSTORE" => parse_zset_store("ZUNIONSTORE", &frames[1..]),
170            "ZRANDMEMBER" => parse_zrandmember(&frames[1..]),
171            "HSET" => parse_hset(&frames[1..]),
172            "HGET" => parse_hget(&frames[1..]),
173            "HGETALL" => parse_hgetall(&frames[1..]),
174            "HDEL" => parse_hdel(&frames[1..]),
175            "HEXISTS" => parse_hexists(&frames[1..]),
176            "HLEN" => parse_hlen(&frames[1..]),
177            "HINCRBY" => parse_hincrby(&frames[1..]),
178            "HINCRBYFLOAT" => parse_hincrbyfloat(&frames[1..]),
179            "HKEYS" => parse_hkeys(&frames[1..]),
180            "HVALS" => parse_hvals(&frames[1..]),
181            "HMGET" => parse_hmget(&frames[1..]),
182            "HRANDFIELD" => parse_hrandfield(&frames[1..]),
183            "SADD" => parse_sadd(&frames[1..]),
184            "SREM" => parse_srem(&frames[1..]),
185            "SMEMBERS" => parse_smembers(&frames[1..]),
186            "SISMEMBER" => parse_sismember(&frames[1..]),
187            "SCARD" => parse_scard(&frames[1..]),
188            "SUNION" => parse_multi_key_set("SUNION", &frames[1..]),
189            "SINTER" => parse_multi_key_set("SINTER", &frames[1..]),
190            "SDIFF" => parse_multi_key_set("SDIFF", &frames[1..]),
191            "SUNIONSTORE" => parse_store_set("SUNIONSTORE", &frames[1..]),
192            "SINTERSTORE" => parse_store_set("SINTERSTORE", &frames[1..]),
193            "SDIFFSTORE" => parse_store_set("SDIFFSTORE", &frames[1..]),
194            "SRANDMEMBER" => parse_srandmember(&frames[1..]),
195            "SPOP" => parse_spop(&frames[1..]),
196            "SMISMEMBER" => parse_smismember(&frames[1..]),
197            "SMOVE" => parse_smove(&frames[1..]),
198            "SINTERCARD" => parse_sintercard(&frames[1..]),
199            "EXPIRETIME" => parse_expiretime(&frames[1..]),
200            "PEXPIRETIME" => parse_pexpiretime(&frames[1..]),
201            "CLUSTER" => parse_cluster(&frames[1..]),
202            "ASKING" => parse_asking(&frames[1..]),
203            "MIGRATE" => parse_migrate(&frames[1..]),
204            "RESTORE" => parse_restore(&frames[1..]),
205            "CONFIG" => parse_config(&frames[1..]),
206            "COMMAND" => parse_command_cmd(&frames[1..]),
207            "MULTI" => parse_no_args("MULTI", &frames[1..], Command::Multi),
208            "EXEC" => parse_no_args("EXEC", &frames[1..], Command::Exec),
209            "DISCARD" => parse_no_args("DISCARD", &frames[1..], Command::Discard),
210            "WATCH" => parse_watch(&frames[1..]),
211            "UNWATCH" => parse_no_args("UNWATCH", &frames[1..], Command::Unwatch),
212            "SLOWLOG" => parse_slowlog(&frames[1..]),
213            "SUBSCRIBE" => parse_subscribe(&frames[1..]),
214            "UNSUBSCRIBE" => parse_unsubscribe(&frames[1..]),
215            "PSUBSCRIBE" => parse_psubscribe(&frames[1..]),
216            "PUNSUBSCRIBE" => parse_punsubscribe(&frames[1..]),
217            "PUBLISH" => parse_publish(&frames[1..]),
218            "PUBSUB" => parse_pubsub(&frames[1..]),
219            "VADD" => parse_vadd(&frames[1..]),
220            "VADD_BATCH" => parse_vadd_batch(&frames[1..]),
221            "VSIM" => parse_vsim(&frames[1..]),
222            "VREM" => parse_vrem(&frames[1..]),
223            "VGET" => parse_vget(&frames[1..]),
224            "VCARD" => parse_vcard(&frames[1..]),
225            "VDIM" => parse_vdim(&frames[1..]),
226            "VINFO" => parse_vinfo(&frames[1..]),
227            "PROTO.REGISTER" => parse_proto_register(&frames[1..]),
228            "PROTO.SET" => parse_proto_set(&frames[1..]),
229            "PROTO.GET" => parse_proto_get(&frames[1..]),
230            "PROTO.TYPE" => parse_proto_type(&frames[1..]),
231            "PROTO.SCHEMAS" => parse_proto_schemas(&frames[1..]),
232            "PROTO.DESCRIBE" => parse_proto_describe(&frames[1..]),
233            "PROTO.GETFIELD" => parse_proto_getfield(&frames[1..]),
234            "PROTO.SETFIELD" => parse_proto_setfield(&frames[1..]),
235            "PROTO.DELFIELD" => parse_proto_delfield(&frames[1..]),
236            "TIME" => parse_no_args("TIME", &frames[1..], Command::Time),
237            "LASTSAVE" => parse_no_args("LASTSAVE", &frames[1..], Command::LastSave),
238            "ROLE" => parse_no_args("ROLE", &frames[1..], Command::Role),
239            "WAIT" => parse_wait(&frames[1..]),
240            "OBJECT" => parse_object(&frames[1..]),
241            "COPY" => parse_copy(&frames[1..]),
242            "CLIENT" => parse_client(&frames[1..]),
243            "ACL" => parse_acl(&frames[1..]),
244            "AUTH" => parse_auth(&frames[1..]),
245            "QUIT" => parse_quit(&frames[1..]),
246            "MONITOR" => parse_monitor(&frames[1..]),
247            "RANDOMKEY" => parse_no_args("RANDOMKEY", &frames[1..], Command::RandomKey),
248            "TOUCH" => parse_touch(&frames[1..]),
249            "SORT" => parse_sort(&frames[1..]),
250            _ => {
251                // only allocate for truly unknown commands
252                let name = extract_string(&frames[0])?;
253                Ok(Command::Unknown(name))
254            }
255        }
256    }
257}
258
259/// Extracts a UTF-8 string from a Bulk or Simple frame.
260///
261/// Validates UTF-8 in-place on the Bytes buffer to avoid an
262/// intermediate Vec<u8> allocation from `to_vec()`.
263fn extract_string(frame: &Frame) -> Result<String, ProtocolError> {
264    match frame {
265        Frame::Bulk(data) => {
266            let s = std::str::from_utf8(data).map_err(|_| {
267                ProtocolError::InvalidCommandFrame("command name is not valid utf-8".into())
268            })?;
269            Ok(s.to_owned())
270        }
271        Frame::Simple(s) => Ok(s.clone()),
272        _ => Err(ProtocolError::InvalidCommandFrame(
273            "expected bulk or simple string for command name".into(),
274        )),
275    }
276}
277
278/// Extracts raw bytes from a Bulk or Simple frame.
279fn extract_bytes(frame: &Frame) -> Result<Bytes, ProtocolError> {
280    match frame {
281        Frame::Bulk(data) => Ok(data.clone()),
282        Frame::Simple(s) => Ok(Bytes::copy_from_slice(s.as_bytes())),
283        _ => Err(ProtocolError::InvalidCommandFrame(
284            "expected bulk or simple string argument".into(),
285        )),
286    }
287}
288
289/// Extracts all frames in a slice as UTF-8 strings.
290fn extract_strings(frames: &[Frame]) -> Result<Vec<String>, ProtocolError> {
291    frames.iter().map(extract_string).collect()
292}
293
294/// Extracts all frames in a slice as raw byte buffers.
295fn extract_bytes_vec(frames: &[Frame]) -> Result<Vec<Bytes>, ProtocolError> {
296    frames.iter().map(extract_bytes).collect()
297}
298
/// Maximum length for a command name or keyword uppercased on the stack.
/// All Redis/Ember commands and subcommands are well under this limit.
///
/// This is the size of the stack buffers used by `Command::from_frame` and
/// `uppercase_arg`. A longer command name is reported as `Command::Unknown`;
/// a longer keyword argument is rejected with "keyword too long".
const MAX_KEYWORD_LEN: usize = 32;
302
303/// Returns the raw bytes of a Bulk or Simple frame without allocating.
304fn extract_raw_bytes(frame: &Frame) -> Result<&[u8], ProtocolError> {
305    match frame {
306        Frame::Bulk(data) => Ok(data.as_ref()),
307        Frame::Simple(s) => Ok(s.as_bytes()),
308        _ => Err(ProtocolError::InvalidCommandFrame(
309            "expected bulk or simple string".into(),
310        )),
311    }
312}
313
314/// Uppercases a frame's bytes into a stack buffer and returns the result as `&str`.
315///
316/// Used for command names, subcommands, and option flags where the value is always
317/// a short ASCII keyword. Avoids the two heap allocations that
318/// `extract_string()?.to_ascii_uppercase()` would require.
319fn uppercase_arg<'b>(
320    frame: &Frame,
321    buf: &'b mut [u8; MAX_KEYWORD_LEN],
322) -> Result<&'b str, ProtocolError> {
323    let bytes = extract_raw_bytes(frame)?;
324    let len = bytes.len();
325    if len > MAX_KEYWORD_LEN {
326        return Err(ProtocolError::InvalidCommandFrame(
327            "keyword too long".into(),
328        ));
329    }
330    buf[..len].copy_from_slice(bytes);
331    buf[..len].make_ascii_uppercase();
332    std::str::from_utf8(&buf[..len])
333        .map_err(|_| ProtocolError::InvalidCommandFrame("keyword is not valid utf-8".into()))
334}
335
336/// Parses a frame's bytes directly as a positive u64 without allocating a String.
337fn parse_u64(frame: &Frame, cmd: &str) -> Result<u64, ProtocolError> {
338    let bytes = extract_raw_bytes(frame)?;
339    parse_u64_bytes(bytes).ok_or_else(|| {
340        ProtocolError::InvalidCommandFrame(format!("value is not a valid integer for '{cmd}'"))
341    })
342}
343
344/// Parses a frame's bytes as a `usize`. Used for optional count arguments.
345fn parse_usize(frame: &Frame, cmd: &str) -> Result<usize, ProtocolError> {
346    let n = parse_u64(frame, cmd)?;
347    usize::try_from(n).map_err(|_| {
348        ProtocolError::InvalidCommandFrame(format!("count is out of range for '{cmd}'"))
349    })
350}
351
/// Parses a run of ASCII decimal digits into a `u64`.
///
/// Returns `None` for an empty slice, for any non-digit byte (signs and
/// whitespace included), and when the value does not fit in a `u64`.
/// Leading zeros are accepted.
fn parse_u64_bytes(buf: &[u8]) -> Option<u64> {
    if buf.is_empty() {
        return None;
    }
    buf.iter().try_fold(0u64, |acc, &byte| {
        if !byte.is_ascii_digit() {
            return None;
        }
        // Checked arithmetic turns overflow into `None` rather than wrapping.
        acc.checked_mul(10)?.checked_add(u64::from(byte - b'0'))
    })
}
366
367/// Shorthand for the wrong-arity error returned by every parser.
368fn wrong_arity(cmd: &'static str) -> ProtocolError {
369    ProtocolError::WrongArity(cmd.into())
370}
371
372fn parse_ping(args: &[Frame]) -> Result<Command, ProtocolError> {
373    match args.len() {
374        0 => Ok(Command::Ping(None)),
375        1 => {
376            let msg = extract_bytes(&args[0])?;
377            Ok(Command::Ping(Some(msg)))
378        }
379        _ => Err(wrong_arity("PING")),
380    }
381}
382
383fn parse_echo(args: &[Frame]) -> Result<Command, ProtocolError> {
384    if args.len() != 1 {
385        return Err(wrong_arity("ECHO"));
386    }
387    let msg = extract_bytes(&args[0])?;
388    Ok(Command::Echo(msg))
389}
390
391fn parse_get(args: &[Frame]) -> Result<Command, ProtocolError> {
392    if args.len() != 1 {
393        return Err(wrong_arity("GET"));
394    }
395    let key = extract_string(&args[0])?;
396    Ok(Command::Get { key })
397}
398
399/// Parses NX / XX / EX / PX options from a slice of command arguments.
400///
401/// Returns `(expire, nx, xx)`. `cmd` is used in error messages.
402fn parse_set_options(
403    args: &[Frame],
404    cmd: &'static str,
405) -> Result<(Option<SetExpire>, bool, bool), ProtocolError> {
406    let mut expire = None;
407    let mut nx = false;
408    let mut xx = false;
409    let mut idx = 0;
410
411    while idx < args.len() {
412        let mut kw = [0u8; MAX_KEYWORD_LEN];
413        let flag = uppercase_arg(&args[idx], &mut kw)?;
414        match flag {
415            "NX" => {
416                nx = true;
417                idx += 1;
418            }
419            "XX" => {
420                xx = true;
421                idx += 1;
422            }
423            "EX" => {
424                idx += 1;
425                if idx >= args.len() {
426                    return Err(wrong_arity(cmd));
427                }
428                let amount = parse_u64(&args[idx], cmd)?;
429                if amount == 0 {
430                    return Err(ProtocolError::InvalidCommandFrame(format!(
431                        "invalid expire time in '{cmd}' command"
432                    )));
433                }
434                expire = Some(SetExpire::Ex(amount));
435                idx += 1;
436            }
437            "PX" => {
438                idx += 1;
439                if idx >= args.len() {
440                    return Err(wrong_arity(cmd));
441                }
442                let amount = parse_u64(&args[idx], cmd)?;
443                if amount == 0 {
444                    return Err(ProtocolError::InvalidCommandFrame(format!(
445                        "invalid expire time in '{cmd}' command"
446                    )));
447                }
448                expire = Some(SetExpire::Px(amount));
449                idx += 1;
450            }
451            _ => {
452                return Err(ProtocolError::InvalidCommandFrame(format!(
453                    "unsupported {cmd} option '{flag}'"
454                )));
455            }
456        }
457    }
458
459    if nx && xx {
460        return Err(ProtocolError::InvalidCommandFrame(
461            "XX and NX options at the same time are not compatible".into(),
462        ));
463    }
464
465    Ok((expire, nx, xx))
466}
467
468fn parse_set(args: &[Frame]) -> Result<Command, ProtocolError> {
469    if args.len() < 2 {
470        return Err(wrong_arity("SET"));
471    }
472
473    let key = extract_string(&args[0])?;
474    let value = extract_bytes(&args[1])?;
475    let (expire, nx, xx) = parse_set_options(&args[2..], "SET")?;
476
477    Ok(Command::Set {
478        key,
479        value,
480        expire,
481        nx,
482        xx,
483    })
484}
485
486fn parse_incr(args: &[Frame]) -> Result<Command, ProtocolError> {
487    if args.len() != 1 {
488        return Err(wrong_arity("INCR"));
489    }
490    let key = extract_string(&args[0])?;
491    Ok(Command::Incr { key })
492}
493
494fn parse_decr(args: &[Frame]) -> Result<Command, ProtocolError> {
495    if args.len() != 1 {
496        return Err(wrong_arity("DECR"));
497    }
498    let key = extract_string(&args[0])?;
499    Ok(Command::Decr { key })
500}
501
502fn parse_incrby(args: &[Frame]) -> Result<Command, ProtocolError> {
503    if args.len() != 2 {
504        return Err(wrong_arity("INCRBY"));
505    }
506    let key = extract_string(&args[0])?;
507    let delta = parse_i64(&args[1], "INCRBY")?;
508    Ok(Command::IncrBy { key, delta })
509}
510
511fn parse_decrby(args: &[Frame]) -> Result<Command, ProtocolError> {
512    if args.len() != 2 {
513        return Err(wrong_arity("DECRBY"));
514    }
515    let key = extract_string(&args[0])?;
516    let delta = parse_i64(&args[1], "DECRBY")?;
517    Ok(Command::DecrBy { key, delta })
518}
519
520fn parse_incrbyfloat(args: &[Frame]) -> Result<Command, ProtocolError> {
521    if args.len() != 2 {
522        return Err(wrong_arity("INCRBYFLOAT"));
523    }
524    let key = extract_string(&args[0])?;
525    let s = extract_string(&args[1])?;
526    let delta: f64 = s.parse().map_err(|_| {
527        ProtocolError::InvalidCommandFrame("value is not a valid float for 'INCRBYFLOAT'".into())
528    })?;
529    if delta.is_nan() || delta.is_infinite() {
530        return Err(ProtocolError::InvalidCommandFrame(
531            "increment would produce NaN or Infinity".into(),
532        ));
533    }
534    Ok(Command::IncrByFloat { key, delta })
535}
536
537fn parse_append(args: &[Frame]) -> Result<Command, ProtocolError> {
538    if args.len() != 2 {
539        return Err(wrong_arity("APPEND"));
540    }
541    let key = extract_string(&args[0])?;
542    let value = extract_bytes(&args[1])?;
543    Ok(Command::Append { key, value })
544}
545
546fn parse_strlen(args: &[Frame]) -> Result<Command, ProtocolError> {
547    if args.len() != 1 {
548        return Err(wrong_arity("STRLEN"));
549    }
550    let key = extract_string(&args[0])?;
551    Ok(Command::Strlen { key })
552}
553
554/// SETNX key value — set key only if it does not exist.
555/// Equivalent to `SET key value NX`.
556fn parse_setnx(args: &[Frame]) -> Result<Command, ProtocolError> {
557    if args.len() != 2 {
558        return Err(wrong_arity("SETNX"));
559    }
560    let key = extract_string(&args[0])?;
561    let value = extract_bytes(&args[1])?;
562    Ok(Command::Set {
563        key,
564        value,
565        expire: None,
566        nx: true,
567        xx: false,
568    })
569}
570
571/// SETEX key seconds value — set key with an expiration in seconds.
572/// Equivalent to `SET key value EX seconds`.
573fn parse_setex(args: &[Frame]) -> Result<Command, ProtocolError> {
574    if args.len() != 3 {
575        return Err(wrong_arity("SETEX"));
576    }
577    let key = extract_string(&args[0])?;
578    let seconds = parse_u64(&args[1], "SETEX")?;
579    if seconds == 0 {
580        return Err(ProtocolError::InvalidCommandFrame(
581            "invalid expire time in 'SETEX' command".into(),
582        ));
583    }
584    let value = extract_bytes(&args[2])?;
585    Ok(Command::Set {
586        key,
587        value,
588        expire: Some(SetExpire::Ex(seconds)),
589        nx: false,
590        xx: false,
591    })
592}
593
594/// PSETEX key milliseconds value — set key with an expiration in milliseconds.
595/// Equivalent to `SET key value PX milliseconds`.
596fn parse_psetex(args: &[Frame]) -> Result<Command, ProtocolError> {
597    if args.len() != 3 {
598        return Err(wrong_arity("PSETEX"));
599    }
600    let key = extract_string(&args[0])?;
601    let ms = parse_u64(&args[1], "PSETEX")?;
602    if ms == 0 {
603        return Err(ProtocolError::InvalidCommandFrame(
604            "invalid expire time in 'PSETEX' command".into(),
605        ));
606    }
607    let value = extract_bytes(&args[2])?;
608    Ok(Command::Set {
609        key,
610        value,
611        expire: Some(SetExpire::Px(ms)),
612        nx: false,
613        xx: false,
614    })
615}
616
617/// GETRANGE key start end (also aliases SUBSTR).
618fn parse_getrange(args: &[Frame]) -> Result<Command, ProtocolError> {
619    if args.len() != 3 {
620        return Err(wrong_arity("GETRANGE"));
621    }
622    let key = extract_string(&args[0])?;
623    let start = parse_i64(&args[1], "GETRANGE")?;
624    let end = parse_i64(&args[2], "GETRANGE")?;
625    Ok(Command::GetRange { key, start, end })
626}
627
628/// SETRANGE key offset value.
629fn parse_setrange(args: &[Frame]) -> Result<Command, ProtocolError> {
630    if args.len() != 3 {
631        return Err(wrong_arity("SETRANGE"));
632    }
633    let key = extract_string(&args[0])?;
634    let offset = parse_u64(&args[1], "SETRANGE")? as usize;
635    let value = extract_bytes(&args[2])?;
636    Ok(Command::SetRange { key, offset, value })
637}
638
639/// GETBIT key offset.
640fn parse_getbit(args: &[Frame]) -> Result<Command, ProtocolError> {
641    if args.len() != 2 {
642        return Err(wrong_arity("GETBIT"));
643    }
644    let key = extract_string(&args[0])?;
645    let offset = parse_u64(&args[1], "GETBIT")?;
646    Ok(Command::GetBit { key, offset })
647}
648
649/// SETBIT key offset value.
650fn parse_setbit(args: &[Frame]) -> Result<Command, ProtocolError> {
651    if args.len() != 3 {
652        return Err(wrong_arity("SETBIT"));
653    }
654    let key = extract_string(&args[0])?;
655    let offset = parse_u64(&args[1], "SETBIT")?;
656    let raw = parse_u64(&args[2], "SETBIT")?;
657    if raw > 1 {
658        return Err(ProtocolError::InvalidCommandFrame(
659            "SETBIT: bit value must be 0 or 1".into(),
660        ));
661    }
662    Ok(Command::SetBit {
663        key,
664        offset,
665        value: raw as u8,
666    })
667}
668
669/// Parses an optional `[start end [BYTE|BIT]]` suffix for BITCOUNT / BITPOS.
670///
671/// Accepts 0, 2, or 3 trailing arguments. Returns `None` when there are none.
672fn parse_bit_range(args: &[Frame], cmd: &str) -> Result<Option<BitRange>, ProtocolError> {
673    match args.len() {
674        0 => Ok(None),
675        2 | 3 => {
676            let start = parse_i64(&args[0], cmd)?;
677            let end = parse_i64(&args[1], cmd)?;
678            let unit = if args.len() == 3 {
679                let mut kw = [0u8; MAX_KEYWORD_LEN];
680                match uppercase_arg(&args[2], &mut kw)? {
681                    "BYTE" => BitRangeUnit::Byte,
682                    "BIT" => BitRangeUnit::Bit,
683                    other => {
684                        return Err(ProtocolError::InvalidCommandFrame(format!(
685                            "{cmd}: invalid unit '{other}', expected BYTE or BIT"
686                        )));
687                    }
688                }
689            } else {
690                BitRangeUnit::Byte
691            };
692            Ok(Some(BitRange { start, end, unit }))
693        }
694        _ => Err(ProtocolError::InvalidCommandFrame(format!(
695            "{cmd}: wrong number of arguments"
696        ))),
697    }
698}
699
700/// BITCOUNT key [start end [BYTE|BIT]].
701fn parse_bitcount(args: &[Frame]) -> Result<Command, ProtocolError> {
702    if args.is_empty() {
703        return Err(wrong_arity("BITCOUNT"));
704    }
705    let key = extract_string(&args[0])?;
706    let range = parse_bit_range(&args[1..], "BITCOUNT")?;
707    Ok(Command::BitCount { key, range })
708}
709
710/// BITPOS key bit [start [end [BYTE|BIT]]].
711///
712/// Redis allows 1, 2, or 3 trailing args (start, start+end, start+end+unit).
713/// No trailing args means "search the whole string".
714fn parse_bitpos(args: &[Frame]) -> Result<Command, ProtocolError> {
715    if args.len() < 2 {
716        return Err(wrong_arity("BITPOS"));
717    }
718    let key = extract_string(&args[0])?;
719    let raw = parse_u64(&args[1], "BITPOS")?;
720    if raw > 1 {
721        return Err(ProtocolError::InvalidCommandFrame(
722            "BITPOS: bit value must be 0 or 1".into(),
723        ));
724    }
725    let bit = raw as u8;
726    let range = match args.len() - 2 {
727        0 => None,
728        1 => {
729            let start = parse_i64(&args[2], "BITPOS")?;
730            Some(BitRange {
731                start,
732                end: -1,
733                unit: BitRangeUnit::Byte,
734            })
735        }
736        2 | 3 => parse_bit_range(&args[2..], "BITPOS")?,
737        _ => {
738            return Err(ProtocolError::InvalidCommandFrame(
739                "BITPOS: wrong number of arguments".into(),
740            ))
741        }
742    };
743    Ok(Command::BitPos { key, bit, range })
744}
745
746/// BITOP AND|OR|XOR|NOT destkey key [key ...].
747fn parse_bitop(args: &[Frame]) -> Result<Command, ProtocolError> {
748    if args.len() < 3 {
749        return Err(wrong_arity("BITOP"));
750    }
751    let mut kw = [0u8; MAX_KEYWORD_LEN];
752    let op = match uppercase_arg(&args[0], &mut kw)? {
753        "AND" => BitOpKind::And,
754        "OR" => BitOpKind::Or,
755        "XOR" => BitOpKind::Xor,
756        "NOT" => BitOpKind::Not,
757        other => {
758            return Err(ProtocolError::InvalidCommandFrame(format!(
759                "BITOP: unknown operation '{other}'"
760            )));
761        }
762    };
763    let dest = extract_string(&args[1])?;
764    let keys = extract_strings(&args[2..])?;
765    if op == BitOpKind::Not && keys.len() != 1 {
766        return Err(ProtocolError::InvalidCommandFrame(
767            "BITOP NOT must be called with a single source key".into(),
768        ));
769    }
770    Ok(Command::BitOp { op, dest, keys })
771}
772
773fn parse_keys(args: &[Frame]) -> Result<Command, ProtocolError> {
774    if args.len() != 1 {
775        return Err(wrong_arity("KEYS"));
776    }
777    let pattern = extract_string(&args[0])?;
778    Ok(Command::Keys { pattern })
779}
780
781fn parse_rename(args: &[Frame]) -> Result<Command, ProtocolError> {
782    if args.len() != 2 {
783        return Err(wrong_arity("RENAME"));
784    }
785    let key = extract_string(&args[0])?;
786    let newkey = extract_string(&args[1])?;
787    Ok(Command::Rename { key, newkey })
788}
789
790fn parse_del(args: &[Frame]) -> Result<Command, ProtocolError> {
791    if args.is_empty() {
792        return Err(wrong_arity("DEL"));
793    }
794    let keys = extract_strings(args)?;
795    Ok(Command::Del { keys })
796}
797
798fn parse_exists(args: &[Frame]) -> Result<Command, ProtocolError> {
799    if args.is_empty() {
800        return Err(wrong_arity("EXISTS"));
801    }
802    let keys = extract_strings(args)?;
803    Ok(Command::Exists { keys })
804}
805
806fn parse_mget(args: &[Frame]) -> Result<Command, ProtocolError> {
807    if args.is_empty() {
808        return Err(wrong_arity("MGET"));
809    }
810    let keys = extract_strings(args)?;
811    Ok(Command::MGet { keys })
812}
813
814fn parse_mset(args: &[Frame]) -> Result<Command, ProtocolError> {
815    if args.is_empty() || !args.len().is_multiple_of(2) {
816        return Err(wrong_arity("MSET"));
817    }
818    let mut pairs = Vec::with_capacity(args.len() / 2);
819    for chunk in args.chunks(2) {
820        let key = extract_string(&chunk[0])?;
821        let value = extract_bytes(&chunk[1])?;
822        pairs.push((key, value));
823    }
824    Ok(Command::MSet { pairs })
825}
826
827fn parse_msetnx(args: &[Frame]) -> Result<Command, ProtocolError> {
828    if args.is_empty() || !args.len().is_multiple_of(2) {
829        return Err(wrong_arity("MSETNX"));
830    }
831    let mut pairs = Vec::with_capacity(args.len() / 2);
832    for chunk in args.chunks(2) {
833        let key = extract_string(&chunk[0])?;
834        let value = extract_bytes(&chunk[1])?;
835        pairs.push((key, value));
836    }
837    Ok(Command::MSetNx { pairs })
838}
839
840fn parse_getset(args: &[Frame]) -> Result<Command, ProtocolError> {
841    if args.len() != 2 {
842        return Err(wrong_arity("GETSET"));
843    }
844    let key = extract_string(&args[0])?;
845    let value = extract_bytes(&args[1])?;
846    Ok(Command::GetSet { key, value })
847}
848
849fn parse_expire(args: &[Frame]) -> Result<Command, ProtocolError> {
850    if args.len() != 2 {
851        return Err(wrong_arity("EXPIRE"));
852    }
853    let key = extract_string(&args[0])?;
854    let seconds = parse_u64(&args[1], "EXPIRE")?;
855
856    if seconds == 0 {
857        return Err(ProtocolError::InvalidCommandFrame(
858            "invalid expire time in 'EXPIRE' command".into(),
859        ));
860    }
861
862    Ok(Command::Expire { key, seconds })
863}
864
865fn parse_ttl(args: &[Frame]) -> Result<Command, ProtocolError> {
866    if args.len() != 1 {
867        return Err(wrong_arity("TTL"));
868    }
869    let key = extract_string(&args[0])?;
870    Ok(Command::Ttl { key })
871}
872
873fn parse_persist(args: &[Frame]) -> Result<Command, ProtocolError> {
874    if args.len() != 1 {
875        return Err(wrong_arity("PERSIST"));
876    }
877    let key = extract_string(&args[0])?;
878    Ok(Command::Persist { key })
879}
880
881fn parse_pttl(args: &[Frame]) -> Result<Command, ProtocolError> {
882    if args.len() != 1 {
883        return Err(wrong_arity("PTTL"));
884    }
885    let key = extract_string(&args[0])?;
886    Ok(Command::Pttl { key })
887}
888
889fn parse_pexpire(args: &[Frame]) -> Result<Command, ProtocolError> {
890    if args.len() != 2 {
891        return Err(wrong_arity("PEXPIRE"));
892    }
893    let key = extract_string(&args[0])?;
894    let milliseconds = parse_u64(&args[1], "PEXPIRE")?;
895
896    if milliseconds == 0 {
897        return Err(ProtocolError::InvalidCommandFrame(
898            "invalid expire time in 'PEXPIRE' command".into(),
899        ));
900    }
901
902    Ok(Command::Pexpire { key, milliseconds })
903}
904
905fn parse_expireat(args: &[Frame]) -> Result<Command, ProtocolError> {
906    if args.len() != 2 {
907        return Err(wrong_arity("EXPIREAT"));
908    }
909    let key = extract_string(&args[0])?;
910    let timestamp = parse_u64(&args[1], "EXPIREAT")?;
911    Ok(Command::Expireat { key, timestamp })
912}
913
914fn parse_pexpireat_cmd(args: &[Frame]) -> Result<Command, ProtocolError> {
915    if args.len() != 2 {
916        return Err(wrong_arity("PEXPIREAT"));
917    }
918    let key = extract_string(&args[0])?;
919    let timestamp_ms = parse_u64(&args[1], "PEXPIREAT")?;
920    Ok(Command::Pexpireat { key, timestamp_ms })
921}
922
923fn parse_expiretime(args: &[Frame]) -> Result<Command, ProtocolError> {
924    if args.len() != 1 {
925        return Err(wrong_arity("EXPIRETIME"));
926    }
927    let key = extract_string(&args[0])?;
928    Ok(Command::Expiretime { key })
929}
930
931fn parse_pexpiretime(args: &[Frame]) -> Result<Command, ProtocolError> {
932    if args.len() != 1 {
933        return Err(wrong_arity("PEXPIRETIME"));
934    }
935    let key = extract_string(&args[0])?;
936    Ok(Command::Pexpiretime { key })
937}
938
939fn parse_smove(args: &[Frame]) -> Result<Command, ProtocolError> {
940    if args.len() != 3 {
941        return Err(wrong_arity("SMOVE"));
942    }
943    let source = extract_string(&args[0])?;
944    let destination = extract_string(&args[1])?;
945    let member = extract_string(&args[2])?;
946    Ok(Command::SMove {
947        source,
948        destination,
949        member,
950    })
951}
952
953fn parse_sintercard(args: &[Frame]) -> Result<Command, ProtocolError> {
954    if args.len() < 2 {
955        return Err(wrong_arity("SINTERCARD"));
956    }
957    let numkeys = parse_u64(&args[0], "SINTERCARD")? as usize;
958    if numkeys == 0 || args.len() < 1 + numkeys {
959        return Err(ProtocolError::InvalidCommandFrame(
960            "SINTERCARD numkeys must be positive and match the number of keys provided".into(),
961        ));
962    }
963    let keys: Vec<String> = args[1..=numkeys]
964        .iter()
965        .map(extract_string)
966        .collect::<Result<_, _>>()?;
967    // optional LIMIT n
968    let limit = if args.len() == numkeys + 3 {
969        let tag = extract_string(&args[numkeys + 1])?.to_ascii_uppercase();
970        if tag != "LIMIT" {
971            return Err(ProtocolError::InvalidCommandFrame(
972                "SINTERCARD: expected LIMIT keyword".into(),
973            ));
974        }
975        parse_u64(&args[numkeys + 2], "SINTERCARD")? as usize
976    } else if args.len() == numkeys + 1 {
977        0
978    } else {
979        return Err(wrong_arity("SINTERCARD"));
980    };
981    Ok(Command::SInterCard { keys, limit })
982}
983
984fn parse_dbsize(args: &[Frame]) -> Result<Command, ProtocolError> {
985    if !args.is_empty() {
986        return Err(wrong_arity("DBSIZE"));
987    }
988    Ok(Command::DbSize)
989}
990
991fn parse_info(args: &[Frame]) -> Result<Command, ProtocolError> {
992    match args.len() {
993        0 => Ok(Command::Info { section: None }),
994        1 => {
995            let section = extract_string(&args[0])?;
996            Ok(Command::Info {
997                section: Some(section),
998            })
999        }
1000        _ => Err(wrong_arity("INFO")),
1001    }
1002}
1003
1004fn parse_bgsave(args: &[Frame]) -> Result<Command, ProtocolError> {
1005    if !args.is_empty() {
1006        return Err(wrong_arity("BGSAVE"));
1007    }
1008    Ok(Command::BgSave)
1009}
1010
1011fn parse_bgrewriteaof(args: &[Frame]) -> Result<Command, ProtocolError> {
1012    if !args.is_empty() {
1013        return Err(wrong_arity("BGREWRITEAOF"));
1014    }
1015    Ok(Command::BgRewriteAof)
1016}
1017
1018fn parse_flushdb(args: &[Frame]) -> Result<Command, ProtocolError> {
1019    if args.is_empty() {
1020        return Ok(Command::FlushDb { async_mode: false });
1021    }
1022    if args.len() == 1 {
1023        let arg = extract_string(&args[0])?;
1024        if arg.eq_ignore_ascii_case("ASYNC") {
1025            return Ok(Command::FlushDb { async_mode: true });
1026        }
1027    }
1028    Err(wrong_arity("FLUSHDB"))
1029}
1030
1031fn parse_flushall(args: &[Frame]) -> Result<Command, ProtocolError> {
1032    if args.is_empty() {
1033        return Ok(Command::FlushAll { async_mode: false });
1034    }
1035    if args.len() == 1 {
1036        let arg = extract_string(&args[0])?;
1037        if arg.eq_ignore_ascii_case("ASYNC") {
1038            return Ok(Command::FlushAll { async_mode: true });
1039        }
1040    }
1041    Err(wrong_arity("FLUSHALL"))
1042}
1043
1044fn parse_memory_cmd(args: &[Frame]) -> Result<Command, ProtocolError> {
1045    if args.is_empty() {
1046        return Err(ProtocolError::WrongArity("memory".into()));
1047    }
1048    let subcommand = extract_string(&args[0])?;
1049    if subcommand.eq_ignore_ascii_case("USAGE") {
1050        if args.len() < 2 {
1051            return Err(wrong_arity("MEMORY USAGE"));
1052        }
1053        let key = extract_string(&args[1])?;
1054        // Accept but ignore SAMPLES count — we always use the cached value size.
1055        Ok(Command::MemoryUsage { key })
1056    } else {
1057        Err(ProtocolError::InvalidCommandFrame(format!(
1058            "unknown subcommand '{}' for 'memory' command",
1059            subcommand
1060        )))
1061    }
1062}
1063
1064fn parse_unlink(args: &[Frame]) -> Result<Command, ProtocolError> {
1065    if args.is_empty() {
1066        return Err(wrong_arity("UNLINK"));
1067    }
1068    let keys = extract_strings(args)?;
1069    Ok(Command::Unlink { keys })
1070}
1071
1072fn parse_scan(args: &[Frame]) -> Result<Command, ProtocolError> {
1073    if args.is_empty() {
1074        return Err(wrong_arity("SCAN"));
1075    }
1076
1077    let cursor = parse_u64(&args[0], "SCAN")?;
1078    let mut pattern = None;
1079    let mut count = None;
1080    let mut idx = 1;
1081
1082    while idx < args.len() {
1083        let mut kw = [0u8; MAX_KEYWORD_LEN];
1084        let flag = uppercase_arg(&args[idx], &mut kw)?;
1085        match flag {
1086            "MATCH" => {
1087                idx += 1;
1088                if idx >= args.len() {
1089                    return Err(wrong_arity("SCAN"));
1090                }
1091                pattern = Some(extract_string(&args[idx])?);
1092                idx += 1;
1093            }
1094            "COUNT" => {
1095                idx += 1;
1096                if idx >= args.len() {
1097                    return Err(wrong_arity("SCAN"));
1098                }
1099                let n = parse_u64(&args[idx], "SCAN")?;
1100                if n > MAX_SCAN_COUNT {
1101                    return Err(ProtocolError::InvalidCommandFrame(format!(
1102                        "SCAN COUNT {n} exceeds max {MAX_SCAN_COUNT}"
1103                    )));
1104                }
1105                count = Some(n as usize);
1106                idx += 1;
1107            }
1108            _ => {
1109                return Err(ProtocolError::InvalidCommandFrame(format!(
1110                    "unsupported SCAN option '{flag}'"
1111                )));
1112            }
1113        }
1114    }
1115
1116    Ok(Command::Scan {
1117        cursor,
1118        pattern,
1119        count,
1120    })
1121}
1122
1123/// Shared parser for SSCAN, HSCAN, ZSCAN.
1124///
1125/// All three share the same shape: `key cursor [MATCH pattern] [COUNT count]`.
1126fn parse_key_scan(args: &[Frame], cmd: &'static str) -> Result<Command, ProtocolError> {
1127    if args.len() < 2 {
1128        return Err(wrong_arity(cmd));
1129    }
1130
1131    let key = extract_string(&args[0])?;
1132    let cursor = parse_u64(&args[1], cmd)?;
1133    let mut pattern = None;
1134    let mut count = None;
1135    let mut idx = 2;
1136
1137    while idx < args.len() {
1138        let mut kw = [0u8; MAX_KEYWORD_LEN];
1139        let flag = uppercase_arg(&args[idx], &mut kw)?;
1140        match flag {
1141            "MATCH" => {
1142                idx += 1;
1143                if idx >= args.len() {
1144                    return Err(wrong_arity(cmd));
1145                }
1146                pattern = Some(extract_string(&args[idx])?);
1147                idx += 1;
1148            }
1149            "COUNT" => {
1150                idx += 1;
1151                if idx >= args.len() {
1152                    return Err(wrong_arity(cmd));
1153                }
1154                let n = parse_u64(&args[idx], cmd)?;
1155                if n > MAX_SCAN_COUNT {
1156                    return Err(ProtocolError::InvalidCommandFrame(format!(
1157                        "{cmd} COUNT {n} exceeds max {MAX_SCAN_COUNT}"
1158                    )));
1159                }
1160                count = Some(n as usize);
1161                idx += 1;
1162            }
1163            _ => {
1164                return Err(ProtocolError::InvalidCommandFrame(format!(
1165                    "unsupported {cmd} option '{flag}'"
1166                )));
1167            }
1168        }
1169    }
1170
1171    match cmd {
1172        "SSCAN" => Ok(Command::SScan {
1173            key,
1174            cursor,
1175            pattern,
1176            count,
1177        }),
1178        "HSCAN" => Ok(Command::HScan {
1179            key,
1180            cursor,
1181            pattern,
1182            count,
1183        }),
1184        "ZSCAN" => Ok(Command::ZScan {
1185            key,
1186            cursor,
1187            pattern,
1188            count,
1189        }),
1190        _ => Err(ProtocolError::InvalidCommandFrame(format!(
1191            "unknown scan command '{cmd}'"
1192        ))),
1193    }
1194}
1195
1196/// Parses a frame's bytes directly as an i64 without allocating a String.
1197fn parse_i64(frame: &Frame, cmd: &str) -> Result<i64, ProtocolError> {
1198    let bytes = extract_raw_bytes(frame)?;
1199    parse_i64_bytes(bytes).ok_or_else(|| {
1200        ProtocolError::InvalidCommandFrame(format!("value is not a valid integer for '{cmd}'"))
1201    })
1202}
1203
/// Decodes a decimal signed integer straight from bytes.
///
/// Accepts an optional leading `-` followed by one or more ASCII digits.
/// Returns `None` for empty input, a lone `-`, any non-digit byte, or a value
/// outside the `i64` range. Negative numbers are accumulated downwards so
/// that `i64::MIN` can be represented.
fn parse_i64_bytes(buf: &[u8]) -> Option<i64> {
    let (negative, digits) = match buf.split_first() {
        Some((b'-', rest)) => (true, rest),
        Some(_) => (false, buf),
        None => return None,
    };
    if digits.is_empty() {
        return None;
    }

    digits.iter().try_fold(0i64, |acc, &byte| {
        if !byte.is_ascii_digit() {
            return None;
        }
        let digit = i64::from(byte - b'0');
        let shifted = acc.checked_mul(10)?;
        if negative {
            shifted.checked_sub(digit)
        } else {
            shifted.checked_add(digit)
        }
    })
}
1238
1239fn parse_lpush(args: &[Frame]) -> Result<Command, ProtocolError> {
1240    if args.len() < 2 {
1241        return Err(wrong_arity("LPUSH"));
1242    }
1243    let key = extract_string(&args[0])?;
1244    let values = extract_bytes_vec(&args[1..])?;
1245    Ok(Command::LPush { key, values })
1246}
1247
1248fn parse_rpush(args: &[Frame]) -> Result<Command, ProtocolError> {
1249    if args.len() < 2 {
1250        return Err(wrong_arity("RPUSH"));
1251    }
1252    let key = extract_string(&args[0])?;
1253    let values = extract_bytes_vec(&args[1..])?;
1254    Ok(Command::RPush { key, values })
1255}
1256
1257fn parse_lpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1258    if args.is_empty() || args.len() > 2 {
1259        return Err(wrong_arity("LPOP"));
1260    }
1261    let key = extract_string(&args[0])?;
1262    let count = if args.len() == 2 {
1263        Some(parse_usize(&args[1], "LPOP")?)
1264    } else {
1265        None
1266    };
1267    Ok(Command::LPop { key, count })
1268}
1269
1270fn parse_rpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1271    if args.is_empty() || args.len() > 2 {
1272        return Err(wrong_arity("RPOP"));
1273    }
1274    let key = extract_string(&args[0])?;
1275    let count = if args.len() == 2 {
1276        Some(parse_usize(&args[1], "RPOP")?)
1277    } else {
1278        None
1279    };
1280    Ok(Command::RPop { key, count })
1281}
1282
1283fn parse_lrange(args: &[Frame]) -> Result<Command, ProtocolError> {
1284    if args.len() != 3 {
1285        return Err(wrong_arity("LRANGE"));
1286    }
1287    let key = extract_string(&args[0])?;
1288    let start = parse_i64(&args[1], "LRANGE")?;
1289    let stop = parse_i64(&args[2], "LRANGE")?;
1290    Ok(Command::LRange { key, start, stop })
1291}
1292
1293fn parse_llen(args: &[Frame]) -> Result<Command, ProtocolError> {
1294    if args.len() != 1 {
1295        return Err(wrong_arity("LLEN"));
1296    }
1297    let key = extract_string(&args[0])?;
1298    Ok(Command::LLen { key })
1299}
1300
1301/// Parses BLPOP/BRPOP: all args except the last are keys, the last is the
1302/// timeout in seconds (float). At least one key is required.
1303fn parse_blpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1304    if args.len() < 2 {
1305        return Err(wrong_arity("BLPOP"));
1306    }
1307    let timeout_secs = parse_timeout(&args[args.len() - 1], "BLPOP")?;
1308    let keys = extract_strings(&args[..args.len() - 1])?;
1309    Ok(Command::BLPop { keys, timeout_secs })
1310}
1311
1312fn parse_brpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1313    if args.len() < 2 {
1314        return Err(wrong_arity("BRPOP"));
1315    }
1316    let timeout_secs = parse_timeout(&args[args.len() - 1], "BRPOP")?;
1317    let keys = extract_strings(&args[..args.len() - 1])?;
1318    Ok(Command::BRPop { keys, timeout_secs })
1319}
1320
1321fn parse_lindex(args: &[Frame]) -> Result<Command, ProtocolError> {
1322    if args.len() != 2 {
1323        return Err(wrong_arity("LINDEX"));
1324    }
1325    let key = extract_string(&args[0])?;
1326    let index = parse_i64(&args[1], "LINDEX")?;
1327    Ok(Command::LIndex { key, index })
1328}
1329
1330fn parse_lset(args: &[Frame]) -> Result<Command, ProtocolError> {
1331    if args.len() != 3 {
1332        return Err(wrong_arity("LSET"));
1333    }
1334    let key = extract_string(&args[0])?;
1335    let index = parse_i64(&args[1], "LSET")?;
1336    let value = extract_bytes(&args[2])?;
1337    Ok(Command::LSet { key, index, value })
1338}
1339
1340fn parse_ltrim(args: &[Frame]) -> Result<Command, ProtocolError> {
1341    if args.len() != 3 {
1342        return Err(wrong_arity("LTRIM"));
1343    }
1344    let key = extract_string(&args[0])?;
1345    let start = parse_i64(&args[1], "LTRIM")?;
1346    let stop = parse_i64(&args[2], "LTRIM")?;
1347    Ok(Command::LTrim { key, start, stop })
1348}
1349
1350fn parse_linsert(args: &[Frame]) -> Result<Command, ProtocolError> {
1351    if args.len() != 4 {
1352        return Err(wrong_arity("LINSERT"));
1353    }
1354    let key = extract_string(&args[0])?;
1355    let direction = extract_string(&args[1])?;
1356    let before = match direction.to_ascii_uppercase().as_str() {
1357        "BEFORE" => true,
1358        "AFTER" => false,
1359        _ => {
1360            return Err(ProtocolError::InvalidCommandFrame(
1361                "ERR syntax error".into(),
1362            ))
1363        }
1364    };
1365    let pivot = extract_bytes(&args[2])?;
1366    let value = extract_bytes(&args[3])?;
1367    Ok(Command::LInsert {
1368        key,
1369        before,
1370        pivot,
1371        value,
1372    })
1373}
1374
1375fn parse_lrem(args: &[Frame]) -> Result<Command, ProtocolError> {
1376    if args.len() != 3 {
1377        return Err(wrong_arity("LREM"));
1378    }
1379    let key = extract_string(&args[0])?;
1380    let count = parse_i64(&args[1], "LREM")?;
1381    let value = extract_bytes(&args[2])?;
1382    Ok(Command::LRem { key, count, value })
1383}
1384
1385fn parse_lpos(args: &[Frame]) -> Result<Command, ProtocolError> {
1386    if args.is_empty() {
1387        return Err(wrong_arity("LPOS"));
1388    }
1389    let key = extract_string(&args[0])?;
1390    if args.len() < 2 {
1391        return Err(wrong_arity("LPOS"));
1392    }
1393    let element = extract_bytes(&args[1])?;
1394
1395    let mut rank: i64 = 1;
1396    let mut count: Option<usize> = None;
1397    let mut maxlen: usize = 0;
1398
1399    let mut i = 2;
1400    while i < args.len() {
1401        let opt = extract_string(&args[i])?.to_ascii_uppercase();
1402        match opt.as_str() {
1403            "RANK" => {
1404                i += 1;
1405                if i >= args.len() {
1406                    return Err(ProtocolError::InvalidCommandFrame(
1407                        "ERR syntax error".into(),
1408                    ));
1409                }
1410                rank = parse_i64(&args[i], "LPOS")?;
1411                if rank == 0 {
1412                    return Err(ProtocolError::InvalidCommandFrame(
1413                        "ERR RANK can't be zero: use 1 to start from the first match, 2 from the second ... or use negative values for starting from the end of the list".into(),
1414                    ));
1415                }
1416            }
1417            "COUNT" => {
1418                i += 1;
1419                if i >= args.len() {
1420                    return Err(ProtocolError::InvalidCommandFrame(
1421                        "ERR syntax error".into(),
1422                    ));
1423                }
1424                let n = parse_i64(&args[i], "LPOS")?;
1425                if n < 0 {
1426                    return Err(ProtocolError::InvalidCommandFrame(
1427                        "ERR COUNT can't be negative".into(),
1428                    ));
1429                }
1430                count = Some(n as usize);
1431            }
1432            "MAXLEN" => {
1433                i += 1;
1434                if i >= args.len() {
1435                    return Err(ProtocolError::InvalidCommandFrame(
1436                        "ERR syntax error".into(),
1437                    ));
1438                }
1439                let n = parse_i64(&args[i], "LPOS")?;
1440                if n < 0 {
1441                    return Err(ProtocolError::InvalidCommandFrame(
1442                        "ERR MAXLEN can't be negative".into(),
1443                    ));
1444                }
1445                maxlen = n as usize;
1446            }
1447            _ => {
1448                return Err(ProtocolError::InvalidCommandFrame(
1449                    "ERR syntax error".into(),
1450                ))
1451            }
1452        }
1453        i += 1;
1454    }
1455
1456    Ok(Command::LPos {
1457        key,
1458        element,
1459        rank,
1460        count,
1461        maxlen,
1462    })
1463}
1464
1465/// Extracts the timeout argument for BLPOP/BRPOP. Redis accepts integer or
1466/// float seconds; negative values are an error.
1467fn parse_timeout(frame: &Frame, cmd: &str) -> Result<f64, ProtocolError> {
1468    let bytes = extract_raw_bytes(frame)?;
1469    let s = std::str::from_utf8(bytes).map_err(|_| {
1470        ProtocolError::InvalidCommandFrame(format!(
1471            "timeout is not a float or out of range for '{cmd}'"
1472        ))
1473    })?;
1474    let val: f64 = s.parse().map_err(|_| {
1475        ProtocolError::InvalidCommandFrame(format!(
1476            "timeout is not a float or out of range for '{cmd}'"
1477        ))
1478    })?;
1479    if val < 0.0 {
1480        return Err(ProtocolError::InvalidCommandFrame(format!(
1481            "timeout is negative for '{cmd}'"
1482        )));
1483    }
1484    Ok(val)
1485}
1486
1487fn parse_type(args: &[Frame]) -> Result<Command, ProtocolError> {
1488    if args.len() != 1 {
1489        return Err(wrong_arity("TYPE"));
1490    }
1491    let key = extract_string(&args[0])?;
1492    Ok(Command::Type { key })
1493}
1494
1495/// Parses a string argument as an f64 score.
1496fn parse_f64(frame: &Frame, cmd: &str) -> Result<f64, ProtocolError> {
1497    let bytes = extract_raw_bytes(frame)?;
1498    let s = std::str::from_utf8(bytes).map_err(|_| {
1499        ProtocolError::InvalidCommandFrame(format!("value is not a valid float for '{cmd}'"))
1500    })?;
1501    let v = s.parse::<f64>().map_err(|_| {
1502        ProtocolError::InvalidCommandFrame(format!("value is not a valid float for '{cmd}'"))
1503    })?;
1504    if v.is_nan() || v.is_infinite() {
1505        return Err(ProtocolError::InvalidCommandFrame(format!(
1506            "value is not a valid finite float for '{cmd}'"
1507        )));
1508    }
1509    Ok(v)
1510}
1511
1512fn parse_zadd(args: &[Frame]) -> Result<Command, ProtocolError> {
1513    // ZADD key [NX|XX] [GT|LT] [CH] score member [score member ...]
1514    if args.len() < 3 {
1515        return Err(wrong_arity("ZADD"));
1516    }
1517
1518    let key = extract_string(&args[0])?;
1519    let mut flags = ZAddFlags::default();
1520    let mut idx = 1;
1521
1522    // parse optional flags before score/member pairs
1523    while idx < args.len() {
1524        let mut kw = [0u8; MAX_KEYWORD_LEN];
1525        let s = uppercase_arg(&args[idx], &mut kw)?;
1526        match s {
1527            "NX" => {
1528                flags.nx = true;
1529                idx += 1;
1530            }
1531            "XX" => {
1532                flags.xx = true;
1533                idx += 1;
1534            }
1535            "GT" => {
1536                flags.gt = true;
1537                idx += 1;
1538            }
1539            "LT" => {
1540                flags.lt = true;
1541                idx += 1;
1542            }
1543            "CH" => {
1544                flags.ch = true;
1545                idx += 1;
1546            }
1547            _ => break,
1548        }
1549    }
1550
1551    // NX and XX are mutually exclusive
1552    if flags.nx && flags.xx {
1553        return Err(ProtocolError::InvalidCommandFrame(
1554            "XX and NX options at the same time are not compatible".into(),
1555        ));
1556    }
1557    // GT and LT are mutually exclusive
1558    if flags.gt && flags.lt {
1559        return Err(ProtocolError::InvalidCommandFrame(
1560            "GT and LT options at the same time are not compatible".into(),
1561        ));
1562    }
1563
1564    // remaining args must be score/member pairs
1565    let remaining = &args[idx..];
1566    if remaining.is_empty() || !remaining.len().is_multiple_of(2) {
1567        return Err(wrong_arity("ZADD"));
1568    }
1569
1570    let mut members = Vec::with_capacity(remaining.len() / 2);
1571    for pair in remaining.chunks(2) {
1572        let score = parse_f64(&pair[0], "ZADD")?;
1573        let member = extract_string(&pair[1])?;
1574        members.push((score, member));
1575    }
1576
1577    Ok(Command::ZAdd {
1578        key,
1579        flags,
1580        members,
1581    })
1582}
1583
1584fn parse_zcard(args: &[Frame]) -> Result<Command, ProtocolError> {
1585    if args.len() != 1 {
1586        return Err(wrong_arity("ZCARD"));
1587    }
1588    let key = extract_string(&args[0])?;
1589    Ok(Command::ZCard { key })
1590}
1591
1592fn parse_zrem(args: &[Frame]) -> Result<Command, ProtocolError> {
1593    if args.len() < 2 {
1594        return Err(wrong_arity("ZREM"));
1595    }
1596    let key = extract_string(&args[0])?;
1597    let members = extract_strings(&args[1..])?;
1598    Ok(Command::ZRem { key, members })
1599}
1600
1601fn parse_zscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1602    if args.len() != 2 {
1603        return Err(wrong_arity("ZSCORE"));
1604    }
1605    let key = extract_string(&args[0])?;
1606    let member = extract_string(&args[1])?;
1607    Ok(Command::ZScore { key, member })
1608}
1609
1610fn parse_zrank(args: &[Frame]) -> Result<Command, ProtocolError> {
1611    if args.len() != 2 {
1612        return Err(wrong_arity("ZRANK"));
1613    }
1614    let key = extract_string(&args[0])?;
1615    let member = extract_string(&args[1])?;
1616    Ok(Command::ZRank { key, member })
1617}
1618
1619fn parse_zrange(args: &[Frame]) -> Result<Command, ProtocolError> {
1620    if args.len() < 3 || args.len() > 4 {
1621        return Err(wrong_arity("ZRANGE"));
1622    }
1623    let key = extract_string(&args[0])?;
1624    let start = parse_i64(&args[1], "ZRANGE")?;
1625    let stop = parse_i64(&args[2], "ZRANGE")?;
1626
1627    let with_scores = if args.len() == 4 {
1628        let mut kw = [0u8; MAX_KEYWORD_LEN];
1629        let opt = uppercase_arg(&args[3], &mut kw)?;
1630        if opt != "WITHSCORES" {
1631            return Err(ProtocolError::InvalidCommandFrame(format!(
1632                "unsupported ZRANGE option '{opt}'"
1633            )));
1634        }
1635        true
1636    } else {
1637        false
1638    };
1639
1640    Ok(Command::ZRange {
1641        key,
1642        start,
1643        stop,
1644        with_scores,
1645    })
1646}
1647
1648fn parse_zrevrange(args: &[Frame]) -> Result<Command, ProtocolError> {
1649    if args.len() < 3 || args.len() > 4 {
1650        return Err(wrong_arity("ZREVRANGE"));
1651    }
1652    let key = extract_string(&args[0])?;
1653    let start = parse_i64(&args[1], "ZREVRANGE")?;
1654    let stop = parse_i64(&args[2], "ZREVRANGE")?;
1655
1656    let with_scores = if args.len() == 4 {
1657        let mut kw = [0u8; MAX_KEYWORD_LEN];
1658        let opt = uppercase_arg(&args[3], &mut kw)?;
1659        if opt != "WITHSCORES" {
1660            return Err(ProtocolError::InvalidCommandFrame(format!(
1661                "unsupported ZREVRANGE option '{opt}'"
1662            )));
1663        }
1664        true
1665    } else {
1666        false
1667    };
1668
1669    Ok(Command::ZRevRange {
1670        key,
1671        start,
1672        stop,
1673        with_scores,
1674    })
1675}
1676
1677fn parse_zrevrank(args: &[Frame]) -> Result<Command, ProtocolError> {
1678    if args.len() != 2 {
1679        return Err(wrong_arity("ZREVRANK"));
1680    }
1681    let key = extract_string(&args[0])?;
1682    let member = extract_string(&args[1])?;
1683    Ok(Command::ZRevRank { key, member })
1684}
1685
1686/// Parses a Redis score bound string.
1687///
1688/// Supports `-inf`, `+inf`, `inf`, exclusive `(value`, and plain inclusive values.
1689fn parse_score_bound(frame: &Frame, cmd: &str) -> Result<ScoreBound, ProtocolError> {
1690    let bytes = extract_raw_bytes(frame)?;
1691    let s = std::str::from_utf8(bytes).map_err(|_| {
1692        ProtocolError::InvalidCommandFrame(format!("invalid score bound for '{cmd}'"))
1693    })?;
1694
1695    match s {
1696        "-inf" => Ok(ScoreBound::NegInf),
1697        "+inf" | "inf" => Ok(ScoreBound::PosInf),
1698        _ if s.starts_with('(') => {
1699            let val = s[1..].parse::<f64>().map_err(|_| {
1700                ProtocolError::InvalidCommandFrame(format!("min or max is not a float for '{cmd}'"))
1701            })?;
1702            Ok(ScoreBound::Exclusive(val))
1703        }
1704        _ => {
1705            let val = s.parse::<f64>().map_err(|_| {
1706                ProtocolError::InvalidCommandFrame(format!("min or max is not a float for '{cmd}'"))
1707            })?;
1708            Ok(ScoreBound::Inclusive(val))
1709        }
1710    }
1711}
1712
1713fn parse_zcount(args: &[Frame]) -> Result<Command, ProtocolError> {
1714    if args.len() != 3 {
1715        return Err(wrong_arity("ZCOUNT"));
1716    }
1717    let key = extract_string(&args[0])?;
1718    let min = parse_score_bound(&args[1], "ZCOUNT")?;
1719    let max = parse_score_bound(&args[2], "ZCOUNT")?;
1720    Ok(Command::ZCount { key, min, max })
1721}
1722
1723fn parse_zincrby(args: &[Frame]) -> Result<Command, ProtocolError> {
1724    if args.len() != 3 {
1725        return Err(wrong_arity("ZINCRBY"));
1726    }
1727    let key = extract_string(&args[0])?;
1728    let increment = parse_f64(&args[1], "ZINCRBY")?;
1729    let member = extract_string(&args[2])?;
1730    Ok(Command::ZIncrBy {
1731        key,
1732        increment,
1733        member,
1734    })
1735}
1736
1737/// Parses ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
1738fn parse_zrangebyscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1739    if args.len() < 3 {
1740        return Err(wrong_arity("ZRANGEBYSCORE"));
1741    }
1742    let key = extract_string(&args[0])?;
1743    let min = parse_score_bound(&args[1], "ZRANGEBYSCORE")?;
1744    let max = parse_score_bound(&args[2], "ZRANGEBYSCORE")?;
1745
1746    let mut with_scores = false;
1747    let mut offset = 0usize;
1748    let mut count = None;
1749    let mut idx = 3;
1750
1751    while idx < args.len() {
1752        let mut kw = [0u8; MAX_KEYWORD_LEN];
1753        let opt = uppercase_arg(&args[idx], &mut kw)?;
1754        match opt {
1755            "WITHSCORES" => {
1756                with_scores = true;
1757                idx += 1;
1758            }
1759            "LIMIT" => {
1760                if idx + 2 >= args.len() {
1761                    return Err(wrong_arity("ZRANGEBYSCORE"));
1762                }
1763                offset = parse_i64(&args[idx + 1], "ZRANGEBYSCORE")? as usize;
1764                count = Some(parse_i64(&args[idx + 2], "ZRANGEBYSCORE")? as usize);
1765                idx += 3;
1766            }
1767            _ => {
1768                return Err(ProtocolError::InvalidCommandFrame(format!(
1769                    "unsupported ZRANGEBYSCORE option '{opt}'"
1770                )));
1771            }
1772        }
1773    }
1774
1775    Ok(Command::ZRangeByScore {
1776        key,
1777        min,
1778        max,
1779        with_scores,
1780        offset,
1781        count,
1782    })
1783}
1784
1785/// Parses ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
1786///
1787/// Note: Redis reverses min/max argument order for this command.
1788fn parse_zrevrangebyscore(args: &[Frame]) -> Result<Command, ProtocolError> {
1789    if args.len() < 3 {
1790        return Err(wrong_arity("ZREVRANGEBYSCORE"));
1791    }
1792    let key = extract_string(&args[0])?;
1793    // Redis: ZREVRANGEBYSCORE key max min — the order is reversed
1794    let max = parse_score_bound(&args[1], "ZREVRANGEBYSCORE")?;
1795    let min = parse_score_bound(&args[2], "ZREVRANGEBYSCORE")?;
1796
1797    let mut with_scores = false;
1798    let mut offset = 0usize;
1799    let mut count = None;
1800    let mut idx = 3;
1801
1802    while idx < args.len() {
1803        let mut kw = [0u8; MAX_KEYWORD_LEN];
1804        let opt = uppercase_arg(&args[idx], &mut kw)?;
1805        match opt {
1806            "WITHSCORES" => {
1807                with_scores = true;
1808                idx += 1;
1809            }
1810            "LIMIT" => {
1811                if idx + 2 >= args.len() {
1812                    return Err(wrong_arity("ZREVRANGEBYSCORE"));
1813                }
1814                offset = parse_i64(&args[idx + 1], "ZREVRANGEBYSCORE")? as usize;
1815                count = Some(parse_i64(&args[idx + 2], "ZREVRANGEBYSCORE")? as usize);
1816                idx += 3;
1817            }
1818            _ => {
1819                return Err(ProtocolError::InvalidCommandFrame(format!(
1820                    "unsupported ZREVRANGEBYSCORE option '{opt}'"
1821                )));
1822            }
1823        }
1824    }
1825
1826    Ok(Command::ZRevRangeByScore {
1827        key,
1828        min,
1829        max,
1830        with_scores,
1831        offset,
1832        count,
1833    })
1834}
1835
1836/// Shared argument parsing for ZPOPMIN/ZPOPMAX: key [count]
1837fn parse_zpop_args(args: &[Frame], cmd: &'static str) -> Result<(String, usize), ProtocolError> {
1838    if args.is_empty() || args.len() > 2 {
1839        return Err(wrong_arity(cmd));
1840    }
1841    let key = extract_string(&args[0])?;
1842    let count = if args.len() == 2 {
1843        let c = parse_i64(&args[1], cmd)?;
1844        if c < 0 {
1845            return Err(ProtocolError::InvalidCommandFrame(format!(
1846                "value is out of range for '{cmd}'"
1847            )));
1848        }
1849        c as usize
1850    } else {
1851        1
1852    };
1853    Ok((key, count))
1854}
1855
1856// LMPOP numkeys key [key ...] LEFT|RIGHT [COUNT n]
1857fn parse_lmpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1858    if args.len() < 3 {
1859        return Err(wrong_arity("LMPOP"));
1860    }
1861    let numkeys = parse_u64(&args[0], "LMPOP")? as usize;
1862    if numkeys == 0 || args.len() < 1 + numkeys + 1 {
1863        return Err(ProtocolError::InvalidCommandFrame(
1864            "LMPOP numkeys must match key count".into(),
1865        ));
1866    }
1867    let keys: Vec<String> = args[1..=numkeys]
1868        .iter()
1869        .map(extract_string)
1870        .collect::<Result<_, _>>()?;
1871    let dir = extract_string(&args[numkeys + 1])?.to_ascii_uppercase();
1872    let left = match dir.as_str() {
1873        "LEFT" => true,
1874        "RIGHT" => false,
1875        _ => {
1876            return Err(ProtocolError::InvalidCommandFrame(
1877                "LMPOP: direction must be LEFT or RIGHT".into(),
1878            ))
1879        }
1880    };
1881    let count = if args.len() == numkeys + 4 {
1882        let tag = extract_string(&args[numkeys + 2])?.to_ascii_uppercase();
1883        if tag != "COUNT" {
1884            return Err(ProtocolError::InvalidCommandFrame(
1885                "LMPOP: expected COUNT".into(),
1886            ));
1887        }
1888        let n = parse_u64(&args[numkeys + 3], "LMPOP")? as usize;
1889        if n == 0 {
1890            return Err(ProtocolError::InvalidCommandFrame(
1891                "LMPOP: COUNT must be positive".into(),
1892            ));
1893        }
1894        n
1895    } else if args.len() == numkeys + 2 {
1896        1
1897    } else {
1898        return Err(wrong_arity("LMPOP"));
1899    };
1900    Ok(Command::Lmpop { keys, left, count })
1901}
1902
1903// ZMPOP numkeys key [key ...] MIN|MAX [COUNT n]
1904fn parse_zmpop(args: &[Frame]) -> Result<Command, ProtocolError> {
1905    if args.len() < 3 {
1906        return Err(wrong_arity("ZMPOP"));
1907    }
1908    let numkeys = parse_u64(&args[0], "ZMPOP")? as usize;
1909    if numkeys == 0 || args.len() < 1 + numkeys + 1 {
1910        return Err(ProtocolError::InvalidCommandFrame(
1911            "ZMPOP numkeys must match key count".into(),
1912        ));
1913    }
1914    let keys: Vec<String> = args[1..=numkeys]
1915        .iter()
1916        .map(extract_string)
1917        .collect::<Result<_, _>>()?;
1918    let order = extract_string(&args[numkeys + 1])?.to_ascii_uppercase();
1919    let min = match order.as_str() {
1920        "MIN" => true,
1921        "MAX" => false,
1922        _ => {
1923            return Err(ProtocolError::InvalidCommandFrame(
1924                "ZMPOP: order must be MIN or MAX".into(),
1925            ))
1926        }
1927    };
1928    let count = if args.len() == numkeys + 4 {
1929        let tag = extract_string(&args[numkeys + 2])?.to_ascii_uppercase();
1930        if tag != "COUNT" {
1931            return Err(ProtocolError::InvalidCommandFrame(
1932                "ZMPOP: expected COUNT".into(),
1933            ));
1934        }
1935        let n = parse_u64(&args[numkeys + 3], "ZMPOP")? as usize;
1936        if n == 0 {
1937            return Err(ProtocolError::InvalidCommandFrame(
1938                "ZMPOP: COUNT must be positive".into(),
1939            ));
1940        }
1941        n
1942    } else if args.len() == numkeys + 2 {
1943        1
1944    } else {
1945        return Err(wrong_arity("ZMPOP"));
1946    };
1947    Ok(Command::Zmpop { keys, min, count })
1948}
1949
1950// --- hash commands ---
1951
1952fn parse_hset(args: &[Frame]) -> Result<Command, ProtocolError> {
1953    // HSET key field value [field value ...]
1954    // args = [key, field, value, ...]
1955    // Need at least 3 args, and after key we need pairs (so remaining count must be even)
1956    if args.len() < 3 || !(args.len() - 1).is_multiple_of(2) {
1957        return Err(wrong_arity("HSET"));
1958    }
1959
1960    let key = extract_string(&args[0])?;
1961    let mut fields = Vec::with_capacity((args.len() - 1) / 2);
1962
1963    for chunk in args[1..].chunks(2) {
1964        let field = extract_string(&chunk[0])?;
1965        let value = extract_bytes(&chunk[1])?;
1966        fields.push((field, value));
1967    }
1968
1969    Ok(Command::HSet { key, fields })
1970}
1971
1972fn parse_hget(args: &[Frame]) -> Result<Command, ProtocolError> {
1973    if args.len() != 2 {
1974        return Err(wrong_arity("HGET"));
1975    }
1976    let key = extract_string(&args[0])?;
1977    let field = extract_string(&args[1])?;
1978    Ok(Command::HGet { key, field })
1979}
1980
1981fn parse_hgetall(args: &[Frame]) -> Result<Command, ProtocolError> {
1982    if args.len() != 1 {
1983        return Err(wrong_arity("HGETALL"));
1984    }
1985    let key = extract_string(&args[0])?;
1986    Ok(Command::HGetAll { key })
1987}
1988
1989fn parse_hdel(args: &[Frame]) -> Result<Command, ProtocolError> {
1990    if args.len() < 2 {
1991        return Err(wrong_arity("HDEL"));
1992    }
1993    let key = extract_string(&args[0])?;
1994    let fields = extract_strings(&args[1..])?;
1995    Ok(Command::HDel { key, fields })
1996}
1997
1998fn parse_hexists(args: &[Frame]) -> Result<Command, ProtocolError> {
1999    if args.len() != 2 {
2000        return Err(wrong_arity("HEXISTS"));
2001    }
2002    let key = extract_string(&args[0])?;
2003    let field = extract_string(&args[1])?;
2004    Ok(Command::HExists { key, field })
2005}
2006
2007fn parse_hlen(args: &[Frame]) -> Result<Command, ProtocolError> {
2008    if args.len() != 1 {
2009        return Err(wrong_arity("HLEN"));
2010    }
2011    let key = extract_string(&args[0])?;
2012    Ok(Command::HLen { key })
2013}
2014
2015fn parse_hincrby(args: &[Frame]) -> Result<Command, ProtocolError> {
2016    if args.len() != 3 {
2017        return Err(wrong_arity("HINCRBY"));
2018    }
2019    let key = extract_string(&args[0])?;
2020    let field = extract_string(&args[1])?;
2021    let delta = parse_i64(&args[2], "HINCRBY")?;
2022    Ok(Command::HIncrBy { key, field, delta })
2023}
2024
2025fn parse_hincrbyfloat(args: &[Frame]) -> Result<Command, ProtocolError> {
2026    if args.len() != 3 {
2027        return Err(wrong_arity("HINCRBYFLOAT"));
2028    }
2029    let key = extract_string(&args[0])?;
2030    let field = extract_string(&args[1])?;
2031    let delta = parse_f64(&args[2], "HINCRBYFLOAT")?;
2032    Ok(Command::HIncrByFloat { key, field, delta })
2033}
2034
2035fn parse_hkeys(args: &[Frame]) -> Result<Command, ProtocolError> {
2036    if args.len() != 1 {
2037        return Err(wrong_arity("HKEYS"));
2038    }
2039    let key = extract_string(&args[0])?;
2040    Ok(Command::HKeys { key })
2041}
2042
2043fn parse_hvals(args: &[Frame]) -> Result<Command, ProtocolError> {
2044    if args.len() != 1 {
2045        return Err(wrong_arity("HVALS"));
2046    }
2047    let key = extract_string(&args[0])?;
2048    Ok(Command::HVals { key })
2049}
2050
2051fn parse_hmget(args: &[Frame]) -> Result<Command, ProtocolError> {
2052    if args.len() < 2 {
2053        return Err(wrong_arity("HMGET"));
2054    }
2055    let key = extract_string(&args[0])?;
2056    let fields = extract_strings(&args[1..])?;
2057    Ok(Command::HMGet { key, fields })
2058}
2059
2060fn parse_hrandfield(args: &[Frame]) -> Result<Command, ProtocolError> {
2061    if args.is_empty() {
2062        return Err(wrong_arity("HRANDFIELD"));
2063    }
2064    let key = extract_string(&args[0])?;
2065    let (count, with_values) = match args.len() {
2066        1 => (None, false),
2067        2 => (Some(parse_i64(&args[1], "HRANDFIELD")?), false),
2068        3 => {
2069            let count = parse_i64(&args[1], "HRANDFIELD")?;
2070            let flag = extract_string(&args[2])?.to_ascii_uppercase();
2071            if flag != "WITHVALUES" {
2072                return Err(ProtocolError::InvalidCommandFrame(
2073                    "HRANDFIELD: expected WITHVALUES".into(),
2074                ));
2075            }
2076            (Some(count), true)
2077        }
2078        _ => return Err(wrong_arity("HRANDFIELD")),
2079    };
2080    Ok(Command::HRandField {
2081        key,
2082        count,
2083        with_values,
2084    })
2085}
2086
2087// --- set commands ---
2088
2089fn parse_sadd(args: &[Frame]) -> Result<Command, ProtocolError> {
2090    if args.len() < 2 {
2091        return Err(wrong_arity("SADD"));
2092    }
2093    let key = extract_string(&args[0])?;
2094    let members = extract_strings(&args[1..])?;
2095    Ok(Command::SAdd { key, members })
2096}
2097
2098fn parse_srem(args: &[Frame]) -> Result<Command, ProtocolError> {
2099    if args.len() < 2 {
2100        return Err(wrong_arity("SREM"));
2101    }
2102    let key = extract_string(&args[0])?;
2103    let members = extract_strings(&args[1..])?;
2104    Ok(Command::SRem { key, members })
2105}
2106
2107fn parse_smembers(args: &[Frame]) -> Result<Command, ProtocolError> {
2108    if args.len() != 1 {
2109        return Err(wrong_arity("SMEMBERS"));
2110    }
2111    let key = extract_string(&args[0])?;
2112    Ok(Command::SMembers { key })
2113}
2114
2115fn parse_sismember(args: &[Frame]) -> Result<Command, ProtocolError> {
2116    if args.len() != 2 {
2117        return Err(wrong_arity("SISMEMBER"));
2118    }
2119    let key = extract_string(&args[0])?;
2120    let member = extract_string(&args[1])?;
2121    Ok(Command::SIsMember { key, member })
2122}
2123
2124fn parse_scard(args: &[Frame]) -> Result<Command, ProtocolError> {
2125    if args.len() != 1 {
2126        return Err(wrong_arity("SCARD"));
2127    }
2128    let key = extract_string(&args[0])?;
2129    Ok(Command::SCard { key })
2130}
2131
2132fn parse_multi_key_set(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
2133    if args.is_empty() {
2134        return Err(wrong_arity(cmd));
2135    }
2136    let keys = extract_strings(args)?;
2137    match cmd {
2138        "SUNION" => Ok(Command::SUnion { keys }),
2139        "SINTER" => Ok(Command::SInter { keys }),
2140        "SDIFF" => Ok(Command::SDiff { keys }),
2141        _ => Err(wrong_arity(cmd)),
2142    }
2143}
2144
2145fn parse_store_set(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
2146    if args.len() < 2 {
2147        return Err(wrong_arity(cmd));
2148    }
2149    let dest = extract_string(&args[0])?;
2150    let keys = extract_strings(&args[1..])?;
2151    match cmd {
2152        "SUNIONSTORE" => Ok(Command::SUnionStore { dest, keys }),
2153        "SINTERSTORE" => Ok(Command::SInterStore { dest, keys }),
2154        "SDIFFSTORE" => Ok(Command::SDiffStore { dest, keys }),
2155        _ => Err(wrong_arity(cmd)),
2156    }
2157}
2158
2159fn parse_srandmember(args: &[Frame]) -> Result<Command, ProtocolError> {
2160    if args.is_empty() || args.len() > 2 {
2161        return Err(wrong_arity("SRANDMEMBER"));
2162    }
2163    let key = extract_string(&args[0])?;
2164    let count = if args.len() == 2 {
2165        let s = extract_string(&args[1])?;
2166        let n: i64 = s.parse().map_err(|_| {
2167            ProtocolError::InvalidCommandFrame("ERR value is not an integer or out of range".into())
2168        })?;
2169        Some(n)
2170    } else {
2171        None
2172    };
2173    Ok(Command::SRandMember { key, count })
2174}
2175
2176fn parse_spop(args: &[Frame]) -> Result<Command, ProtocolError> {
2177    if args.is_empty() || args.len() > 2 {
2178        return Err(wrong_arity("SPOP"));
2179    }
2180    let key = extract_string(&args[0])?;
2181    let count = if args.len() == 2 {
2182        let s = extract_string(&args[1])?;
2183        let n: i64 = s.parse().map_err(|_| {
2184            ProtocolError::InvalidCommandFrame("ERR value is not an integer or out of range".into())
2185        })?;
2186        if n < 0 {
2187            return Err(ProtocolError::InvalidCommandFrame(
2188                "ERR value is not an integer or out of range".into(),
2189            ));
2190        }
2191        n as usize
2192    } else {
2193        1
2194    };
2195    Ok(Command::SPop { key, count })
2196}
2197
2198fn parse_smismember(args: &[Frame]) -> Result<Command, ProtocolError> {
2199    if args.len() < 2 {
2200        return Err(wrong_arity("SMISMEMBER"));
2201    }
2202    let key = extract_string(&args[0])?;
2203    let members = extract_strings(&args[1..])?;
2204    Ok(Command::SMisMember { key, members })
2205}
2206
2207// --- cluster commands ---
2208
2209fn parse_cluster(args: &[Frame]) -> Result<Command, ProtocolError> {
2210    if args.is_empty() {
2211        return Err(wrong_arity("CLUSTER"));
2212    }
2213
2214    let mut kw = [0u8; MAX_KEYWORD_LEN];
2215    let subcommand = uppercase_arg(&args[0], &mut kw)?;
2216    match subcommand {
2217        "INFO" => {
2218            if args.len() != 1 {
2219                return Err(wrong_arity("CLUSTER INFO"));
2220            }
2221            Ok(Command::ClusterInfo)
2222        }
2223        "NODES" => {
2224            if args.len() != 1 {
2225                return Err(wrong_arity("CLUSTER NODES"));
2226            }
2227            Ok(Command::ClusterNodes)
2228        }
2229        "SLOTS" => {
2230            if args.len() != 1 {
2231                return Err(wrong_arity("CLUSTER SLOTS"));
2232            }
2233            Ok(Command::ClusterSlots)
2234        }
2235        "KEYSLOT" => {
2236            if args.len() != 2 {
2237                return Err(wrong_arity("CLUSTER KEYSLOT"));
2238            }
2239            let key = extract_string(&args[1])?;
2240            Ok(Command::ClusterKeySlot { key })
2241        }
2242        "MYID" => {
2243            if args.len() != 1 {
2244                return Err(wrong_arity("CLUSTER MYID"));
2245            }
2246            Ok(Command::ClusterMyId)
2247        }
2248        "SETSLOT" => parse_cluster_setslot(&args[1..]),
2249        "MEET" => {
2250            if args.len() != 3 {
2251                return Err(wrong_arity("CLUSTER MEET"));
2252            }
2253            let ip = extract_string(&args[1])?;
2254            let p = parse_u64(&args[2], "CLUSTER MEET")?;
2255            let port = u16::try_from(p)
2256                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid port number".into()))?;
2257            Ok(Command::ClusterMeet { ip, port })
2258        }
2259        "ADDSLOTS" => {
2260            if args.len() < 2 {
2261                return Err(wrong_arity("CLUSTER ADDSLOTS"));
2262            }
2263            let slots = parse_slot_list(&args[1..])?;
2264            Ok(Command::ClusterAddSlots { slots })
2265        }
2266        "ADDSLOTSRANGE" => {
2267            // arguments are pairs: start1 end1 [start2 end2 ...]
2268            if args.len() < 3 || !(args.len() - 1).is_multiple_of(2) {
2269                return Err(wrong_arity("CLUSTER ADDSLOTSRANGE"));
2270            }
2271            let mut ranges = Vec::new();
2272            for pair in args[1..].chunks(2) {
2273                let s = parse_u64(&pair[0], "CLUSTER ADDSLOTSRANGE")?;
2274                let start = u16::try_from(s)
2275                    .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot".into()))?;
2276                let e = parse_u64(&pair[1], "CLUSTER ADDSLOTSRANGE")?;
2277                let end = u16::try_from(e)
2278                    .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot".into()))?;
2279                if start > end || end >= 16384 {
2280                    return Err(ProtocolError::InvalidCommandFrame(
2281                        "invalid slot range: start must be <= end and both must be 0-16383".into(),
2282                    ));
2283                }
2284                ranges.push((start, end));
2285            }
2286            Ok(Command::ClusterAddSlotsRange { ranges })
2287        }
2288        "DELSLOTS" => {
2289            if args.len() < 2 {
2290                return Err(wrong_arity("CLUSTER DELSLOTS"));
2291            }
2292            let slots = parse_slot_list(&args[1..])?;
2293            Ok(Command::ClusterDelSlots { slots })
2294        }
2295        "FORGET" => {
2296            if args.len() != 2 {
2297                return Err(wrong_arity("CLUSTER FORGET"));
2298            }
2299            let node_id = extract_string(&args[1])?;
2300            Ok(Command::ClusterForget { node_id })
2301        }
2302        "REPLICATE" => {
2303            if args.len() != 2 {
2304                return Err(wrong_arity("CLUSTER REPLICATE"));
2305            }
2306            let node_id = extract_string(&args[1])?;
2307            Ok(Command::ClusterReplicate { node_id })
2308        }
2309        "FAILOVER" => {
2310            let mut force = false;
2311            let mut takeover = false;
2312            for arg in &args[1..] {
2313                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2314                let opt = uppercase_arg(arg, &mut kw2)?;
2315                match opt {
2316                    "FORCE" => force = true,
2317                    "TAKEOVER" => takeover = true,
2318                    _ => {
2319                        return Err(ProtocolError::InvalidCommandFrame(format!(
2320                            "unknown CLUSTER FAILOVER option '{opt}'"
2321                        )))
2322                    }
2323                }
2324            }
2325            Ok(Command::ClusterFailover { force, takeover })
2326        }
2327        "COUNTKEYSINSLOT" => {
2328            if args.len() != 2 {
2329                return Err(wrong_arity("CLUSTER COUNTKEYSINSLOT"));
2330            }
2331            let n = parse_u64(&args[1], "CLUSTER COUNTKEYSINSLOT")?;
2332            let slot = u16::try_from(n)
2333                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2334            Ok(Command::ClusterCountKeysInSlot { slot })
2335        }
2336        "GETKEYSINSLOT" => {
2337            if args.len() != 3 {
2338                return Err(wrong_arity("CLUSTER GETKEYSINSLOT"));
2339            }
2340            let n = parse_u64(&args[1], "CLUSTER GETKEYSINSLOT")?;
2341            let slot = u16::try_from(n)
2342                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2343            let c = parse_u64(&args[2], "CLUSTER GETKEYSINSLOT")?;
2344            let count = u32::try_from(c)
2345                .map_err(|_| ProtocolError::InvalidCommandFrame("invalid count".into()))?;
2346            Ok(Command::ClusterGetKeysInSlot { slot, count })
2347        }
2348        _ => Err(ProtocolError::InvalidCommandFrame(format!(
2349            "unknown CLUSTER subcommand '{subcommand}'"
2350        ))),
2351    }
2352}
2353
2354fn parse_asking(args: &[Frame]) -> Result<Command, ProtocolError> {
2355    if !args.is_empty() {
2356        return Err(wrong_arity("ASKING"));
2357    }
2358    Ok(Command::Asking)
2359}
2360
2361fn parse_watch(args: &[Frame]) -> Result<Command, ProtocolError> {
2362    if args.is_empty() {
2363        return Err(wrong_arity("WATCH"));
2364    }
2365    let keys = extract_strings(args)?;
2366    Ok(Command::Watch { keys })
2367}
2368
2369fn parse_no_args(
2370    name: &'static str,
2371    args: &[Frame],
2372    cmd: Command,
2373) -> Result<Command, ProtocolError> {
2374    if !args.is_empty() {
2375        return Err(wrong_arity(name));
2376    }
2377    Ok(cmd)
2378}
2379
2380fn parse_acl(args: &[Frame]) -> Result<Command, ProtocolError> {
2381    if args.is_empty() {
2382        return Err(wrong_arity("ACL"));
2383    }
2384
2385    let mut kw = [0u8; MAX_KEYWORD_LEN];
2386    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2387    match subcmd {
2388        "WHOAMI" => {
2389            if args.len() != 1 {
2390                return Err(wrong_arity("ACL|WHOAMI"));
2391            }
2392            Ok(Command::AclWhoAmI)
2393        }
2394        "LIST" => {
2395            if args.len() != 1 {
2396                return Err(wrong_arity("ACL|LIST"));
2397            }
2398            Ok(Command::AclList)
2399        }
2400        "USERS" => {
2401            if args.len() != 1 {
2402                return Err(wrong_arity("ACL|USERS"));
2403            }
2404            Ok(Command::AclUsers)
2405        }
2406        "GETUSER" => {
2407            if args.len() != 2 {
2408                return Err(wrong_arity("ACL|GETUSER"));
2409            }
2410            let username = extract_string(&args[1])?;
2411            Ok(Command::AclGetUser { username })
2412        }
2413        "DELUSER" => {
2414            if args.len() < 2 {
2415                return Err(wrong_arity("ACL|DELUSER"));
2416            }
2417            let usernames = extract_strings(&args[1..])?;
2418            Ok(Command::AclDelUser { usernames })
2419        }
2420        "SETUSER" => {
2421            if args.len() < 2 {
2422                return Err(wrong_arity("ACL|SETUSER"));
2423            }
2424            let username = extract_string(&args[1])?;
2425            let rules = if args.len() > 2 {
2426                extract_strings(&args[2..])?
2427            } else {
2428                Vec::new()
2429            };
2430            Ok(Command::AclSetUser { username, rules })
2431        }
2432        "CAT" => {
2433            if args.len() > 2 {
2434                return Err(wrong_arity("ACL|CAT"));
2435            }
2436            let category = if args.len() == 2 {
2437                Some(extract_string(&args[1])?)
2438            } else {
2439                None
2440            };
2441            Ok(Command::AclCat { category })
2442        }
2443        other => Err(ProtocolError::InvalidCommandFrame(format!(
2444            "unknown ACL subcommand '{other}'"
2445        ))),
2446    }
2447}
2448
2449fn parse_config(args: &[Frame]) -> Result<Command, ProtocolError> {
2450    if args.is_empty() {
2451        return Err(wrong_arity("CONFIG"));
2452    }
2453
2454    let mut kw = [0u8; MAX_KEYWORD_LEN];
2455    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2456    match subcmd {
2457        "GET" => {
2458            if args.len() != 2 {
2459                return Err(wrong_arity("CONFIG|GET"));
2460            }
2461            let pattern = extract_string(&args[1])?;
2462            Ok(Command::ConfigGet { pattern })
2463        }
2464        "SET" => {
2465            if args.len() != 3 {
2466                return Err(wrong_arity("CONFIG|SET"));
2467            }
2468            let param = extract_string(&args[1])?;
2469            let value = extract_string(&args[2])?;
2470            Ok(Command::ConfigSet { param, value })
2471        }
2472        "REWRITE" => {
2473            if args.len() != 1 {
2474                return Err(wrong_arity("CONFIG|REWRITE"));
2475            }
2476            Ok(Command::ConfigRewrite)
2477        }
2478        other => Err(ProtocolError::InvalidCommandFrame(format!(
2479            "unknown CONFIG subcommand '{other}'"
2480        ))),
2481    }
2482}
2483
2484fn parse_slowlog(args: &[Frame]) -> Result<Command, ProtocolError> {
2485    if args.is_empty() {
2486        return Err(wrong_arity("SLOWLOG"));
2487    }
2488
2489    let mut kw = [0u8; MAX_KEYWORD_LEN];
2490    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2491    match subcmd {
2492        "GET" => {
2493            let count = if args.len() > 1 {
2494                Some(parse_u64(&args[1], "SLOWLOG")? as usize)
2495            } else {
2496                None
2497            };
2498            Ok(Command::SlowLogGet { count })
2499        }
2500        "LEN" => Ok(Command::SlowLogLen),
2501        "RESET" => Ok(Command::SlowLogReset),
2502        other => Err(ProtocolError::InvalidCommandFrame(format!(
2503            "unknown SLOWLOG subcommand '{other}'"
2504        ))),
2505    }
2506}
2507
2508fn parse_client(args: &[Frame]) -> Result<Command, ProtocolError> {
2509    if args.is_empty() {
2510        return Err(wrong_arity("CLIENT"));
2511    }
2512
2513    let mut kw = [0u8; MAX_KEYWORD_LEN];
2514    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2515    match subcmd {
2516        "ID" => Ok(Command::ClientId),
2517        "GETNAME" => Ok(Command::ClientGetName),
2518        "LIST" => Ok(Command::ClientList),
2519        "SETNAME" => {
2520            if args.len() < 2 {
2521                return Err(wrong_arity("CLIENT SETNAME"));
2522            }
2523            let name = extract_string(&args[1])?;
2524            Ok(Command::ClientSetName { name })
2525        }
2526        other => Err(ProtocolError::InvalidCommandFrame(format!(
2527            "unknown CLIENT subcommand '{other}'"
2528        ))),
2529    }
2530}
2531
2532fn parse_object(args: &[Frame]) -> Result<Command, ProtocolError> {
2533    if args.is_empty() {
2534        return Err(wrong_arity("OBJECT"));
2535    }
2536
2537    let mut kw = [0u8; MAX_KEYWORD_LEN];
2538    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2539    match subcmd {
2540        "ENCODING" => {
2541            if args.len() != 2 {
2542                return Err(wrong_arity("OBJECT|ENCODING"));
2543            }
2544            let key = extract_string(&args[1])?;
2545            Ok(Command::ObjectEncoding { key })
2546        }
2547        "REFCOUNT" => {
2548            if args.len() != 2 {
2549                return Err(wrong_arity("OBJECT|REFCOUNT"));
2550            }
2551            let key = extract_string(&args[1])?;
2552            Ok(Command::ObjectRefcount { key })
2553        }
2554        other => Err(ProtocolError::InvalidCommandFrame(format!(
2555            "unknown OBJECT subcommand '{other}'"
2556        ))),
2557    }
2558}
2559
2560fn parse_copy(args: &[Frame]) -> Result<Command, ProtocolError> {
2561    if args.len() < 2 {
2562        return Err(wrong_arity("COPY"));
2563    }
2564    let source = extract_string(&args[0])?;
2565    let destination = extract_string(&args[1])?;
2566
2567    let mut replace = false;
2568    let mut i = 2;
2569    while i < args.len() {
2570        let mut kw = [0u8; MAX_KEYWORD_LEN];
2571        let arg = uppercase_arg(&args[i], &mut kw)?;
2572        match arg {
2573            "REPLACE" => replace = true,
2574            "DB" => {
2575                // consume and ignore the DB argument (single-db server)
2576                i += 1;
2577                if i >= args.len() {
2578                    return Err(wrong_arity("COPY"));
2579                }
2580            }
2581            _ => {
2582                return Err(ProtocolError::InvalidCommandFrame(format!(
2583                    "unsupported COPY option '{arg}'"
2584                )));
2585            }
2586        }
2587        i += 1;
2588    }
2589
2590    Ok(Command::Copy {
2591        source,
2592        destination,
2593        replace,
2594    })
2595}
2596
2597fn parse_slot_list(args: &[Frame]) -> Result<Vec<u16>, ProtocolError> {
2598    let mut slots = Vec::with_capacity(args.len());
2599    for arg in args {
2600        let n = parse_u64(arg, "CLUSTER")?;
2601        let slot = u16::try_from(n)
2602            .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2603        if slot >= 16384 {
2604            return Err(ProtocolError::InvalidCommandFrame(format!(
2605                "invalid slot {slot}: must be 0-16383"
2606            )));
2607        }
2608        slots.push(slot);
2609    }
2610    Ok(slots)
2611}
2612
2613fn parse_cluster_setslot(args: &[Frame]) -> Result<Command, ProtocolError> {
2614    if args.is_empty() {
2615        return Err(wrong_arity("CLUSTER SETSLOT"));
2616    }
2617
2618    let n = parse_u64(&args[0], "CLUSTER SETSLOT")?;
2619    let slot = u16::try_from(n)
2620        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid slot number".into()))?;
2621    if slot >= 16384 {
2622        return Err(ProtocolError::InvalidCommandFrame(format!(
2623            "invalid slot {slot}: must be 0-16383"
2624        )));
2625    }
2626
2627    if args.len() < 2 {
2628        return Err(wrong_arity("CLUSTER SETSLOT"));
2629    }
2630
2631    let mut kw = [0u8; MAX_KEYWORD_LEN];
2632    let action = uppercase_arg(&args[1], &mut kw)?;
2633    match action {
2634        "IMPORTING" => {
2635            if args.len() != 3 {
2636                return Err(ProtocolError::WrongArity(
2637                    "CLUSTER SETSLOT IMPORTING".into(),
2638                ));
2639            }
2640            let node_id = extract_string(&args[2])?;
2641            Ok(Command::ClusterSetSlotImporting { slot, node_id })
2642        }
2643        "MIGRATING" => {
2644            if args.len() != 3 {
2645                return Err(ProtocolError::WrongArity(
2646                    "CLUSTER SETSLOT MIGRATING".into(),
2647                ));
2648            }
2649            let node_id = extract_string(&args[2])?;
2650            Ok(Command::ClusterSetSlotMigrating { slot, node_id })
2651        }
2652        "NODE" => {
2653            if args.len() != 3 {
2654                return Err(wrong_arity("CLUSTER SETSLOT NODE"));
2655            }
2656            let node_id = extract_string(&args[2])?;
2657            Ok(Command::ClusterSetSlotNode { slot, node_id })
2658        }
2659        "STABLE" => {
2660            if args.len() != 2 {
2661                return Err(wrong_arity("CLUSTER SETSLOT STABLE"));
2662            }
2663            Ok(Command::ClusterSetSlotStable { slot })
2664        }
2665        _ => Err(ProtocolError::InvalidCommandFrame(format!(
2666            "unknown CLUSTER SETSLOT action '{action}'"
2667        ))),
2668    }
2669}
2670
2671fn parse_migrate(args: &[Frame]) -> Result<Command, ProtocolError> {
2672    // MIGRATE host port key db timeout [COPY] [REPLACE]
2673    if args.len() < 5 {
2674        return Err(wrong_arity("MIGRATE"));
2675    }
2676
2677    let host = extract_string(&args[0])?;
2678    let p = parse_u64(&args[1], "MIGRATE")?;
2679    let port = u16::try_from(p)
2680        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid port number".into()))?;
2681    let key = extract_string(&args[2])?;
2682    let d = parse_u64(&args[3], "MIGRATE")?;
2683    let db = u32::try_from(d)
2684        .map_err(|_| ProtocolError::InvalidCommandFrame("invalid db number".into()))?;
2685    let timeout_ms = parse_u64(&args[4], "MIGRATE")?;
2686
2687    let mut copy = false;
2688    let mut replace = false;
2689
2690    for arg in &args[5..] {
2691        let mut kw = [0u8; MAX_KEYWORD_LEN];
2692        let opt = uppercase_arg(arg, &mut kw)?;
2693        match opt {
2694            "COPY" => copy = true,
2695            "REPLACE" => replace = true,
2696            _ => {
2697                return Err(ProtocolError::InvalidCommandFrame(format!(
2698                    "unknown MIGRATE option '{opt}'"
2699                )))
2700            }
2701        }
2702    }
2703
2704    Ok(Command::Migrate {
2705        host,
2706        port,
2707        key,
2708        db,
2709        timeout_ms,
2710        copy,
2711        replace,
2712    })
2713}
2714
2715fn parse_restore(args: &[Frame]) -> Result<Command, ProtocolError> {
2716    // RESTORE key ttl serialized-value [REPLACE]
2717    if args.len() < 3 {
2718        return Err(wrong_arity("RESTORE"));
2719    }
2720
2721    let key = extract_string(&args[0])?;
2722    let ttl_ms = parse_u64(&args[1], "RESTORE")?;
2723    let data = extract_bytes(&args[2])?;
2724
2725    let mut replace = false;
2726    for arg in &args[3..] {
2727        let mut kw = [0u8; MAX_KEYWORD_LEN];
2728        let opt = uppercase_arg(arg, &mut kw)?;
2729        if opt == "REPLACE" {
2730            replace = true;
2731        } else {
2732            return Err(ProtocolError::InvalidCommandFrame(format!(
2733                "unknown RESTORE option '{opt}'"
2734            )));
2735        }
2736    }
2737
2738    Ok(Command::Restore {
2739        key,
2740        ttl_ms,
2741        data,
2742        replace,
2743    })
2744}
2745
2746fn parse_subscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2747    if args.is_empty() {
2748        return Err(wrong_arity("SUBSCRIBE"));
2749    }
2750    let channels: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2751    Ok(Command::Subscribe { channels })
2752}
2753
2754fn parse_unsubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2755    let channels: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2756    Ok(Command::Unsubscribe { channels })
2757}
2758
2759fn parse_psubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2760    if args.is_empty() {
2761        return Err(wrong_arity("PSUBSCRIBE"));
2762    }
2763    let patterns: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2764    Ok(Command::PSubscribe { patterns })
2765}
2766
2767fn parse_punsubscribe(args: &[Frame]) -> Result<Command, ProtocolError> {
2768    let patterns: Vec<String> = args.iter().map(extract_string).collect::<Result<_, _>>()?;
2769    Ok(Command::PUnsubscribe { patterns })
2770}
2771
2772fn parse_publish(args: &[Frame]) -> Result<Command, ProtocolError> {
2773    if args.len() != 2 {
2774        return Err(wrong_arity("PUBLISH"));
2775    }
2776    let channel = extract_string(&args[0])?;
2777    let message = extract_bytes(&args[1])?;
2778    Ok(Command::Publish { channel, message })
2779}
2780
2781fn parse_pubsub(args: &[Frame]) -> Result<Command, ProtocolError> {
2782    if args.is_empty() {
2783        return Err(wrong_arity("PUBSUB"));
2784    }
2785
2786    let mut kw = [0u8; MAX_KEYWORD_LEN];
2787    let subcmd = uppercase_arg(&args[0], &mut kw)?;
2788    match subcmd {
2789        "CHANNELS" => {
2790            let pattern = if args.len() > 1 {
2791                Some(extract_string(&args[1])?)
2792            } else {
2793                None
2794            };
2795            Ok(Command::PubSubChannels { pattern })
2796        }
2797        "NUMSUB" => {
2798            let channels: Vec<String> = args[1..]
2799                .iter()
2800                .map(extract_string)
2801                .collect::<Result<_, _>>()?;
2802            Ok(Command::PubSubNumSub { channels })
2803        }
2804        "NUMPAT" => Ok(Command::PubSubNumPat),
2805        other => Err(ProtocolError::InvalidCommandFrame(format!(
2806            "unknown PUBSUB subcommand '{other}'"
2807        ))),
2808    }
2809}
2810
2811// --- vector command parsers ---
2812
2813/// Parses METRIC / QUANT / M / EF flags from a slice of command arguments.
2814///
2815/// Returns `(metric, quantization, connectivity, expansion_add)`.
2816/// `cmd` is used in error messages (e.g., "VADD" or "VADD_BATCH").
2817fn parse_vector_flags(
2818    args: &[Frame],
2819    cmd: &'static str,
2820) -> Result<(u8, u8, u32, u32), ProtocolError> {
2821    let mut metric: u8 = 0; // cosine default
2822    let mut quantization: u8 = 0; // f32 default
2823    let mut connectivity: u32 = 16;
2824    let mut expansion_add: u32 = 64;
2825    let mut idx = 0;
2826
2827    while idx < args.len() {
2828        let mut kw = [0u8; MAX_KEYWORD_LEN];
2829        let flag = uppercase_arg(&args[idx], &mut kw)?;
2830        match flag {
2831            "METRIC" => {
2832                idx += 1;
2833                if idx >= args.len() {
2834                    return Err(ProtocolError::InvalidCommandFrame(format!(
2835                        "{cmd}: METRIC requires a value"
2836                    )));
2837                }
2838                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2839                let val = uppercase_arg(&args[idx], &mut kw2)?;
2840                metric = match val {
2841                    "COSINE" => 0,
2842                    "L2" => 1,
2843                    "IP" => 2,
2844                    _ => {
2845                        return Err(ProtocolError::InvalidCommandFrame(format!(
2846                            "{cmd}: unknown metric '{val}'"
2847                        )))
2848                    }
2849                };
2850                idx += 1;
2851            }
2852            "QUANT" => {
2853                idx += 1;
2854                if idx >= args.len() {
2855                    return Err(ProtocolError::InvalidCommandFrame(format!(
2856                        "{cmd}: QUANT requires a value"
2857                    )));
2858                }
2859                let mut kw2 = [0u8; MAX_KEYWORD_LEN];
2860                let val = uppercase_arg(&args[idx], &mut kw2)?;
2861                quantization = match val {
2862                    "F32" => 0,
2863                    "F16" => 1,
2864                    "I8" | "Q8" => 2,
2865                    _ => {
2866                        return Err(ProtocolError::InvalidCommandFrame(format!(
2867                            "{cmd}: unknown quantization '{val}'"
2868                        )))
2869                    }
2870                };
2871                idx += 1;
2872            }
2873            "M" => {
2874                idx += 1;
2875                if idx >= args.len() {
2876                    return Err(ProtocolError::InvalidCommandFrame(format!(
2877                        "{cmd}: M requires a value"
2878                    )));
2879                }
2880                let m = parse_u64(&args[idx], cmd)?;
2881                if m > MAX_HNSW_PARAM {
2882                    return Err(ProtocolError::InvalidCommandFrame(format!(
2883                        "{cmd}: M value {m} exceeds max {MAX_HNSW_PARAM}"
2884                    )));
2885                }
2886                connectivity = m as u32;
2887                idx += 1;
2888            }
2889            "EF" => {
2890                idx += 1;
2891                if idx >= args.len() {
2892                    return Err(ProtocolError::InvalidCommandFrame(format!(
2893                        "{cmd}: EF requires a value"
2894                    )));
2895                }
2896                let ef = parse_u64(&args[idx], cmd)?;
2897                if ef > MAX_HNSW_PARAM {
2898                    return Err(ProtocolError::InvalidCommandFrame(format!(
2899                        "{cmd}: EF value {ef} exceeds max {MAX_HNSW_PARAM}"
2900                    )));
2901                }
2902                expansion_add = ef as u32;
2903                idx += 1;
2904            }
2905            _ => {
2906                return Err(ProtocolError::InvalidCommandFrame(format!(
2907                    "{cmd}: unexpected argument '{flag}'"
2908                )));
2909            }
2910        }
2911    }
2912
2913    Ok((metric, quantization, connectivity, expansion_add))
2914}
2915
2916/// VADD key element f32 [f32 ...] [METRIC COSINE|L2|IP] [QUANT F32|F16|I8] [M n] [EF n]
2917fn parse_vadd(args: &[Frame]) -> Result<Command, ProtocolError> {
2918    // minimum: key + element + at least one float
2919    if args.len() < 3 {
2920        return Err(wrong_arity("VADD"));
2921    }
2922
2923    let key = extract_string(&args[0])?;
2924    let element = extract_string(&args[1])?;
2925
2926    // parse vector values until we hit a non-numeric argument, end, or dim limit
2927    let mut idx = 2;
2928    let mut vector = Vec::new();
2929    while idx < args.len() {
2930        if vector.len() >= MAX_VECTOR_DIMS {
2931            return Err(ProtocolError::InvalidCommandFrame(format!(
2932                "VADD: vector exceeds {MAX_VECTOR_DIMS} dimensions"
2933            )));
2934        }
2935        let s = extract_string(&args[idx])?;
2936        if let Ok(v) = s.parse::<f32>() {
2937            if v.is_nan() || v.is_infinite() {
2938                return Err(ProtocolError::InvalidCommandFrame(
2939                    "VADD: vector components must be finite (no NaN/infinity)".into(),
2940                ));
2941            }
2942            vector.push(v);
2943            idx += 1;
2944        } else {
2945            break;
2946        }
2947    }
2948
2949    if vector.is_empty() {
2950        return Err(ProtocolError::InvalidCommandFrame(
2951            "VADD: at least one vector dimension required".into(),
2952        ));
2953    }
2954
2955    // parse optional flags
2956    let (metric, quantization, connectivity, expansion_add) =
2957        parse_vector_flags(&args[idx..], "VADD")?;
2958
2959    Ok(Command::VAdd {
2960        key,
2961        element,
2962        vector,
2963        metric,
2964        quantization,
2965        connectivity,
2966        expansion_add,
2967    })
2968}
2969
2970/// VADD_BATCH key DIM n [BINARY] element1 f32...|<blob> element2 f32...|<blob>
2971/// [METRIC COSINE|L2|IP] [QUANT F32|F16|I8] [M n] [EF n]
2972///
2973/// When BINARY is specified, each vector is a single bulk string of `dim * 4`
2974/// raw little-endian f32 bytes instead of `dim` separate text arguments.
2975/// This eliminates string serialization overhead on both client and server.
2976fn parse_vadd_batch(args: &[Frame]) -> Result<Command, ProtocolError> {
2977    // minimum: key + DIM + n (even an empty batch needs the DIM declaration)
2978    if args.len() < 3 {
2979        return Err(wrong_arity("VADD_BATCH"));
2980    }
2981
2982    let key = extract_string(&args[0])?;
2983
2984    // require DIM keyword
2985    let mut kw = [0u8; MAX_KEYWORD_LEN];
2986    let dim_kw = uppercase_arg(&args[1], &mut kw)?;
2987    if dim_kw != "DIM" {
2988        return Err(ProtocolError::InvalidCommandFrame(
2989            "VADD_BATCH: expected DIM keyword".into(),
2990        ));
2991    }
2992
2993    let dim = parse_u64(&args[2], "VADD_BATCH")? as usize;
2994    if dim == 0 {
2995        return Err(ProtocolError::InvalidCommandFrame(
2996            "VADD_BATCH: DIM must be at least 1".into(),
2997        ));
2998    }
2999    if dim > MAX_VECTOR_DIMS {
3000        return Err(ProtocolError::InvalidCommandFrame(format!(
3001            "VADD_BATCH: DIM {dim} exceeds max {MAX_VECTOR_DIMS}"
3002        )));
3003    }
3004
3005    // check for optional BINARY flag after DIM
3006    let mut idx = 3;
3007    let binary_mode = if idx < args.len() {
3008        let mut kw2 = [0u8; MAX_KEYWORD_LEN];
3009        matches!(uppercase_arg(&args[idx], &mut kw2), Ok("BINARY"))
3010    } else {
3011        false
3012    };
3013    if binary_mode {
3014        idx += 1;
3015    }
3016
3017    let mut entries: Vec<(String, Vec<f32>)> = Vec::new();
3018
3019    if binary_mode {
3020        // binary mode: each entry is element_name + single blob of dim*4 bytes
3021        let blob_len = dim * 4;
3022        let entry_len = 2; // element name + blob
3023
3024        while idx < args.len() {
3025            if idx + entry_len > args.len() {
3026                break;
3027            }
3028
3029            // peek: if the second arg isn't exactly blob_len bytes, we've
3030            // hit the flags section (flags are short text strings)
3031            let blob_bytes = extract_bytes(&args[idx + 1])?;
3032            if blob_bytes.len() != blob_len {
3033                break;
3034            }
3035
3036            let element = extract_string(&args[idx])?;
3037            idx += 1;
3038
3039            // skip extract_bytes again — reuse what we already have
3040            idx += 1;
3041
3042            // reinterpret raw LE bytes as f32 slice
3043            let vector = bytes_to_f32_vec(&blob_bytes, dim)?;
3044
3045            entries.push((element, vector));
3046
3047            if entries.len() >= MAX_VADD_BATCH_SIZE {
3048                return Err(ProtocolError::InvalidCommandFrame(format!(
3049                    "VADD_BATCH: batch size exceeds max {MAX_VADD_BATCH_SIZE}"
3050                )));
3051            }
3052        }
3053    } else {
3054        // text mode: each entry is element_name followed by exactly `dim` floats.
3055        // we detect the end of entries by checking whether enough args remain
3056        // for a full entry (1 name + dim floats). this avoids misinterpreting
3057        // element names like "metric" as flags.
3058        let entry_len = 1 + dim; // element name + dim floats
3059
3060        while idx < args.len() {
3061            // not enough remaining args for a full entry — must be flags
3062            if idx + entry_len > args.len() {
3063                break;
3064            }
3065
3066            // peek: if the token after the element name isn't a valid float,
3067            // we've reached the flags section
3068            if dim > 0 {
3069                let peek = extract_string(&args[idx + 1])?;
3070                if peek.parse::<f32>().is_err() {
3071                    break;
3072                }
3073            }
3074
3075            let element = extract_string(&args[idx])?;
3076            idx += 1;
3077
3078            let mut vector = Vec::with_capacity(dim);
3079            for _ in 0..dim {
3080                let s = extract_string(&args[idx])?;
3081                let v = s.parse::<f32>().map_err(|_| {
3082                    ProtocolError::InvalidCommandFrame(format!(
3083                        "VADD_BATCH: expected float, got '{s}'"
3084                    ))
3085                })?;
3086                if v.is_nan() || v.is_infinite() {
3087                    return Err(ProtocolError::InvalidCommandFrame(
3088                        "VADD_BATCH: vector components must be finite (no NaN/infinity)".into(),
3089                    ));
3090                }
3091                vector.push(v);
3092                idx += 1;
3093            }
3094
3095            entries.push((element, vector));
3096
3097            if entries.len() >= MAX_VADD_BATCH_SIZE {
3098                return Err(ProtocolError::InvalidCommandFrame(format!(
3099                    "VADD_BATCH: batch size exceeds max {MAX_VADD_BATCH_SIZE}"
3100                )));
3101            }
3102        }
3103    }
3104
3105    // parse optional flags (same logic as parse_vadd)
3106    let (metric, quantization, connectivity, expansion_add) =
3107        parse_vector_flags(&args[idx..], "VADD_BATCH")?;
3108
3109    Ok(Command::VAddBatch {
3110        key,
3111        entries,
3112        dim,
3113        metric,
3114        quantization,
3115        connectivity,
3116        expansion_add,
3117    })
3118}
3119
3120/// Converts a raw byte buffer of little-endian f32s into a Vec<f32>.
3121///
3122/// Validates that all values are finite (no NaN/infinity). The buffer
3123/// must be exactly `dim * 4` bytes.
3124fn bytes_to_f32_vec(data: &[u8], dim: usize) -> Result<Vec<f32>, ProtocolError> {
3125    // compile-time endianness check — binary protocol assumes little-endian
3126    #[cfg(not(target_endian = "little"))]
3127    compile_error!("VADD_BATCH BINARY mode requires a little-endian target");
3128
3129    debug_assert_eq!(data.len(), dim * 4);
3130
3131    let mut vector = Vec::with_capacity(dim);
3132    for chunk in data.chunks_exact(4) {
3133        let v = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
3134        if !v.is_finite() {
3135            return Err(ProtocolError::InvalidCommandFrame(
3136                "VADD_BATCH BINARY: vector contains non-finite value (NaN/infinity)".into(),
3137            ));
3138        }
3139        vector.push(v);
3140    }
3141    Ok(vector)
3142}
3143
3144/// VSIM key f32 [f32 ...] COUNT k [EF n] [WITHSCORES]
3145fn parse_vsim(args: &[Frame]) -> Result<Command, ProtocolError> {
3146    // minimum: key + at least one float + COUNT + k
3147    if args.len() < 4 {
3148        return Err(wrong_arity("VSIM"));
3149    }
3150
3151    let key = extract_string(&args[0])?;
3152
3153    // parse query vector until we hit a non-numeric argument, end, or dim limit
3154    let mut idx = 1;
3155    let mut query = Vec::new();
3156    while idx < args.len() {
3157        if query.len() >= MAX_VECTOR_DIMS {
3158            return Err(ProtocolError::InvalidCommandFrame(format!(
3159                "VSIM: query exceeds {MAX_VECTOR_DIMS} dimensions"
3160            )));
3161        }
3162        let s = extract_string(&args[idx])?;
3163        if let Ok(v) = s.parse::<f32>() {
3164            if v.is_nan() || v.is_infinite() {
3165                return Err(ProtocolError::InvalidCommandFrame(
3166                    "VSIM: query components must be finite (no NaN/infinity)".into(),
3167                ));
3168            }
3169            query.push(v);
3170            idx += 1;
3171        } else {
3172            break;
3173        }
3174    }
3175
3176    if query.is_empty() {
3177        return Err(ProtocolError::InvalidCommandFrame(
3178            "VSIM: at least one query dimension required".into(),
3179        ));
3180    }
3181
3182    // COUNT k is required
3183    let mut count: Option<usize> = None;
3184    let mut ef_search: usize = 0;
3185    let mut with_scores = false;
3186
3187    while idx < args.len() {
3188        let mut kw = [0u8; MAX_KEYWORD_LEN];
3189        let flag = uppercase_arg(&args[idx], &mut kw)?;
3190        match flag {
3191            "COUNT" => {
3192                idx += 1;
3193                if idx >= args.len() {
3194                    return Err(ProtocolError::InvalidCommandFrame(
3195                        "VSIM: COUNT requires a value".into(),
3196                    ));
3197                }
3198                let c = parse_u64(&args[idx], "VSIM")?;
3199                if c > MAX_VSIM_COUNT {
3200                    return Err(ProtocolError::InvalidCommandFrame(format!(
3201                        "VSIM: COUNT {c} exceeds max {MAX_VSIM_COUNT}"
3202                    )));
3203                }
3204                count = Some(c as usize);
3205                idx += 1;
3206            }
3207            "EF" => {
3208                idx += 1;
3209                if idx >= args.len() {
3210                    return Err(ProtocolError::InvalidCommandFrame(
3211                        "VSIM: EF requires a value".into(),
3212                    ));
3213                }
3214                let ef = parse_u64(&args[idx], "VSIM")?;
3215                if ef > MAX_VSIM_EF {
3216                    return Err(ProtocolError::InvalidCommandFrame(format!(
3217                        "VSIM: EF {ef} exceeds max {MAX_VSIM_EF}"
3218                    )));
3219                }
3220                ef_search = ef as usize;
3221                idx += 1;
3222            }
3223            "WITHSCORES" => {
3224                with_scores = true;
3225                idx += 1;
3226            }
3227            _ => {
3228                return Err(ProtocolError::InvalidCommandFrame(format!(
3229                    "VSIM: unexpected argument '{flag}'"
3230                )));
3231            }
3232        }
3233    }
3234
3235    let count = count
3236        .ok_or_else(|| ProtocolError::InvalidCommandFrame("VSIM: COUNT is required".into()))?;
3237
3238    Ok(Command::VSim {
3239        key,
3240        query,
3241        count,
3242        ef_search,
3243        with_scores,
3244    })
3245}
3246
3247/// VREM key element
3248fn parse_vrem(args: &[Frame]) -> Result<Command, ProtocolError> {
3249    if args.len() != 2 {
3250        return Err(wrong_arity("VREM"));
3251    }
3252    let key = extract_string(&args[0])?;
3253    let element = extract_string(&args[1])?;
3254    Ok(Command::VRem { key, element })
3255}
3256
3257/// VGET key element
3258fn parse_vget(args: &[Frame]) -> Result<Command, ProtocolError> {
3259    if args.len() != 2 {
3260        return Err(wrong_arity("VGET"));
3261    }
3262    let key = extract_string(&args[0])?;
3263    let element = extract_string(&args[1])?;
3264    Ok(Command::VGet { key, element })
3265}
3266
3267/// VCARD key
3268fn parse_vcard(args: &[Frame]) -> Result<Command, ProtocolError> {
3269    if args.len() != 1 {
3270        return Err(wrong_arity("VCARD"));
3271    }
3272    let key = extract_string(&args[0])?;
3273    Ok(Command::VCard { key })
3274}
3275
3276/// VDIM key
3277fn parse_vdim(args: &[Frame]) -> Result<Command, ProtocolError> {
3278    if args.len() != 1 {
3279        return Err(wrong_arity("VDIM"));
3280    }
3281    let key = extract_string(&args[0])?;
3282    Ok(Command::VDim { key })
3283}
3284
3285/// VINFO key
3286fn parse_vinfo(args: &[Frame]) -> Result<Command, ProtocolError> {
3287    if args.len() != 1 {
3288        return Err(wrong_arity("VINFO"));
3289    }
3290    let key = extract_string(&args[0])?;
3291    Ok(Command::VInfo { key })
3292}
3293
3294// --- proto command parsers ---
3295
3296fn parse_proto_register(args: &[Frame]) -> Result<Command, ProtocolError> {
3297    if args.len() != 2 {
3298        return Err(wrong_arity("PROTO.REGISTER"));
3299    }
3300    let name = extract_string(&args[0])?;
3301    let descriptor = extract_bytes(&args[1])?;
3302    Ok(Command::ProtoRegister { name, descriptor })
3303}
3304
3305fn parse_proto_set(args: &[Frame]) -> Result<Command, ProtocolError> {
3306    if args.len() < 3 {
3307        return Err(wrong_arity("PROTO.SET"));
3308    }
3309
3310    let key = extract_string(&args[0])?;
3311    let type_name = extract_string(&args[1])?;
3312    let data = extract_bytes(&args[2])?;
3313    let (expire, nx, xx) = parse_set_options(&args[3..], "PROTO.SET")?;
3314
3315    Ok(Command::ProtoSet {
3316        key,
3317        type_name,
3318        data,
3319        expire,
3320        nx,
3321        xx,
3322    })
3323}
3324
3325fn parse_proto_get(args: &[Frame]) -> Result<Command, ProtocolError> {
3326    if args.len() != 1 {
3327        return Err(wrong_arity("PROTO.GET"));
3328    }
3329    let key = extract_string(&args[0])?;
3330    Ok(Command::ProtoGet { key })
3331}
3332
3333fn parse_proto_type(args: &[Frame]) -> Result<Command, ProtocolError> {
3334    if args.len() != 1 {
3335        return Err(wrong_arity("PROTO.TYPE"));
3336    }
3337    let key = extract_string(&args[0])?;
3338    Ok(Command::ProtoType { key })
3339}
3340
3341fn parse_proto_schemas(args: &[Frame]) -> Result<Command, ProtocolError> {
3342    if !args.is_empty() {
3343        return Err(wrong_arity("PROTO.SCHEMAS"));
3344    }
3345    Ok(Command::ProtoSchemas)
3346}
3347
3348fn parse_proto_describe(args: &[Frame]) -> Result<Command, ProtocolError> {
3349    if args.len() != 1 {
3350        return Err(wrong_arity("PROTO.DESCRIBE"));
3351    }
3352    let name = extract_string(&args[0])?;
3353    Ok(Command::ProtoDescribe { name })
3354}
3355
3356fn parse_proto_getfield(args: &[Frame]) -> Result<Command, ProtocolError> {
3357    if args.len() != 2 {
3358        return Err(wrong_arity("PROTO.GETFIELD"));
3359    }
3360    let key = extract_string(&args[0])?;
3361    let field_path = extract_string(&args[1])?;
3362    Ok(Command::ProtoGetField { key, field_path })
3363}
3364
3365fn parse_proto_setfield(args: &[Frame]) -> Result<Command, ProtocolError> {
3366    if args.len() != 3 {
3367        return Err(wrong_arity("PROTO.SETFIELD"));
3368    }
3369    let key = extract_string(&args[0])?;
3370    let field_path = extract_string(&args[1])?;
3371    let value = extract_string(&args[2])?;
3372    Ok(Command::ProtoSetField {
3373        key,
3374        field_path,
3375        value,
3376    })
3377}
3378
3379fn parse_proto_delfield(args: &[Frame]) -> Result<Command, ProtocolError> {
3380    if args.len() != 2 {
3381        return Err(wrong_arity("PROTO.DELFIELD"));
3382    }
3383    let key = extract_string(&args[0])?;
3384    let field_path = extract_string(&args[1])?;
3385    Ok(Command::ProtoDelField { key, field_path })
3386}
3387
3388fn parse_auth(args: &[Frame]) -> Result<Command, ProtocolError> {
3389    match args.len() {
3390        1 => {
3391            let password = extract_string(&args[0])?;
3392            Ok(Command::Auth {
3393                username: None,
3394                password,
3395            })
3396        }
3397        2 => {
3398            let username = extract_string(&args[0])?;
3399            let password = extract_string(&args[1])?;
3400            Ok(Command::Auth {
3401                username: Some(username),
3402                password,
3403            })
3404        }
3405        _ => Err(wrong_arity("AUTH")),
3406    }
3407}
3408
3409fn parse_quit(args: &[Frame]) -> Result<Command, ProtocolError> {
3410    if !args.is_empty() {
3411        return Err(wrong_arity("QUIT"));
3412    }
3413    Ok(Command::Quit)
3414}
3415
3416fn parse_monitor(args: &[Frame]) -> Result<Command, ProtocolError> {
3417    if !args.is_empty() {
3418        return Err(wrong_arity("MONITOR"));
3419    }
3420    Ok(Command::Monitor)
3421}
3422
3423fn parse_touch(args: &[Frame]) -> Result<Command, ProtocolError> {
3424    if args.is_empty() {
3425        return Err(wrong_arity("TOUCH"));
3426    }
3427    let keys = extract_strings(args)?;
3428    Ok(Command::Touch { keys })
3429}
3430
3431fn parse_sort(args: &[Frame]) -> Result<Command, ProtocolError> {
3432    if args.is_empty() {
3433        return Err(wrong_arity("SORT"));
3434    }
3435    let key = extract_string(&args[0])?;
3436    let mut desc = false;
3437    let mut alpha = false;
3438    let mut limit = None;
3439    let mut store = None;
3440    let mut i = 1;
3441    while i < args.len() {
3442        let flag = extract_string(&args[i])?.to_uppercase();
3443        match flag.as_str() {
3444            "ASC" => {
3445                desc = false;
3446                i += 1;
3447            }
3448            "DESC" => {
3449                desc = true;
3450                i += 1;
3451            }
3452            "ALPHA" => {
3453                alpha = true;
3454                i += 1;
3455            }
3456            "LIMIT" => {
3457                if i + 2 >= args.len() {
3458                    return Err(ProtocolError::InvalidCommandFrame(
3459                        "SORT LIMIT requires offset and count".into(),
3460                    ));
3461                }
3462                let offset = extract_string(&args[i + 1])?.parse::<i64>().map_err(|_| {
3463                    ProtocolError::InvalidCommandFrame(
3464                        "SORT LIMIT offset is not a valid integer".into(),
3465                    )
3466                })?;
3467                let count = extract_string(&args[i + 2])?.parse::<i64>().map_err(|_| {
3468                    ProtocolError::InvalidCommandFrame(
3469                        "SORT LIMIT count is not a valid integer".into(),
3470                    )
3471                })?;
3472                limit = Some((offset, count));
3473                i += 3;
3474            }
3475            "STORE" => {
3476                if i + 1 >= args.len() {
3477                    return Err(wrong_arity("SORT"));
3478                }
3479                store = Some(extract_string(&args[i + 1])?);
3480                i += 2;
3481            }
3482            _ => {
3483                return Err(ProtocolError::InvalidCommandFrame(format!(
3484                    "SORT: unsupported flag '{flag}'"
3485                )));
3486            }
3487        }
3488    }
3489    Ok(Command::Sort {
3490        key,
3491        desc,
3492        alpha,
3493        limit,
3494        store,
3495    })
3496}
3497
3498// --- Redis 6.2+ commands ---
3499
3500fn parse_lmove(args: &[Frame]) -> Result<Command, ProtocolError> {
3501    if args.len() != 4 {
3502        return Err(wrong_arity("LMOVE"));
3503    }
3504    let source = extract_string(&args[0])?;
3505    let destination = extract_string(&args[1])?;
3506
3507    let mut kw = [0u8; MAX_KEYWORD_LEN];
3508    let src_left = match uppercase_arg(&args[2], &mut kw)? {
3509        "LEFT" => true,
3510        "RIGHT" => false,
3511        other => {
3512            return Err(ProtocolError::InvalidCommandFrame(format!(
3513                "LMOVE: invalid wherefrom '{other}', expected LEFT or RIGHT"
3514            )));
3515        }
3516    };
3517    let mut kw = [0u8; MAX_KEYWORD_LEN];
3518    let dst_left = match uppercase_arg(&args[3], &mut kw)? {
3519        "LEFT" => true,
3520        "RIGHT" => false,
3521        other => {
3522            return Err(ProtocolError::InvalidCommandFrame(format!(
3523                "LMOVE: invalid whereto '{other}', expected LEFT or RIGHT"
3524            )));
3525        }
3526    };
3527
3528    Ok(Command::LMove {
3529        source,
3530        destination,
3531        src_left,
3532        dst_left,
3533    })
3534}
3535
3536fn parse_getdel(args: &[Frame]) -> Result<Command, ProtocolError> {
3537    if args.len() != 1 {
3538        return Err(wrong_arity("GETDEL"));
3539    }
3540    let key = extract_string(&args[0])?;
3541    Ok(Command::GetDel { key })
3542}
3543
3544fn parse_getex(args: &[Frame]) -> Result<Command, ProtocolError> {
3545    if args.is_empty() {
3546        return Err(wrong_arity("GETEX"));
3547    }
3548    let key = extract_string(&args[0])?;
3549    let rest = &args[1..];
3550
3551    let expire = if rest.is_empty() {
3552        // no options — TTL unchanged
3553        None
3554    } else {
3555        let mut kw = [0u8; MAX_KEYWORD_LEN];
3556        match uppercase_arg(&rest[0], &mut kw)? {
3557            "PERSIST" => Some(None),
3558            "EX" => {
3559                if rest.len() < 2 {
3560                    return Err(wrong_arity("GETEX"));
3561                }
3562                let n = parse_u64(&rest[1], "GETEX")?;
3563                if n == 0 {
3564                    return Err(ProtocolError::InvalidCommandFrame(
3565                        "invalid expire time in 'GETEX' command".into(),
3566                    ));
3567                }
3568                Some(Some(SetExpire::Ex(n)))
3569            }
3570            "PX" => {
3571                if rest.len() < 2 {
3572                    return Err(wrong_arity("GETEX"));
3573                }
3574                let n = parse_u64(&rest[1], "GETEX")?;
3575                if n == 0 {
3576                    return Err(ProtocolError::InvalidCommandFrame(
3577                        "invalid expire time in 'GETEX' command".into(),
3578                    ));
3579                }
3580                Some(Some(SetExpire::Px(n)))
3581            }
3582            "EXAT" => {
3583                if rest.len() < 2 {
3584                    return Err(wrong_arity("GETEX"));
3585                }
3586                let n = parse_u64(&rest[1], "GETEX")?;
3587                Some(Some(SetExpire::ExAt(n)))
3588            }
3589            "PXAT" => {
3590                if rest.len() < 2 {
3591                    return Err(wrong_arity("GETEX"));
3592                }
3593                let n = parse_u64(&rest[1], "GETEX")?;
3594                Some(Some(SetExpire::PxAt(n)))
3595            }
3596            other => {
3597                return Err(ProtocolError::InvalidCommandFrame(format!(
3598                    "GETEX: unsupported option '{other}'"
3599                )));
3600            }
3601        }
3602    };
3603
3604    Ok(Command::GetEx { key, expire })
3605}
3606
3607/// Parses ZDIFF/ZINTER/ZUNION: `numkeys key [key ...] [WITHSCORES]`.
3608fn parse_zset_multi(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
3609    if args.is_empty() {
3610        return Err(wrong_arity(cmd));
3611    }
3612    let numkeys = parse_u64(&args[0], cmd)? as usize;
3613    if numkeys == 0 {
3614        return Err(ProtocolError::InvalidCommandFrame(format!(
3615            "{cmd}: numkeys must be positive"
3616        )));
3617    }
3618    if args.len() < 1 + numkeys {
3619        return Err(wrong_arity(cmd));
3620    }
3621    let keys = extract_strings(&args[1..1 + numkeys])?;
3622
3623    let mut with_scores = false;
3624    for frame in &args[1 + numkeys..] {
3625        let mut kw = [0u8; MAX_KEYWORD_LEN];
3626        if let Ok("WITHSCORES") = uppercase_arg(frame, &mut kw) {
3627            with_scores = true;
3628        }
3629    }
3630
3631    match cmd {
3632        "ZDIFF" => Ok(Command::ZDiff { keys, with_scores }),
3633        "ZINTER" => Ok(Command::ZInter { keys, with_scores }),
3634        "ZUNION" => Ok(Command::ZUnion { keys, with_scores }),
3635        _ => Err(wrong_arity(cmd)),
3636    }
3637}
3638
3639/// Parses ZDIFFSTORE/ZINTERSTORE/ZUNIONSTORE: `destkey numkeys key [key ...]`.
3640fn parse_zset_store(cmd: &'static str, args: &[Frame]) -> Result<Command, ProtocolError> {
3641    // need at least: dest numkeys key
3642    if args.len() < 3 {
3643        return Err(wrong_arity(cmd));
3644    }
3645    let dest = extract_string(&args[0])?;
3646    let numkeys = parse_u64(&args[1], cmd)? as usize;
3647    if numkeys == 0 {
3648        return Err(ProtocolError::InvalidCommandFrame(format!(
3649            "{cmd}: numkeys must be positive"
3650        )));
3651    }
3652    if args.len() < 2 + numkeys {
3653        return Err(wrong_arity(cmd));
3654    }
3655    let keys = extract_strings(&args[2..2 + numkeys])?;
3656
3657    match cmd {
3658        "ZDIFFSTORE" => Ok(Command::ZDiffStore { dest, keys }),
3659        "ZINTERSTORE" => Ok(Command::ZInterStore { dest, keys }),
3660        "ZUNIONSTORE" => Ok(Command::ZUnionStore { dest, keys }),
3661        _ => Err(wrong_arity(cmd)),
3662    }
3663}
3664
3665fn parse_zrandmember(args: &[Frame]) -> Result<Command, ProtocolError> {
3666    if args.is_empty() {
3667        return Err(wrong_arity("ZRANDMEMBER"));
3668    }
3669    let key = extract_string(&args[0])?;
3670    let (count, with_scores) = match args.len() {
3671        1 => (None, false),
3672        2 => (Some(parse_i64(&args[1], "ZRANDMEMBER")?), false),
3673        3 => {
3674            let count = parse_i64(&args[1], "ZRANDMEMBER")?;
3675            let flag = extract_string(&args[2])?.to_ascii_uppercase();
3676            if flag != "WITHSCORES" {
3677                return Err(ProtocolError::InvalidCommandFrame(
3678                    "ZRANDMEMBER: expected WITHSCORES".into(),
3679                ));
3680            }
3681            (Some(count), true)
3682        }
3683        _ => return Err(wrong_arity("ZRANDMEMBER")),
3684    };
3685    Ok(Command::ZRandMember {
3686        key,
3687        count,
3688        with_scores,
3689    })
3690}
3691
3692fn parse_wait(args: &[Frame]) -> Result<Command, ProtocolError> {
3693    if args.len() != 2 {
3694        return Err(wrong_arity("WAIT"));
3695    }
3696    let numreplicas_str = extract_string(&args[0])?;
3697    let timeout_ms_str = extract_string(&args[1])?;
3698    let numreplicas = numreplicas_str.parse::<u64>().map_err(|_| {
3699        ProtocolError::InvalidCommandFrame("WAIT numreplicas must be an integer".into())
3700    })?;
3701    let timeout_ms = timeout_ms_str.parse::<u64>().map_err(|_| {
3702        ProtocolError::InvalidCommandFrame("WAIT timeout must be an integer".into())
3703    })?;
3704    Ok(Command::Wait {
3705        numreplicas,
3706        timeout_ms,
3707    })
3708}
3709
3710fn parse_command_cmd(args: &[Frame]) -> Result<Command, ProtocolError> {
3711    if args.is_empty() {
3712        return Ok(Command::Command {
3713            subcommand: None,
3714            args: vec![],
3715        });
3716    }
3717    let sub = extract_string(&args[0])?.to_ascii_uppercase();
3718    match sub.as_str() {
3719        "COUNT" => Ok(Command::Command {
3720            subcommand: Some("COUNT".into()),
3721            args: vec![],
3722        }),
3723        "INFO" => {
3724            let names = args[1..]
3725                .iter()
3726                .map(|f| extract_string(f).map(|s| s.to_ascii_uppercase()))
3727                .collect::<Result<Vec<_>, _>>()?;
3728            Ok(Command::Command {
3729                subcommand: Some("INFO".into()),
3730                args: names,
3731            })
3732        }
3733        "DOCS" => {
3734            let names = args[1..]
3735                .iter()
3736                .map(|f| extract_string(f).map(|s| s.to_ascii_uppercase()))
3737                .collect::<Result<Vec<_>, _>>()?;
3738            Ok(Command::Command {
3739                subcommand: Some("DOCS".into()),
3740                args: names,
3741            })
3742        }
3743        "GETKEYS" => Ok(Command::Command {
3744            subcommand: Some("GETKEYS".into()),
3745            args: vec![],
3746        }),
3747        "LIST" => Ok(Command::Command {
3748            subcommand: Some("LIST".into()),
3749            args: vec![],
3750        }),
3751        _ => Err(ProtocolError::InvalidCommandFrame(format!(
3752            "unknown COMMAND subcommand: {sub}"
3753        ))),
3754    }
3755}