Skip to main content

shardcache_client_rs/
client.rs

1#[cfg(feature = "redis")]
2use std::collections::VecDeque;
3use std::net::ToSocketAddrs;
4
5use crate::commands::del::{self, Del};
6use crate::commands::exists::{self, Exists};
7use crate::commands::expire::{self, Expire};
8use crate::commands::get::{self, Get};
9use crate::commands::getex::{self, GetEx};
10#[cfg(feature = "redis")]
11use crate::commands::redis::{
12    self, RedisCommand as OptimizedRedisCommand, RedisCommandKind, RedisCommandRouteKeys,
13    RedisRespCommand, RedisResponse,
14};
15use crate::commands::resp::RespCommand;
16use crate::commands::set::{self, Set};
17use crate::commands::setex::{self, SetEx};
18use crate::commands::ttl::{self, Ttl};
19use crate::connection::ScnpConnection;
20use crate::error::{Result, ShardCacheClientError};
21#[cfg(feature = "redis")]
22use crate::routing::ShardCacheRoute;
23use crate::routing::{ShardCacheDirectRouter, ShardCacheRouteMode};
24
25#[cfg(feature = "redis")]
26#[derive(Debug, Clone, Copy)]
27enum RedisPipelineResponse {
28    Native,
29    Resp,
30}
31
32/// Blocking SCNP client for the ordinary server listener.
33#[derive(Debug)]
34pub struct ShardCacheClient {
35    conn: ScnpConnection,
36    #[cfg(feature = "redis")]
37    redis_pipeline_responses: VecDeque<RedisPipelineResponse>,
38}
39
40impl ShardCacheClient {
41    /// Connects to a shardcache server listener that accepts generic SCNP.
42    pub fn connect(addr: impl ToSocketAddrs) -> Result<Self> {
43        Ok(Self {
44            conn: ScnpConnection::connect(addr)?,
45            #[cfg(feature = "redis")]
46            redis_pipeline_responses: VecDeque::new(),
47        })
48    }
49
50    /// Reads `key` into `out`, returning `true` on hit.
51    pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
52        self.conn.execute(Get::new(key, out))
53    }
54
55    /// Sets `key` to `value`.
56    pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
57        self.conn.execute(Set::new(key, value))
58    }
59
60    /// Sets `key` to `value` with a millisecond TTL.
61    pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
62        self.conn.execute(SetEx::new(key, value, ttl_ms))
63    }
64
65    /// Reads `key` into `out` and sets a millisecond TTL, returning `true` on hit.
66    pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
67        self.conn.execute(GetEx::new(key, ttl_ms, out))
68    }
69
70    /// Deletes `key`, returning `true` when an entry was removed.
71    pub fn del(&mut self, key: &[u8]) -> Result<bool> {
72        self.conn.execute(Del::new(key))
73    }
74
75    /// Returns whether `key` exists.
76    pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
77        self.conn.execute(Exists::new(key))
78    }
79
80    /// Returns Redis-compatible TTL seconds for `key`.
81    pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
82        self.conn.execute(Ttl::new(key))
83    }
84
85    /// Sets a millisecond TTL on `key`, returning `true` when the TTL changed.
86    pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
87        self.conn.execute(Expire::new(key, ttl_ms))
88    }
89
90    /// Returns the first-party Redis command namespace.
91    #[cfg(feature = "redis")]
92    pub fn redis(&mut self) -> crate::Redis<'_, Self> {
93        crate::Redis::new(self)
94    }
95
96    /// Executes a Redis-compatible command through the compact opcode SCNP wrapper.
97    #[cfg(feature = "redis")]
98    pub fn redis_command(
99        &mut self,
100        command: RedisCommandKind,
101        args: &[&[u8]],
102    ) -> Result<RedisResponse> {
103        self.conn.execute(OptimizedRedisCommand::new(command, args))
104    }
105
106    /// Executes a Redis-compatible command by name through native SCNP.
107    ///
108    /// Commands with compact opcodes use the optimized Redis wrapper. Other
109    /// names use the SCNP command-name wrapper and return decoded RESP.
110    #[cfg(feature = "redis")]
111    pub fn redis_command_by_name(
112        &mut self,
113        command: &[u8],
114        args: &[&[u8]],
115    ) -> Result<RedisResponse> {
116        match RedisCommandKind::from_name(command) {
117            Some(command) => self.redis_command(command, args),
118            None => self.redis_resp_command(command, args),
119        }
120    }
121
122    /// Executes a Redis-compatible command through the SCNP command-name wrapper.
123    ///
124    /// This path is still native SCNP, but it carries the Redis command name in
125    /// the body so it can cover commands that do not have a compact opcode.
126    #[cfg(feature = "redis")]
127    pub fn redis_resp_command(&mut self, command: &[u8], args: &[&[u8]]) -> Result<RedisResponse> {
128        validate_redis_command_name(command)?;
129        self.conn.execute(RedisRespCommand::new(command, args))
130    }
131
132    /// Executes a Redis-compatible command through the generic SCNP wrapper.
133    ///
134    /// The server returns RESP bytes as an SCNP value. `out` receives those raw
135    /// bytes so callers can decode exactly the shape they requested.
136    pub fn resp_command_into(&mut self, parts: &[&[u8]], out: &mut Vec<u8>) -> Result<bool> {
137        self.conn.execute(RespCommand::new(parts, out))
138    }
139
140    /// Runs the global SCNP scan wrapper and returns the RESP scan reply bytes.
141    pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
142        let cursor = cursor.to_string();
143        let count = count.to_string();
144        self.resp_command_into(
145            &[b"SCNP.SCAN", cursor.as_bytes(), b"COUNT", count.as_bytes()],
146            out,
147        )
148    }
149
150    /// Runs a shard-local SCNP scan. Call this concurrently per shard to avoid
151    /// a server-side fanout scan.
152    pub fn scan_shard_resp_into(
153        &mut self,
154        shard_id: usize,
155        cursor: u64,
156        count: usize,
157        out: &mut Vec<u8>,
158    ) -> Result<bool> {
159        let shard_id = shard_id.to_string();
160        let cursor = cursor.to_string();
161        let count = count.to_string();
162        self.resp_command_into(
163            &[
164                b"SCNP.SCANSHARD",
165                shard_id.as_bytes(),
166                cursor.as_bytes(),
167                b"COUNT",
168                count.as_bytes(),
169            ],
170            out,
171        )
172    }
173
174    /// Writes a GET request without flushing or reading its response.
175    pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
176        get::write_request(&mut self.conn, None, key)
177    }
178
179    /// Writes a SET request without flushing or reading its response.
180    pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
181        set::write_request(&mut self.conn, None, key, value)
182    }
183
184    /// Writes a SETEX request without flushing or reading its response.
185    pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
186        setex::write_request(&mut self.conn, None, key, value, ttl_ms)
187    }
188
189    /// Writes a GETEX request without flushing or reading its response.
190    pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
191        getex::write_request(&mut self.conn, None, key, ttl_ms)
192    }
193
194    /// Writes a DEL request without flushing or reading its response.
195    pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
196        del::write_request(&mut self.conn, None, key)
197    }
198
199    /// Writes an EXISTS request without flushing or reading its response.
200    pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
201        exists::write_request(&mut self.conn, None, key)
202    }
203
204    /// Writes a TTL request without flushing or reading its response.
205    pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
206        ttl::write_request(&mut self.conn, None, key)
207    }
208
209    /// Writes an EXPIRE request without flushing or reading its response.
210    pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
211        expire::write_request(&mut self.conn, None, key, ttl_ms)
212    }
213
214    /// Writes a compact Redis command request without flushing or reading its response.
215    #[cfg(feature = "redis")]
216    pub fn begin_pipeline_redis_command(
217        &mut self,
218        command: RedisCommandKind,
219        args: &[&[u8]],
220    ) -> Result<()> {
221        redis::write_request(&mut self.conn, command, None, args)?;
222        self.redis_pipeline_responses
223            .push_back(RedisPipelineResponse::Native);
224        Ok(())
225    }
226
227    /// Writes a Redis command request by name without flushing or reading its response.
228    ///
229    /// Compact-opcode commands use the optimized Redis wrapper. Other command
230    /// names use the SCNP command-name wrapper and decode the RESP payload when
231    /// [`finish_pipeline_redis_command`](Self::finish_pipeline_redis_command)
232    /// is called.
233    #[cfg(feature = "redis")]
234    pub fn begin_pipeline_redis_command_by_name(
235        &mut self,
236        command: &[u8],
237        args: &[&[u8]],
238    ) -> Result<()> {
239        match RedisCommandKind::from_name(command) {
240            Some(command) => self.begin_pipeline_redis_command(command, args),
241            None => self.begin_pipeline_redis_resp_command(command, args),
242        }
243    }
244
245    /// Writes a Redis command-name wrapper request without flushing or reading its response.
246    #[cfg(feature = "redis")]
247    pub fn begin_pipeline_redis_resp_command(
248        &mut self,
249        command: &[u8],
250        args: &[&[u8]],
251    ) -> Result<()> {
252        validate_redis_command_name(command)?;
253        redis::write_resp_request(&mut self.conn, command, args)?;
254        self.redis_pipeline_responses
255            .push_back(RedisPipelineResponse::Resp);
256        Ok(())
257    }
258
259    /// Flushes all queued pipelined requests.
260    pub fn flush_pipeline(&mut self) -> Result<()> {
261        self.conn.flush()
262    }
263
264    /// Reads the next pipelined GET response.
265    pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
266        self.conn
267            .read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
268    }
269
270    /// Reads the next pipelined SET response.
271    pub fn finish_pipeline_set(&mut self) -> Result<()> {
272        self.conn
273            .expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
274    }
275
276    /// Reads the next pipelined SETEX response.
277    pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
278        self.conn
279            .expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
280    }
281
282    /// Reads the next pipelined GETEX response.
283    pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
284        self.conn
285            .read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
286    }
287
288    /// Reads the next pipelined DEL response.
289    pub fn finish_pipeline_del(&mut self) -> Result<bool> {
290        self.conn
291            .read_integer(<Del as crate::commands::ScnpCommand>::NAME)
292            .map(|deleted| deleted != 0)
293    }
294
295    /// Reads the next pipelined EXISTS response.
296    pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
297        self.conn
298            .read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
299            .map(|exists| exists != 0)
300    }
301
302    /// Reads the next pipelined TTL response.
303    pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
304        self.conn
305            .read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
306    }
307
308    /// Reads the next pipelined EXPIRE response.
309    pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
310        self.conn
311            .read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
312            .map(|changed| changed != 0)
313    }
314
315    /// Reads the next pipelined compact Redis command response.
316    #[cfg(feature = "redis")]
317    pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
318        match self
319            .redis_pipeline_responses
320            .pop_front()
321            .unwrap_or(RedisPipelineResponse::Native)
322        {
323            RedisPipelineResponse::Native => self.conn.read_native_redis_response("REDIS"),
324            RedisPipelineResponse::Resp => self.conn.read_resp_redis_response("RESP"),
325        }
326    }
327}
328
329impl ShardCacheDirectRouter {
330    /// Connects directly to one shard-owned port.
331    pub fn connect_shard(&self, shard_id: usize) -> Result<ShardCacheDirectShardClient> {
332        Ok(ShardCacheDirectShardClient {
333            router: *self,
334            shard_id,
335            conn: ScnpConnection::connect(self.shard_addr(shard_id)?)?,
336        })
337    }
338}
339
340/// Blocking SCNP client that automatically routes each key to its shard port.
341#[derive(Debug)]
342pub struct ShardCacheDirectClient {
343    router: ShardCacheDirectRouter,
344    conns: Vec<ScnpConnection>,
345}
346
347impl ShardCacheDirectClient {
348    /// Connects to every shard-owned port starting at `addr`.
349    ///
350    /// `addr` must be the first direct shard port, not the fanout port.
351    pub fn connect(addr: impl ToSocketAddrs, shard_count: usize) -> Result<Self> {
352        let router = ShardCacheDirectRouter::new(addr, shard_count)?;
353        Self::connect_with_router(router)
354    }
355
356    /// Connects to every shard-owned port using an explicit route mode.
357    pub fn connect_with_route_mode(
358        addr: impl ToSocketAddrs,
359        shard_count: usize,
360        route_mode: ShardCacheRouteMode,
361    ) -> Result<Self> {
362        let router = ShardCacheDirectRouter::new(addr, shard_count)?.with_route_mode(route_mode);
363        Self::connect_with_router(router)
364    }
365
366    fn connect_with_router(router: ShardCacheDirectRouter) -> Result<Self> {
367        let mut conns = Vec::with_capacity(router.shard_count());
368        for shard_id in 0..router.shard_count() {
369            conns.push(ScnpConnection::connect(router.shard_addr(shard_id)?)?);
370        }
371        Ok(Self { router, conns })
372    }
373
374    /// Reads `key` from its owning shard into `out`, returning `true` on hit.
375    pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
376        let route = self.router.route_key(key);
377        self.conns[route.shard_id].execute(Get::routed(route, key, out))
378    }
379
380    /// Sets `key` on its owning shard.
381    pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
382        let route = self.router.route_key(key);
383        self.conns[route.shard_id].execute(Set::routed(route, key, value))
384    }
385
386    /// Sets `key` on its owning shard with a millisecond TTL.
387    pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
388        let route = self.router.route_key(key);
389        self.conns[route.shard_id].execute(SetEx::routed(route, key, value, ttl_ms))
390    }
391
392    /// Reads `key` from its owning shard into `out` and sets a millisecond TTL.
393    pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
394        let route = self.router.route_key(key);
395        self.conns[route.shard_id].execute(GetEx::routed(route, key, ttl_ms, out))
396    }
397
398    /// Deletes `key` from its owning shard.
399    pub fn del(&mut self, key: &[u8]) -> Result<bool> {
400        let route = self.router.route_key(key);
401        self.conns[route.shard_id].execute(Del::routed(route, key))
402    }
403
404    /// Returns whether `key` exists on its owning shard.
405    pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
406        let route = self.router.route_key(key);
407        self.conns[route.shard_id].execute(Exists::routed(route, key))
408    }
409
410    /// Returns Redis-compatible TTL seconds for `key` on its owning shard.
411    pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
412        let route = self.router.route_key(key);
413        self.conns[route.shard_id].execute(Ttl::routed(route, key))
414    }
415
416    /// Sets a millisecond TTL on `key` on its owning shard.
417    pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
418        let route = self.router.route_key(key);
419        self.conns[route.shard_id].execute(Expire::routed(route, key, ttl_ms))
420    }
421
422    /// Returns the first-party Redis command namespace for direct shard routing.
423    #[cfg(feature = "redis")]
424    pub fn redis(&mut self) -> crate::Redis<'_, Self> {
425        crate::Redis::new(self)
426    }
427
428    /// Executes a compact Redis command on the owning direct shard.
429    ///
430    /// Commands that require all shards are rejected; use [`ShardCacheClient`] against
431    /// the fanout listener for those.
432    #[cfg(feature = "redis")]
433    pub fn redis_command(
434        &mut self,
435        command: RedisCommandKind,
436        args: &[&[u8]],
437    ) -> Result<RedisResponse> {
438        let route = redis_direct_route(&self.router, command, args)?;
439        let shard_id = route.map_or(0, |route| route.shard_id);
440        self.conns[shard_id].execute(OptimizedRedisCommand::routed(command, route, args))
441    }
442
443    /// Executes a compact Redis command by name on the owning direct shard.
444    #[cfg(feature = "redis")]
445    pub fn redis_command_by_name(
446        &mut self,
447        command: &[u8],
448        args: &[&[u8]],
449    ) -> Result<RedisResponse> {
450        self.redis_command(redis_command_kind_from_name(command)?, args)
451    }
452
453    /// Runs a shard-local SCNP scan on one direct shard connection. Callers can
454    /// invoke this for different shards from different threads for parallel
455    /// scans.
456    pub fn scan_shard_resp_into(
457        &mut self,
458        shard_id: usize,
459        cursor: u64,
460        count: usize,
461        out: &mut Vec<u8>,
462    ) -> Result<bool> {
463        if shard_id >= self.conns.len() {
464            return Err(ShardCacheClientError::Config(format!(
465                "shard {shard_id} is outside configured shard count {}",
466                self.conns.len()
467            )));
468        }
469        let shard_id_text = shard_id.to_string();
470        let cursor = cursor.to_string();
471        let count = count.to_string();
472        self.conns[shard_id].execute(RespCommand::new(
473            &[
474                b"SCNP.SCANSHARD",
475                shard_id_text.as_bytes(),
476                cursor.as_bytes(),
477                b"COUNT",
478                count.as_bytes(),
479            ],
480            out,
481        ))
482    }
483}
484
485/// Blocking SCNP client pinned to one shard-owned port.
486///
487/// This is useful for thread-per-shard clients that pre-partition work.
488#[derive(Debug)]
489pub struct ShardCacheDirectShardClient {
490    router: ShardCacheDirectRouter,
491    shard_id: usize,
492    conn: ScnpConnection,
493}
494
495impl ShardCacheDirectShardClient {
496    /// Returns the shard this client is connected to.
497    pub fn shard_id(&self) -> usize {
498        self.shard_id
499    }
500
501    /// Reads `key` into `out`, returning `true` on hit.
502    pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
503        let route = self.checked_route(key)?;
504        self.conn.execute(Get::routed(route, key, out))
505    }
506
507    /// Sets `key` to `value`.
508    pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
509        let route = self.checked_route(key)?;
510        self.conn.execute(Set::routed(route, key, value))
511    }
512
513    /// Sets `key` to `value` with a millisecond TTL.
514    pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
515        let route = self.checked_route(key)?;
516        self.conn.execute(SetEx::routed(route, key, value, ttl_ms))
517    }
518
519    /// Reads `key` into `out` and sets a millisecond TTL, returning `true` on hit.
520    pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
521        let route = self.checked_route(key)?;
522        self.conn.execute(GetEx::routed(route, key, ttl_ms, out))
523    }
524
525    /// Deletes `key`, returning `true` when an entry was removed.
526    pub fn del(&mut self, key: &[u8]) -> Result<bool> {
527        let route = self.checked_route(key)?;
528        self.conn.execute(Del::routed(route, key))
529    }
530
531    /// Returns whether `key` exists.
532    pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
533        let route = self.checked_route(key)?;
534        self.conn.execute(Exists::routed(route, key))
535    }
536
537    /// Returns Redis-compatible TTL seconds for `key`.
538    pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
539        let route = self.checked_route(key)?;
540        self.conn.execute(Ttl::routed(route, key))
541    }
542
543    /// Sets a millisecond TTL on `key`, returning `true` when the TTL changed.
544    pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
545        let route = self.checked_route(key)?;
546        self.conn.execute(Expire::routed(route, key, ttl_ms))
547    }
548
549    /// Returns the first-party Redis command namespace for this shard.
550    #[cfg(feature = "redis")]
551    pub fn redis(&mut self) -> crate::Redis<'_, Self> {
552        crate::Redis::new(self)
553    }
554
555    /// Executes a compact Redis command on this direct shard.
556    ///
557    /// Commands that require all shards are rejected; use [`ShardCacheClient`] against
558    /// the fanout listener for those.
559    #[cfg(feature = "redis")]
560    pub fn redis_command(
561        &mut self,
562        command: RedisCommandKind,
563        args: &[&[u8]],
564    ) -> Result<RedisResponse> {
565        let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
566        self.conn
567            .execute(OptimizedRedisCommand::routed(command, route, args))
568    }
569
570    /// Executes a compact Redis command by name on this direct shard.
571    #[cfg(feature = "redis")]
572    pub fn redis_command_by_name(
573        &mut self,
574        command: &[u8],
575        args: &[&[u8]],
576    ) -> Result<RedisResponse> {
577        self.redis_command(redis_command_kind_from_name(command)?, args)
578    }
579
580    /// Runs a shard-local SCNP scan on this shard-owned connection.
581    pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
582        let shard_id = self.shard_id.to_string();
583        let cursor = cursor.to_string();
584        let count = count.to_string();
585        self.conn.execute(RespCommand::new(
586            &[
587                b"SCNP.SCANSHARD",
588                shard_id.as_bytes(),
589                cursor.as_bytes(),
590                b"COUNT",
591                count.as_bytes(),
592            ],
593            out,
594        ))
595    }
596
597    /// Writes a routed GET request without flushing or reading its response.
598    pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
599        let route = self.checked_route(key)?;
600        get::write_request(&mut self.conn, Some(route), key)
601    }
602
603    /// Writes a routed SET request without flushing or reading its response.
604    pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
605        let route = self.checked_route(key)?;
606        set::write_request(&mut self.conn, Some(route), key, value)
607    }
608
609    /// Writes a routed SETEX request without flushing or reading its response.
610    pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
611        let route = self.checked_route(key)?;
612        setex::write_request(&mut self.conn, Some(route), key, value, ttl_ms)
613    }
614
615    /// Writes a routed GETEX request without flushing or reading its response.
616    pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
617        let route = self.checked_route(key)?;
618        getex::write_request(&mut self.conn, Some(route), key, ttl_ms)
619    }
620
621    /// Writes a routed DEL request without flushing or reading its response.
622    pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
623        let route = self.checked_route(key)?;
624        del::write_request(&mut self.conn, Some(route), key)
625    }
626
627    /// Writes a routed EXISTS request without flushing or reading its response.
628    pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
629        let route = self.checked_route(key)?;
630        exists::write_request(&mut self.conn, Some(route), key)
631    }
632
633    /// Writes a routed TTL request without flushing or reading its response.
634    pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
635        let route = self.checked_route(key)?;
636        ttl::write_request(&mut self.conn, Some(route), key)
637    }
638
639    /// Writes a routed EXPIRE request without flushing or reading its response.
640    pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
641        let route = self.checked_route(key)?;
642        expire::write_request(&mut self.conn, Some(route), key, ttl_ms)
643    }
644
645    /// Writes a compact Redis command request without flushing or reading its response.
646    #[cfg(feature = "redis")]
647    pub fn begin_pipeline_redis_command(
648        &mut self,
649        command: RedisCommandKind,
650        args: &[&[u8]],
651    ) -> Result<()> {
652        let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
653        redis::write_request(&mut self.conn, command, route, args)
654    }
655
656    /// Writes a compact Redis command request by name without flushing or reading its response.
657    #[cfg(feature = "redis")]
658    pub fn begin_pipeline_redis_command_by_name(
659        &mut self,
660        command: &[u8],
661        args: &[&[u8]],
662    ) -> Result<()> {
663        self.begin_pipeline_redis_command(redis_command_kind_from_name(command)?, args)
664    }
665
666    /// Flushes all queued pipelined requests.
667    pub fn flush_pipeline(&mut self) -> Result<()> {
668        self.conn.flush()
669    }
670
671    /// Reads the next pipelined GET response.
672    pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
673        self.conn
674            .read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
675    }
676
677    /// Reads the next pipelined SET response.
678    pub fn finish_pipeline_set(&mut self) -> Result<()> {
679        self.conn
680            .expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
681    }
682
683    /// Reads the next pipelined SETEX response.
684    pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
685        self.conn
686            .expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
687    }
688
689    /// Reads the next pipelined GETEX response.
690    pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
691        self.conn
692            .read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
693    }
694
695    /// Reads the next pipelined DEL response.
696    pub fn finish_pipeline_del(&mut self) -> Result<bool> {
697        self.conn
698            .read_integer(<Del as crate::commands::ScnpCommand>::NAME)
699            .map(|deleted| deleted != 0)
700    }
701
702    /// Reads the next pipelined EXISTS response.
703    pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
704        self.conn
705            .read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
706            .map(|exists| exists != 0)
707    }
708
709    /// Reads the next pipelined TTL response.
710    pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
711        self.conn
712            .read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
713    }
714
715    /// Reads the next pipelined EXPIRE response.
716    pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
717        self.conn
718            .read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
719            .map(|changed| changed != 0)
720    }
721
722    /// Reads the next pipelined compact Redis command response.
723    #[cfg(feature = "redis")]
724    pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
725        self.conn.read_native_redis_response("REDIS")
726    }
727
728    fn checked_route(&self, key: &[u8]) -> Result<crate::routing::ShardCacheRoute> {
729        let route = self.router.route_key(key);
730        if route.shard_id != self.shard_id {
731            return Err(ShardCacheClientError::Config(format!(
732                "key routes to shard {}, but client is connected to shard {}",
733                route.shard_id, self.shard_id
734            )));
735        }
736        Ok(route)
737    }
738}
739
740#[cfg(feature = "redis")]
741fn redis_command_kind_from_name(command: &[u8]) -> Result<RedisCommandKind> {
742    RedisCommandKind::from_name(command).ok_or_else(|| {
743        ShardCacheClientError::Config(format!(
744            "Redis command `{}` is not available on direct SCNP shard clients; use ShardCacheClient on the fanout listener for command-name fallback",
745            String::from_utf8_lossy(command)
746        ))
747    })
748}
749
750#[cfg(feature = "redis")]
751fn validate_redis_command_name(command: &[u8]) -> Result<()> {
752    if command.is_empty() {
753        return Err(ShardCacheClientError::Config(
754            "Redis command name cannot be empty".into(),
755        ));
756    }
757    if command.iter().any(|byte| byte.is_ascii_whitespace()) {
758        return Err(ShardCacheClientError::Config(format!(
759            "Redis command name cannot contain whitespace: `{}`",
760            String::from_utf8_lossy(command)
761        )));
762    }
763    Ok(())
764}
765
766#[cfg(feature = "redis")]
767fn redis_direct_route(
768    router: &ShardCacheDirectRouter,
769    command: RedisCommandKind,
770    args: &[&[u8]],
771) -> Result<Option<ShardCacheRoute>> {
772    let keys = match command.route_keys(args) {
773        RedisCommandRouteKeys::None => return Ok(None),
774        RedisCommandRouteKeys::AllShards => {
775            return Err(ShardCacheClientError::Config(format!(
776                "{} requires all shards; use ShardCacheClient on the fanout listener",
777                command.name()
778            )));
779        }
780        RedisCommandRouteKeys::Keys(keys) if keys.is_empty() => return Ok(None),
781        RedisCommandRouteKeys::Keys(keys) => keys,
782    };
783
784    let first_route = router.route_key(keys[0]);
785    for key in keys.iter().skip(1) {
786        let route = router.route_key(key);
787        if route.shard_id != first_route.shard_id {
788            return Err(ShardCacheClientError::Config(format!(
789                "{} keys span multiple direct shards",
790                command.name()
791            )));
792        }
793    }
794    Ok(Some(first_route))
795}
796
797#[cfg(feature = "redis")]
798fn redis_direct_shard_route(
799    router: &ShardCacheDirectRouter,
800    shard_id: usize,
801    command: RedisCommandKind,
802    args: &[&[u8]],
803) -> Result<Option<ShardCacheRoute>> {
804    let route = redis_direct_route(router, command, args)?;
805    if let Some(route) = route
806        && route.shard_id != shard_id
807    {
808        return Err(ShardCacheClientError::Config(format!(
809            "{} routes to shard {}, but client is connected to shard {}",
810            command.name(),
811            route.shard_id,
812            shard_id
813        )));
814    }
815    Ok(route)
816}