// redis_driver/clients/client.rs

1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10    network::{MonitorReceiver, MonitorSender},
11    resp::{cmd, Command, CommandArg, FromValue, ResultValueExt, SingleArgOrCollection, Value},
12    BitmapCommands, BlockingCommands, ClientTrait, ClusterCommands, ConnectionCommands, Future,
13    GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands, InnerClient,
14    InternalPubSubCommands, IntoConfig, ListCommands, Message, MonitorStream, Pipeline,
15    PreparedCommand, PubSubCommands, PubSubStream, Result, ScriptingCommands, SentinelCommands,
16    ServerCommands, SetCommands, SortedSetCommands, StreamCommands, StringCommands, Transaction,
17    TransactionCommands, ValueReceiver, ValueSender,
18};
19#[cfg(feature = "redis-bloom")]
20use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
21use futures::channel::{mpsc, oneshot};
22use std::future::IntoFuture;
23
24/// Client with a unique connection to a Redis server.
25pub struct Client {
26    inner_client: InnerClient,
27}
28
29impl Client {
30    /// Connects asynchronously to the Redis server.
31    ///
32    /// # Errors
33    /// Any Redis driver [`Error`](crate::Error) that occurs during the connection operation
34    pub async fn connect(config: impl IntoConfig) -> Result<Self> {
35        let inner_client = InnerClient::connect(config).await?;
36        Ok(Self { inner_client })
37    }
38
39    /// We don't want the Client struct to be publicly cloneable
40    /// If one wants to consume a multiplexed client,
41    /// the [MultiplexedClient](crate::MultiplexedClient) must be used instead
42    pub(crate) fn clone(&self) -> Client {
43        Client {
44            inner_client: self.inner_client.clone(),
45        }
46    }
47
48    /// Send an arbitrary command to the server.
49    ///
50    /// This is used primarily intended for implementing high level commands API
51    /// but may also be used to provide access to new features that lack a direct API.
52    ///
53    /// # Arguments
54    /// * `name` - Command name in uppercase.
55    /// * `args` - Command arguments which can be provided as arrays (up to 4 elements) or vectors of [`CommandArg`](crate::resp::CommandArg).
56    ///
57    /// # Errors
58    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
59    ///
60    /// # Example
61    /// ```
62    /// use redis_driver::{resp::cmd, Client, Result};
63    ///
64    /// #[tokio::main]
65    /// async fn main() -> Result<()> {
66    ///     let mut client = Client::connect("127.0.0.1:6379").await?;
67    ///
68    ///     let values: Vec<String> = client
69    ///         .send(cmd("MGET").arg("key1").arg("key2").arg("key3").arg("key4"))
70    ///         .await?
71    ///         .into()?;
72    ///     println!("{:?}", values);
73    ///
74    ///     Ok(())
75    /// }
76    /// ```
77    pub async fn send(&mut self, command: Command) -> Result<Value> {
78        self.inner_client.send(command).await
79    }
80
81    /// Send command to the Redis server and forget its response.
82    ///
83    /// # Errors
84    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
85    pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
86        self.inner_client.send_and_forget(command)
87    }
88
89    /// Send a command batch to the Redis server.
90    ///
91    /// # Errors
92    /// Any Redis driver [`Error`](crate::Error) that occurs during the send operation
93    pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
94        self.inner_client.send_batch(commands).await
95    }
96
97    /// Create a new transaction
98    pub fn create_transaction(&mut self) -> Transaction {
99        Transaction::new(self.inner_client.clone())
100    }
101
102    /// Create a new pipeline
103    pub fn create_pipeline(&mut self) -> Pipeline {
104        self.inner_client.create_pipeline()
105    }
106}
107
108impl ClientTrait for Client {
109    fn create_pipeline(&mut self) -> Pipeline {
110        self.create_pipeline()
111    }
112
113    fn get_cache(&mut self) -> &mut crate::Cache {
114        self.inner_client.get_cache()
115    }
116}
117
118pub trait ClientPreparedCommand<'a, R>
119where
120    R: FromValue,
121{
122    /// Send command and forget its response
123    ///
124    /// # Errors
125    /// Any Redis driver [`Error`](crate::Error) that occur during the send operation
126    fn forget(self) -> Result<()>;
127}
128
129impl<'a, R> ClientPreparedCommand<'a, R> for PreparedCommand<'a, Client, R>
130where
131    R: FromValue + Send + 'a,
132{
133    /// Send command and forget its response
134    ///
135    /// # Errors
136    /// Any Redis driver [`Error`](crate::Error) that occur during the send operation
137    fn forget(self) -> Result<()> {
138        self.executor.send_and_forget(self.command)
139    }
140}
141
142impl<'a, R> IntoFuture for PreparedCommand<'a, Client, R>
143where
144    R: FromValue + Send + 'a,
145{
146    type Output = Result<R>;
147    type IntoFuture = Future<'a, R>;
148
149    fn into_future(self) -> Self::IntoFuture {
150        Box::pin(async move {
151            if self.keep_command_for_result {
152                let command_for_result = self.command.clone();
153                self.executor
154                    .send(self.command)
155                    .await?
156                    .into_with_command(&command_for_result)
157            } else if let Some(post_process) = self.post_process {
158                let command_for_result = self.command.clone();
159                let result = self.executor.send(self.command).await?;
160                post_process(result, command_for_result, self.executor).await
161            } else {
162                self.executor.send(self.command).await?.into()
163            }
164        })
165    }
166}
167
168impl BitmapCommands for Client {}
169#[cfg(feature = "redis-bloom")]
170impl BloomCommands for Client {}
171impl ClusterCommands for Client {}
172#[cfg(feature = "redis-bloom")]
173impl CountMinSketchCommands for Client {}
174#[cfg(feature = "redis-bloom")]
175impl CuckooCommands for Client {}
176impl ConnectionCommands for Client {}
177impl GenericCommands for Client {}
178impl GeoCommands for Client {}
179#[cfg(feature = "redis-graph")]
180impl GraphCommands for Client {}
181impl HashCommands for Client {}
182impl HyperLogLogCommands for Client {}
183impl InternalPubSubCommands for Client {}
184#[cfg(feature = "redis-json")]
185impl JsonCommands for Client {}
186impl ListCommands for Client {}
187impl ScriptingCommands for Client {}
188#[cfg(feature = "redis-search")]
189impl SearchCommands for Client {}
190impl SentinelCommands for Client {}
191impl ServerCommands for Client {}
192impl SetCommands for Client {}
193impl SortedSetCommands for Client {}
194impl StreamCommands for Client {}
195impl StringCommands for Client {}
196#[cfg(feature = "redis-bloom")]
197impl TDigestCommands for Client {}
198#[cfg(feature = "redis-time-series")]
199impl TimeSeriesCommands for Client {}
200impl TransactionCommands for Client {}
201#[cfg(feature = "redis-bloom")]
202impl TopKCommands for Client {}
203
204impl PubSubCommands for Client {
205    fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
206    where
207        C: Into<CommandArg> + Send + 'a,
208        CC: SingleArgOrCollection<C>,
209    {
210        self.inner_client.subscribe(channels)
211    }
212
213    fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
214    where
215        P: Into<CommandArg> + Send + 'a,
216        PP: SingleArgOrCollection<P>,
217    {
218        self.inner_client.psubscribe(patterns)
219    }
220
221    fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
222    where
223        C: Into<CommandArg> + Send + 'a,
224        CC: SingleArgOrCollection<C>,
225    {
226        self.inner_client.ssubscribe(shardchannels)
227    }
228}
229
230impl BlockingCommands for Client {
231    fn monitor(&mut self) -> Future<crate::MonitorStream> {
232        Box::pin(async move {
233            let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
234            let (monitor_sender, monitor_receiver): (MonitorSender, MonitorReceiver) =
235                mpsc::unbounded();
236
237            let message = Message::monitor(cmd("MONITOR"), value_sender, monitor_sender);
238
239            self.inner_client.send_message(message)?;
240
241            let value = value_receiver.await?;
242            value.map_into_result(|_| MonitorStream::new(monitor_receiver, self.clone()))
243        })
244    }
245}