redis_driver/clients/
multiplexed_client.rs1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10 resp::{Command, CommandArg, FromValue, SingleArgOrCollection, Value},
11 BitmapCommands, Cache, ClientTrait, ClusterCommands, ConnectionCommands, Future,
12 GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands, InnerClient,
13 InternalPubSubCommands, IntoConfig, ListCommands, Pipeline, PreparedCommand, PubSubCommands,
14 PubSubStream, Result, ScriptingCommands, SentinelCommands, ServerCommands, SetCommands,
15 SortedSetCommands, StreamCommands, StringCommands, Transaction,
16};
17#[cfg(feature = "redis-bloom")]
18use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
19use std::future::IntoFuture;
20
21#[derive(Clone)]
33pub struct MultiplexedClient {
34 inner_client: InnerClient,
35}
36
37impl MultiplexedClient {
38 pub async fn connect(config: impl IntoConfig) -> Result<Self> {
43 let inner_client = InnerClient::connect(config).await?;
44 Ok(Self { inner_client })
45 }
46
47 pub async fn send(&mut self, command: Command) -> Result<Value> {
77 self.inner_client.send(command).await
78 }
79
80 pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
85 self.inner_client.send_and_forget(command)
86 }
87
88 pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
93 self.inner_client.send_batch(commands).await
94 }
95
96 pub fn create_pipeline(&mut self) -> Pipeline {
98 self.inner_client.create_pipeline()
99 }
100
101 pub fn create_transaction(&mut self) -> Transaction {
111 Transaction::new(self.inner_client.clone())
112 }
113}
114
115impl ClientTrait for MultiplexedClient {
116 fn create_pipeline(&mut self) -> Pipeline {
117 self.create_pipeline()
118 }
119
120 fn get_cache(&mut self) -> &mut Cache {
121 self.inner_client.get_cache()
122 }
123}
124
125pub trait MultiplexedPreparedCommand<'a, R>
126where
127 R: FromValue,
128{
129 fn forget(self) -> Result<()>;
134}
135
136impl<'a, R> MultiplexedPreparedCommand<'a, R> for PreparedCommand<'a, MultiplexedClient, R>
137where
138 R: FromValue + Send + 'a,
139{
140 fn forget(self) -> Result<()> {
145 self.executor.send_and_forget(self.command)
146 }
147}
148
149impl<'a, R> IntoFuture for PreparedCommand<'a, MultiplexedClient, R>
150where
151 R: FromValue + Send + 'a,
152{
153 type Output = Result<R>;
154 type IntoFuture = Future<'a, R>;
155
156 fn into_future(self) -> Self::IntoFuture {
157 Box::pin(async move {
158 if self.keep_command_for_result {
159 let command_for_result = self.command.clone();
160 self.executor
161 .send(self.command)
162 .await?
163 .into_with_command(&command_for_result)
164 } else if let Some(post_process) = self.post_process {
165 let command_for_result = self.command.clone();
166 let result = self.executor.send(self.command).await?;
167 post_process(result, command_for_result, self.executor).await
168 } else {
169 self.executor.send(self.command).await?.into()
170 }
171 })
172 }
173}
174
175impl BitmapCommands for MultiplexedClient {}
176#[cfg(feature = "redis-bloom")]
177impl BloomCommands for MultiplexedClient {}
178impl ClusterCommands for MultiplexedClient {}
179impl ConnectionCommands for MultiplexedClient {}
180#[cfg(feature = "redis-bloom")]
181impl CountMinSketchCommands for MultiplexedClient {}
182#[cfg(feature = "redis-bloom")]
183impl CuckooCommands for MultiplexedClient {}
184impl GenericCommands for MultiplexedClient {}
185impl GeoCommands for MultiplexedClient {}
186#[cfg(feature = "redis-graph")]
187impl GraphCommands for MultiplexedClient {}
188impl HashCommands for MultiplexedClient {}
189impl HyperLogLogCommands for MultiplexedClient {}
190impl InternalPubSubCommands for MultiplexedClient {}
191#[cfg(feature = "redis-json")]
192impl JsonCommands for MultiplexedClient {}
193impl ListCommands for MultiplexedClient {}
194impl ScriptingCommands for MultiplexedClient {}
195#[cfg(feature = "redis-search")]
196impl SearchCommands for MultiplexedClient {}
197impl SentinelCommands for MultiplexedClient {}
198impl ServerCommands for MultiplexedClient {}
199impl SetCommands for MultiplexedClient {}
200impl SortedSetCommands for MultiplexedClient {}
201impl StreamCommands for MultiplexedClient {}
202impl StringCommands for MultiplexedClient {}
203#[cfg(feature = "redis-bloom")]
204impl TDigestCommands for MultiplexedClient {}
205#[cfg(feature = "redis-time-series")]
206impl TimeSeriesCommands for MultiplexedClient {}
207#[cfg(feature = "redis-bloom")]
208impl TopKCommands for MultiplexedClient {}
209
210impl PubSubCommands for MultiplexedClient {
211 fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
212 where
213 C: Into<CommandArg> + Send + 'a,
214 CC: SingleArgOrCollection<C>,
215 {
216 self.inner_client.subscribe(channels)
217 }
218
219 fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
220 where
221 P: Into<CommandArg> + Send + 'a,
222 PP: SingleArgOrCollection<P>,
223 {
224 self.inner_client.psubscribe(patterns)
225 }
226
227 fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
228 where
229 C: Into<CommandArg> + Send + 'a,
230 CC: SingleArgOrCollection<C>,
231 {
232 self.inner_client.ssubscribe(shardchannels)
233 }
234}