redis_driver/clients/
multiplexed_client.rs

1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10    resp::{Command, CommandArg, FromValue, SingleArgOrCollection, Value},
11    BitmapCommands, Cache, ClientTrait, ClusterCommands, ConnectionCommands, Future,
12    GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands, InnerClient,
13    InternalPubSubCommands, IntoConfig, ListCommands, Pipeline, PreparedCommand, PubSubCommands,
14    PubSubStream, Result, ScriptingCommands, SentinelCommands, ServerCommands, SetCommands,
15    SortedSetCommands, StreamCommands, StringCommands, Transaction,
16};
17#[cfg(feature = "redis-bloom")]
18use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
19use std::future::IntoFuture;
20
21/// A multiplexed client that can be cloned, allowing requests
22/// to be be sent concurrently on the same underlying connection.
23///
24/// Compared to a [single client](crate::Client), a multiplexed client cannot offers access
25/// to all existing Redis commands.
26/// Transactions and [blocking commands](crate::BlockingCommands) are not compatible with a multiplexed client
27/// because they monopolize the whole connection which cannot be shared anymore. It means other consumers of the same
28/// multiplexed client will be blocked each time a transaction or a blocking command is in progress, losing the advantage
29/// of a shared connection.
30///
31/// #See also [Multiplexing Explained](https://redis.com/blog/multiplexing-explained/)
32#[derive(Clone)]
33pub struct MultiplexedClient {
34    inner_client: InnerClient,
35}
36
37impl MultiplexedClient {
38    /// Connects asynchronously to the Redis server.
39    ///
40    /// # Errors
41    /// Any Redis driver [`Error`](crate::Error) that occurs during the connection operation
42    pub async fn connect(config: impl IntoConfig) -> Result<Self> {
43        let inner_client = InnerClient::connect(config).await?;
44        Ok(Self { inner_client })
45    }
46
47    /// Send an arbitrary command to the Redis server.
48    ///
49    /// This is used primarily intended for implementing high level commands API
50    /// but may also be used to provide access to new features that lack a direct API.
51    ///
52    /// # Arguments
53    /// * `name` - Command name in uppercase.
54    /// * `args` - Command arguments which can be provided as arrays (up to 4 elements) or vectors of [`CommandArg`](crate::resp::CommandArg).
55    ///
56    /// # Errors
57    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
58    ///
59    /// # Example
60    /// ```
61    /// use redis_driver::{resp::cmd, MultiplexedClient, Result};
62    ///
63    /// #[tokio::main]
64    /// async fn main() -> Result<()> {
65    ///     let mut client = MultiplexedClient::connect("127.0.0.1:6379").await?;
66    ///
67    ///     let values: Vec<String> = client
68    ///         .send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"))
69    ///         .await?
70    ///         .into()?;
71    ///     println!("{:?}", values);
72    ///
73    ///     Ok(())
74    /// }
75    /// ```
76    pub async fn send(&mut self, command: Command) -> Result<Value> {
77        self.inner_client.send(command).await
78    }
79
80    /// Send command to the Redis server and forget its response.
81    ///
82    /// # Errors
83    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
84    pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
85        self.inner_client.send_and_forget(command)
86    }
87
88    /// Send a command batch to the Redis server.
89    ///
90    /// # Errors
91    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
92    pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
93        self.inner_client.send_batch(commands).await
94    }
95
96    /// Create a new pipeline
97    pub fn create_pipeline(&mut self) -> Pipeline {
98        self.inner_client.create_pipeline()
99    }
100
101    /// Create a new transaction
102    ///
103    /// Because of the multiplexed nature of the client,
104    /// [`watch`](crate::TransactionCommands::watch) &
105    /// [`unwatch`](crate::TransactionCommands::unwatch)
106    /// commands cannot be supported.
107    /// To be able to use these commands with a transaction,
108    /// [`Client`](crate::Client) or [`PooledClientManager`](crate::PooledClientManager)
109    /// should be used instead
110    pub fn create_transaction(&mut self) -> Transaction {
111        Transaction::new(self.inner_client.clone())
112    }
113}
114
115impl ClientTrait for MultiplexedClient {
116    fn create_pipeline(&mut self) -> Pipeline {
117        self.create_pipeline()
118    }
119
120    fn get_cache(&mut self) -> &mut Cache {
121        self.inner_client.get_cache()
122    }
123}
124
125pub trait MultiplexedPreparedCommand<'a, R>
126where
127    R: FromValue,
128{
129    /// Send command and forget its response
130    ///
131    /// # Errors
132    /// Any Redis driver [`Error`](crate::Error) that occur during the send operation
133    fn forget(self) -> Result<()>;
134}
135
136impl<'a, R> MultiplexedPreparedCommand<'a, R> for PreparedCommand<'a, MultiplexedClient, R>
137where
138    R: FromValue + Send + 'a,
139{
140    /// Send command and forget its response
141    ///
142    /// # Errors
143    /// Any Redis driver [`Error`](crate::Error) that occur during the send operation
144    fn forget(self) -> Result<()> {
145        self.executor.send_and_forget(self.command)
146    }
147}
148
149impl<'a, R> IntoFuture for PreparedCommand<'a, MultiplexedClient, R>
150where
151    R: FromValue + Send + 'a,
152{
153    type Output = Result<R>;
154    type IntoFuture = Future<'a, R>;
155
156    fn into_future(self) -> Self::IntoFuture {
157        Box::pin(async move {
158            if self.keep_command_for_result {
159                let command_for_result = self.command.clone();
160                self.executor
161                    .send(self.command)
162                    .await?
163                    .into_with_command(&command_for_result)
164            } else if let Some(post_process) = self.post_process {
165                let command_for_result = self.command.clone();
166                let result = self.executor.send(self.command).await?;
167                post_process(result, command_for_result, self.executor).await
168            } else {
169                self.executor.send(self.command).await?.into()
170            }
171        })
172    }
173}
174
175impl BitmapCommands for MultiplexedClient {}
176#[cfg(feature = "redis-bloom")]
177impl BloomCommands for MultiplexedClient {}
178impl ClusterCommands for MultiplexedClient {}
179impl ConnectionCommands for MultiplexedClient {}
180#[cfg(feature = "redis-bloom")]
181impl CountMinSketchCommands for MultiplexedClient {}
182#[cfg(feature = "redis-bloom")]
183impl CuckooCommands for MultiplexedClient {}
184impl GenericCommands for MultiplexedClient {}
185impl GeoCommands for MultiplexedClient {}
186#[cfg(feature = "redis-graph")]
187impl GraphCommands for MultiplexedClient {}
188impl HashCommands for MultiplexedClient {}
189impl HyperLogLogCommands for MultiplexedClient {}
190impl InternalPubSubCommands for MultiplexedClient {}
191#[cfg(feature = "redis-json")]
192impl JsonCommands for MultiplexedClient {}
193impl ListCommands for MultiplexedClient {}
194impl ScriptingCommands for MultiplexedClient {}
195#[cfg(feature = "redis-search")]
196impl SearchCommands for MultiplexedClient {}
197impl SentinelCommands for MultiplexedClient {}
198impl ServerCommands for MultiplexedClient {}
199impl SetCommands for MultiplexedClient {}
200impl SortedSetCommands for MultiplexedClient {}
201impl StreamCommands for MultiplexedClient {}
202impl StringCommands for MultiplexedClient {}
203#[cfg(feature = "redis-bloom")]
204impl TDigestCommands for MultiplexedClient {}
205#[cfg(feature = "redis-time-series")]
206impl TimeSeriesCommands for MultiplexedClient {}
207#[cfg(feature = "redis-bloom")]
208impl TopKCommands for MultiplexedClient {}
209
210impl PubSubCommands for MultiplexedClient {
211    fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
212    where
213        C: Into<CommandArg> + Send + 'a,
214        CC: SingleArgOrCollection<C>,
215    {
216        self.inner_client.subscribe(channels)
217    }
218
219    fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
220    where
221        P: Into<CommandArg> + Send + 'a,
222        PP: SingleArgOrCollection<P>,
223    {
224        self.inner_client.psubscribe(patterns)
225    }
226
227    fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
228    where
229        C: Into<CommandArg> + Send + 'a,
230        CC: SingleArgOrCollection<C>,
231    {
232        self.inner_client.ssubscribe(shardchannels)
233    }
234}