1#[cfg(feature = "redis")]
2use std::collections::VecDeque;
3use std::net::ToSocketAddrs;
4
5use crate::commands::del::{self, Del};
6use crate::commands::exists::{self, Exists};
7use crate::commands::expire::{self, Expire};
8use crate::commands::get::{self, Get};
9use crate::commands::getex::{self, GetEx};
10#[cfg(feature = "redis")]
11use crate::commands::redis::{
12 self, RedisCommand as OptimizedRedisCommand, RedisCommandKind, RedisCommandRouteKeys,
13 RedisRespCommand, RedisResponse,
14};
15use crate::commands::resp::RespCommand;
16use crate::commands::set::{self, Set};
17use crate::commands::setex::{self, SetEx};
18use crate::commands::ttl::{self, Ttl};
19use crate::connection::ScnpConnection;
20use crate::error::{Result, ShardCacheClientError};
21#[cfg(feature = "redis")]
22use crate::routing::ShardCacheRoute;
23use crate::routing::{ShardCacheDirectRouter, ShardCacheRouteMode};
24
25#[cfg(feature = "redis")]
26#[derive(Debug, Clone, Copy)]
27enum RedisPipelineResponse {
28 Native,
29 Resp,
30}
31
32#[derive(Debug)]
34pub struct ShardCacheClient {
35 conn: ScnpConnection,
36 #[cfg(feature = "redis")]
37 redis_pipeline_responses: VecDeque<RedisPipelineResponse>,
38}
39
40impl ShardCacheClient {
41 pub fn connect(addr: impl ToSocketAddrs) -> Result<Self> {
43 Ok(Self {
44 conn: ScnpConnection::connect(addr)?,
45 #[cfg(feature = "redis")]
46 redis_pipeline_responses: VecDeque::new(),
47 })
48 }
49
50 pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
52 self.conn.execute(Get::new(key, out))
53 }
54
55 pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
57 self.conn.execute(Set::new(key, value))
58 }
59
60 pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
62 self.conn.execute(SetEx::new(key, value, ttl_ms))
63 }
64
65 pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
67 self.conn.execute(GetEx::new(key, ttl_ms, out))
68 }
69
70 pub fn del(&mut self, key: &[u8]) -> Result<bool> {
72 self.conn.execute(Del::new(key))
73 }
74
75 pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
77 self.conn.execute(Exists::new(key))
78 }
79
80 pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
82 self.conn.execute(Ttl::new(key))
83 }
84
85 pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
87 self.conn.execute(Expire::new(key, ttl_ms))
88 }
89
90 #[cfg(feature = "redis")]
92 pub fn redis(&mut self) -> crate::Redis<'_, Self> {
93 crate::Redis::new(self)
94 }
95
96 #[cfg(feature = "redis")]
98 pub fn redis_command(
99 &mut self,
100 command: RedisCommandKind,
101 args: &[&[u8]],
102 ) -> Result<RedisResponse> {
103 self.conn.execute(OptimizedRedisCommand::new(command, args))
104 }
105
106 #[cfg(feature = "redis")]
111 pub fn redis_command_by_name(
112 &mut self,
113 command: &[u8],
114 args: &[&[u8]],
115 ) -> Result<RedisResponse> {
116 match RedisCommandKind::from_name(command) {
117 Some(command) => self.redis_command(command, args),
118 None => self.redis_resp_command(command, args),
119 }
120 }
121
122 #[cfg(feature = "redis")]
127 pub fn redis_resp_command(&mut self, command: &[u8], args: &[&[u8]]) -> Result<RedisResponse> {
128 validate_redis_command_name(command)?;
129 self.conn.execute(RedisRespCommand::new(command, args))
130 }
131
132 pub fn resp_command_into(&mut self, parts: &[&[u8]], out: &mut Vec<u8>) -> Result<bool> {
137 self.conn.execute(RespCommand::new(parts, out))
138 }
139
140 pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
142 let cursor = cursor.to_string();
143 let count = count.to_string();
144 self.resp_command_into(
145 &[b"SCNP.SCAN", cursor.as_bytes(), b"COUNT", count.as_bytes()],
146 out,
147 )
148 }
149
150 pub fn scan_shard_resp_into(
153 &mut self,
154 shard_id: usize,
155 cursor: u64,
156 count: usize,
157 out: &mut Vec<u8>,
158 ) -> Result<bool> {
159 let shard_id = shard_id.to_string();
160 let cursor = cursor.to_string();
161 let count = count.to_string();
162 self.resp_command_into(
163 &[
164 b"SCNP.SCANSHARD",
165 shard_id.as_bytes(),
166 cursor.as_bytes(),
167 b"COUNT",
168 count.as_bytes(),
169 ],
170 out,
171 )
172 }
173
174 pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
176 get::write_request(&mut self.conn, None, key)
177 }
178
179 pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
181 set::write_request(&mut self.conn, None, key, value)
182 }
183
184 pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
186 setex::write_request(&mut self.conn, None, key, value, ttl_ms)
187 }
188
189 pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
191 getex::write_request(&mut self.conn, None, key, ttl_ms)
192 }
193
194 pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
196 del::write_request(&mut self.conn, None, key)
197 }
198
199 pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
201 exists::write_request(&mut self.conn, None, key)
202 }
203
204 pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
206 ttl::write_request(&mut self.conn, None, key)
207 }
208
209 pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
211 expire::write_request(&mut self.conn, None, key, ttl_ms)
212 }
213
214 #[cfg(feature = "redis")]
216 pub fn begin_pipeline_redis_command(
217 &mut self,
218 command: RedisCommandKind,
219 args: &[&[u8]],
220 ) -> Result<()> {
221 redis::write_request(&mut self.conn, command, None, args)?;
222 self.redis_pipeline_responses
223 .push_back(RedisPipelineResponse::Native);
224 Ok(())
225 }
226
227 #[cfg(feature = "redis")]
234 pub fn begin_pipeline_redis_command_by_name(
235 &mut self,
236 command: &[u8],
237 args: &[&[u8]],
238 ) -> Result<()> {
239 match RedisCommandKind::from_name(command) {
240 Some(command) => self.begin_pipeline_redis_command(command, args),
241 None => self.begin_pipeline_redis_resp_command(command, args),
242 }
243 }
244
245 #[cfg(feature = "redis")]
247 pub fn begin_pipeline_redis_resp_command(
248 &mut self,
249 command: &[u8],
250 args: &[&[u8]],
251 ) -> Result<()> {
252 validate_redis_command_name(command)?;
253 redis::write_resp_request(&mut self.conn, command, args)?;
254 self.redis_pipeline_responses
255 .push_back(RedisPipelineResponse::Resp);
256 Ok(())
257 }
258
259 pub fn flush_pipeline(&mut self) -> Result<()> {
261 self.conn.flush()
262 }
263
264 pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
266 self.conn
267 .read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
268 }
269
270 pub fn finish_pipeline_set(&mut self) -> Result<()> {
272 self.conn
273 .expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
274 }
275
276 pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
278 self.conn
279 .expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
280 }
281
282 pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
284 self.conn
285 .read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
286 }
287
288 pub fn finish_pipeline_del(&mut self) -> Result<bool> {
290 self.conn
291 .read_integer(<Del as crate::commands::ScnpCommand>::NAME)
292 .map(|deleted| deleted != 0)
293 }
294
295 pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
297 self.conn
298 .read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
299 .map(|exists| exists != 0)
300 }
301
302 pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
304 self.conn
305 .read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
306 }
307
308 pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
310 self.conn
311 .read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
312 .map(|changed| changed != 0)
313 }
314
315 #[cfg(feature = "redis")]
317 pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
318 match self
319 .redis_pipeline_responses
320 .pop_front()
321 .unwrap_or(RedisPipelineResponse::Native)
322 {
323 RedisPipelineResponse::Native => self.conn.read_native_redis_response("REDIS"),
324 RedisPipelineResponse::Resp => self.conn.read_resp_redis_response("RESP"),
325 }
326 }
327}
328
329impl ShardCacheDirectRouter {
330 pub fn connect_shard(&self, shard_id: usize) -> Result<ShardCacheDirectShardClient> {
332 Ok(ShardCacheDirectShardClient {
333 router: *self,
334 shard_id,
335 conn: ScnpConnection::connect(self.shard_addr(shard_id)?)?,
336 })
337 }
338}
339
340#[derive(Debug)]
342pub struct ShardCacheDirectClient {
343 router: ShardCacheDirectRouter,
344 conns: Vec<ScnpConnection>,
345}
346
347impl ShardCacheDirectClient {
348 pub fn connect(addr: impl ToSocketAddrs, shard_count: usize) -> Result<Self> {
352 let router = ShardCacheDirectRouter::new(addr, shard_count)?;
353 Self::connect_with_router(router)
354 }
355
356 pub fn connect_with_route_mode(
358 addr: impl ToSocketAddrs,
359 shard_count: usize,
360 route_mode: ShardCacheRouteMode,
361 ) -> Result<Self> {
362 let router = ShardCacheDirectRouter::new(addr, shard_count)?.with_route_mode(route_mode);
363 Self::connect_with_router(router)
364 }
365
366 fn connect_with_router(router: ShardCacheDirectRouter) -> Result<Self> {
367 let mut conns = Vec::with_capacity(router.shard_count());
368 for shard_id in 0..router.shard_count() {
369 conns.push(ScnpConnection::connect(router.shard_addr(shard_id)?)?);
370 }
371 Ok(Self { router, conns })
372 }
373
374 pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
376 let route = self.router.route_key(key);
377 self.conns[route.shard_id].execute(Get::routed(route, key, out))
378 }
379
380 pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
382 let route = self.router.route_key(key);
383 self.conns[route.shard_id].execute(Set::routed(route, key, value))
384 }
385
386 pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
388 let route = self.router.route_key(key);
389 self.conns[route.shard_id].execute(SetEx::routed(route, key, value, ttl_ms))
390 }
391
392 pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
394 let route = self.router.route_key(key);
395 self.conns[route.shard_id].execute(GetEx::routed(route, key, ttl_ms, out))
396 }
397
398 pub fn del(&mut self, key: &[u8]) -> Result<bool> {
400 let route = self.router.route_key(key);
401 self.conns[route.shard_id].execute(Del::routed(route, key))
402 }
403
404 pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
406 let route = self.router.route_key(key);
407 self.conns[route.shard_id].execute(Exists::routed(route, key))
408 }
409
410 pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
412 let route = self.router.route_key(key);
413 self.conns[route.shard_id].execute(Ttl::routed(route, key))
414 }
415
416 pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
418 let route = self.router.route_key(key);
419 self.conns[route.shard_id].execute(Expire::routed(route, key, ttl_ms))
420 }
421
422 #[cfg(feature = "redis")]
424 pub fn redis(&mut self) -> crate::Redis<'_, Self> {
425 crate::Redis::new(self)
426 }
427
428 #[cfg(feature = "redis")]
433 pub fn redis_command(
434 &mut self,
435 command: RedisCommandKind,
436 args: &[&[u8]],
437 ) -> Result<RedisResponse> {
438 let route = redis_direct_route(&self.router, command, args)?;
439 let shard_id = route.map_or(0, |route| route.shard_id);
440 self.conns[shard_id].execute(OptimizedRedisCommand::routed(command, route, args))
441 }
442
443 #[cfg(feature = "redis")]
445 pub fn redis_command_by_name(
446 &mut self,
447 command: &[u8],
448 args: &[&[u8]],
449 ) -> Result<RedisResponse> {
450 self.redis_command(redis_command_kind_from_name(command)?, args)
451 }
452
453 pub fn scan_shard_resp_into(
457 &mut self,
458 shard_id: usize,
459 cursor: u64,
460 count: usize,
461 out: &mut Vec<u8>,
462 ) -> Result<bool> {
463 if shard_id >= self.conns.len() {
464 return Err(ShardCacheClientError::Config(format!(
465 "shard {shard_id} is outside configured shard count {}",
466 self.conns.len()
467 )));
468 }
469 let shard_id_text = shard_id.to_string();
470 let cursor = cursor.to_string();
471 let count = count.to_string();
472 self.conns[shard_id].execute(RespCommand::new(
473 &[
474 b"SCNP.SCANSHARD",
475 shard_id_text.as_bytes(),
476 cursor.as_bytes(),
477 b"COUNT",
478 count.as_bytes(),
479 ],
480 out,
481 ))
482 }
483}
484
485#[derive(Debug)]
489pub struct ShardCacheDirectShardClient {
490 router: ShardCacheDirectRouter,
491 shard_id: usize,
492 conn: ScnpConnection,
493}
494
495impl ShardCacheDirectShardClient {
496 pub fn shard_id(&self) -> usize {
498 self.shard_id
499 }
500
501 pub fn get_into(&mut self, key: &[u8], out: &mut Vec<u8>) -> Result<bool> {
503 let route = self.checked_route(key)?;
504 self.conn.execute(Get::routed(route, key, out))
505 }
506
507 pub fn set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
509 let route = self.checked_route(key)?;
510 self.conn.execute(Set::routed(route, key, value))
511 }
512
513 pub fn set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
515 let route = self.checked_route(key)?;
516 self.conn.execute(SetEx::routed(route, key, value, ttl_ms))
517 }
518
519 pub fn get_ex_into(&mut self, key: &[u8], ttl_ms: u64, out: &mut Vec<u8>) -> Result<bool> {
521 let route = self.checked_route(key)?;
522 self.conn.execute(GetEx::routed(route, key, ttl_ms, out))
523 }
524
525 pub fn del(&mut self, key: &[u8]) -> Result<bool> {
527 let route = self.checked_route(key)?;
528 self.conn.execute(Del::routed(route, key))
529 }
530
531 pub fn exists(&mut self, key: &[u8]) -> Result<bool> {
533 let route = self.checked_route(key)?;
534 self.conn.execute(Exists::routed(route, key))
535 }
536
537 pub fn ttl(&mut self, key: &[u8]) -> Result<i64> {
539 let route = self.checked_route(key)?;
540 self.conn.execute(Ttl::routed(route, key))
541 }
542
543 pub fn expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<bool> {
545 let route = self.checked_route(key)?;
546 self.conn.execute(Expire::routed(route, key, ttl_ms))
547 }
548
549 #[cfg(feature = "redis")]
551 pub fn redis(&mut self) -> crate::Redis<'_, Self> {
552 crate::Redis::new(self)
553 }
554
555 #[cfg(feature = "redis")]
560 pub fn redis_command(
561 &mut self,
562 command: RedisCommandKind,
563 args: &[&[u8]],
564 ) -> Result<RedisResponse> {
565 let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
566 self.conn
567 .execute(OptimizedRedisCommand::routed(command, route, args))
568 }
569
570 #[cfg(feature = "redis")]
572 pub fn redis_command_by_name(
573 &mut self,
574 command: &[u8],
575 args: &[&[u8]],
576 ) -> Result<RedisResponse> {
577 self.redis_command(redis_command_kind_from_name(command)?, args)
578 }
579
580 pub fn scan_resp_into(&mut self, cursor: u64, count: usize, out: &mut Vec<u8>) -> Result<bool> {
582 let shard_id = self.shard_id.to_string();
583 let cursor = cursor.to_string();
584 let count = count.to_string();
585 self.conn.execute(RespCommand::new(
586 &[
587 b"SCNP.SCANSHARD",
588 shard_id.as_bytes(),
589 cursor.as_bytes(),
590 b"COUNT",
591 count.as_bytes(),
592 ],
593 out,
594 ))
595 }
596
597 pub fn begin_pipeline_get(&mut self, key: &[u8]) -> Result<()> {
599 let route = self.checked_route(key)?;
600 get::write_request(&mut self.conn, Some(route), key)
601 }
602
603 pub fn begin_pipeline_set(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
605 let route = self.checked_route(key)?;
606 set::write_request(&mut self.conn, Some(route), key, value)
607 }
608
609 pub fn begin_pipeline_set_ex(&mut self, key: &[u8], value: &[u8], ttl_ms: u64) -> Result<()> {
611 let route = self.checked_route(key)?;
612 setex::write_request(&mut self.conn, Some(route), key, value, ttl_ms)
613 }
614
615 pub fn begin_pipeline_get_ex(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
617 let route = self.checked_route(key)?;
618 getex::write_request(&mut self.conn, Some(route), key, ttl_ms)
619 }
620
621 pub fn begin_pipeline_del(&mut self, key: &[u8]) -> Result<()> {
623 let route = self.checked_route(key)?;
624 del::write_request(&mut self.conn, Some(route), key)
625 }
626
627 pub fn begin_pipeline_exists(&mut self, key: &[u8]) -> Result<()> {
629 let route = self.checked_route(key)?;
630 exists::write_request(&mut self.conn, Some(route), key)
631 }
632
633 pub fn begin_pipeline_ttl(&mut self, key: &[u8]) -> Result<()> {
635 let route = self.checked_route(key)?;
636 ttl::write_request(&mut self.conn, Some(route), key)
637 }
638
639 pub fn begin_pipeline_expire(&mut self, key: &[u8], ttl_ms: u64) -> Result<()> {
641 let route = self.checked_route(key)?;
642 expire::write_request(&mut self.conn, Some(route), key, ttl_ms)
643 }
644
645 #[cfg(feature = "redis")]
647 pub fn begin_pipeline_redis_command(
648 &mut self,
649 command: RedisCommandKind,
650 args: &[&[u8]],
651 ) -> Result<()> {
652 let route = redis_direct_shard_route(&self.router, self.shard_id, command, args)?;
653 redis::write_request(&mut self.conn, command, route, args)
654 }
655
656 #[cfg(feature = "redis")]
658 pub fn begin_pipeline_redis_command_by_name(
659 &mut self,
660 command: &[u8],
661 args: &[&[u8]],
662 ) -> Result<()> {
663 self.begin_pipeline_redis_command(redis_command_kind_from_name(command)?, args)
664 }
665
666 pub fn flush_pipeline(&mut self) -> Result<()> {
668 self.conn.flush()
669 }
670
671 pub fn finish_pipeline_get_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
673 self.conn
674 .read_value(<Get as crate::commands::ScnpCommand>::NAME, out)
675 }
676
677 pub fn finish_pipeline_set(&mut self) -> Result<()> {
679 self.conn
680 .expect_ok(<Set as crate::commands::ScnpCommand>::NAME)
681 }
682
683 pub fn finish_pipeline_set_ex(&mut self) -> Result<()> {
685 self.conn
686 .expect_ok(<SetEx as crate::commands::ScnpCommand>::NAME)
687 }
688
689 pub fn finish_pipeline_get_ex_into(&mut self, out: &mut Vec<u8>) -> Result<bool> {
691 self.conn
692 .read_value(<GetEx as crate::commands::ScnpCommand>::NAME, out)
693 }
694
695 pub fn finish_pipeline_del(&mut self) -> Result<bool> {
697 self.conn
698 .read_integer(<Del as crate::commands::ScnpCommand>::NAME)
699 .map(|deleted| deleted != 0)
700 }
701
702 pub fn finish_pipeline_exists(&mut self) -> Result<bool> {
704 self.conn
705 .read_integer(<Exists as crate::commands::ScnpCommand>::NAME)
706 .map(|exists| exists != 0)
707 }
708
709 pub fn finish_pipeline_ttl(&mut self) -> Result<i64> {
711 self.conn
712 .read_integer(<Ttl as crate::commands::ScnpCommand>::NAME)
713 }
714
715 pub fn finish_pipeline_expire(&mut self) -> Result<bool> {
717 self.conn
718 .read_integer(<Expire as crate::commands::ScnpCommand>::NAME)
719 .map(|changed| changed != 0)
720 }
721
722 #[cfg(feature = "redis")]
724 pub fn finish_pipeline_redis_command(&mut self) -> Result<RedisResponse> {
725 self.conn.read_native_redis_response("REDIS")
726 }
727
728 fn checked_route(&self, key: &[u8]) -> Result<crate::routing::ShardCacheRoute> {
729 let route = self.router.route_key(key);
730 if route.shard_id != self.shard_id {
731 return Err(ShardCacheClientError::Config(format!(
732 "key routes to shard {}, but client is connected to shard {}",
733 route.shard_id, self.shard_id
734 )));
735 }
736 Ok(route)
737 }
738}
739
740#[cfg(feature = "redis")]
741fn redis_command_kind_from_name(command: &[u8]) -> Result<RedisCommandKind> {
742 RedisCommandKind::from_name(command).ok_or_else(|| {
743 ShardCacheClientError::Config(format!(
744 "Redis command `{}` is not available on direct SCNP shard clients; use ShardCacheClient on the fanout listener for command-name fallback",
745 String::from_utf8_lossy(command)
746 ))
747 })
748}
749
750#[cfg(feature = "redis")]
751fn validate_redis_command_name(command: &[u8]) -> Result<()> {
752 if command.is_empty() {
753 return Err(ShardCacheClientError::Config(
754 "Redis command name cannot be empty".into(),
755 ));
756 }
757 if command.iter().any(|byte| byte.is_ascii_whitespace()) {
758 return Err(ShardCacheClientError::Config(format!(
759 "Redis command name cannot contain whitespace: `{}`",
760 String::from_utf8_lossy(command)
761 )));
762 }
763 Ok(())
764}
765
766#[cfg(feature = "redis")]
767fn redis_direct_route(
768 router: &ShardCacheDirectRouter,
769 command: RedisCommandKind,
770 args: &[&[u8]],
771) -> Result<Option<ShardCacheRoute>> {
772 let keys = match command.route_keys(args) {
773 RedisCommandRouteKeys::None => return Ok(None),
774 RedisCommandRouteKeys::AllShards => {
775 return Err(ShardCacheClientError::Config(format!(
776 "{} requires all shards; use ShardCacheClient on the fanout listener",
777 command.name()
778 )));
779 }
780 RedisCommandRouteKeys::Keys(keys) if keys.is_empty() => return Ok(None),
781 RedisCommandRouteKeys::Keys(keys) => keys,
782 };
783
784 let first_route = router.route_key(keys[0]);
785 for key in keys.iter().skip(1) {
786 let route = router.route_key(key);
787 if route.shard_id != first_route.shard_id {
788 return Err(ShardCacheClientError::Config(format!(
789 "{} keys span multiple direct shards",
790 command.name()
791 )));
792 }
793 }
794 Ok(Some(first_route))
795}
796
797#[cfg(feature = "redis")]
798fn redis_direct_shard_route(
799 router: &ShardCacheDirectRouter,
800 shard_id: usize,
801 command: RedisCommandKind,
802 args: &[&[u8]],
803) -> Result<Option<ShardCacheRoute>> {
804 let route = redis_direct_route(router, command, args)?;
805 if let Some(route) = route
806 && route.shard_id != shard_id
807 {
808 return Err(ShardCacheClientError::Config(format!(
809 "{} routes to shard {}, but client is connected to shard {}",
810 command.name(),
811 route.shard_id,
812 shard_id
813 )));
814 }
815 Ok(route)
816}