1#[cfg(feature = "redis-graph")]
2use crate::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::TimeSeriesCommands;
9use crate::{
10 network::{MonitorReceiver, MonitorSender},
11 resp::{cmd, Command, CommandArg, FromValue, ResultValueExt, SingleArgOrCollection, Value},
12 BitmapCommands, BlockingCommands, ClientTrait, ClusterCommands, ConnectionCommands, Future,
13 GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands, InnerClient,
14 InternalPubSubCommands, IntoConfig, ListCommands, Message, MonitorStream, Pipeline,
15 PreparedCommand, PubSubCommands, PubSubStream, Result, ScriptingCommands, SentinelCommands,
16 ServerCommands, SetCommands, SortedSetCommands, StreamCommands, StringCommands, Transaction,
17 TransactionCommands, ValueReceiver, ValueSender,
18};
19#[cfg(feature = "redis-bloom")]
20use crate::{BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands};
21use futures::channel::{mpsc, oneshot};
22use std::future::IntoFuture;
23
24pub struct Client {
26 inner_client: InnerClient,
27}
28
29impl Client {
30 pub async fn connect(config: impl IntoConfig) -> Result<Self> {
35 let inner_client = InnerClient::connect(config).await?;
36 Ok(Self { inner_client })
37 }
38
39 pub(crate) fn clone(&self) -> Client {
43 Client {
44 inner_client: self.inner_client.clone(),
45 }
46 }
47
48 pub async fn send(&mut self, command: Command) -> Result<Value> {
78 self.inner_client.send(command).await
79 }
80
81 pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
86 self.inner_client.send_and_forget(command)
87 }
88
89 pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
94 self.inner_client.send_batch(commands).await
95 }
96
97 pub fn create_transaction(&mut self) -> Transaction {
99 Transaction::new(self.inner_client.clone())
100 }
101
102 pub fn create_pipeline(&mut self) -> Pipeline {
104 self.inner_client.create_pipeline()
105 }
106}
107
108impl ClientTrait for Client {
109 fn create_pipeline(&mut self) -> Pipeline {
110 self.create_pipeline()
111 }
112
113 fn get_cache(&mut self) -> &mut crate::Cache {
114 self.inner_client.get_cache()
115 }
116}
117
118pub trait ClientPreparedCommand<'a, R>
119where
120 R: FromValue,
121{
122 fn forget(self) -> Result<()>;
127}
128
129impl<'a, R> ClientPreparedCommand<'a, R> for PreparedCommand<'a, Client, R>
130where
131 R: FromValue + Send + 'a,
132{
133 fn forget(self) -> Result<()> {
138 self.executor.send_and_forget(self.command)
139 }
140}
141
142impl<'a, R> IntoFuture for PreparedCommand<'a, Client, R>
143where
144 R: FromValue + Send + 'a,
145{
146 type Output = Result<R>;
147 type IntoFuture = Future<'a, R>;
148
149 fn into_future(self) -> Self::IntoFuture {
150 Box::pin(async move {
151 if self.keep_command_for_result {
152 let command_for_result = self.command.clone();
153 self.executor
154 .send(self.command)
155 .await?
156 .into_with_command(&command_for_result)
157 } else if let Some(post_process) = self.post_process {
158 let command_for_result = self.command.clone();
159 let result = self.executor.send(self.command).await?;
160 post_process(result, command_for_result, self.executor).await
161 } else {
162 self.executor.send(self.command).await?.into()
163 }
164 })
165 }
166}
167
168impl BitmapCommands for Client {}
169#[cfg(feature = "redis-bloom")]
170impl BloomCommands for Client {}
171impl ClusterCommands for Client {}
172#[cfg(feature = "redis-bloom")]
173impl CountMinSketchCommands for Client {}
174#[cfg(feature = "redis-bloom")]
175impl CuckooCommands for Client {}
176impl ConnectionCommands for Client {}
177impl GenericCommands for Client {}
178impl GeoCommands for Client {}
179#[cfg(feature = "redis-graph")]
180impl GraphCommands for Client {}
181impl HashCommands for Client {}
182impl HyperLogLogCommands for Client {}
183impl InternalPubSubCommands for Client {}
184#[cfg(feature = "redis-json")]
185impl JsonCommands for Client {}
186impl ListCommands for Client {}
187impl ScriptingCommands for Client {}
188#[cfg(feature = "redis-search")]
189impl SearchCommands for Client {}
190impl SentinelCommands for Client {}
191impl ServerCommands for Client {}
192impl SetCommands for Client {}
193impl SortedSetCommands for Client {}
194impl StreamCommands for Client {}
195impl StringCommands for Client {}
196#[cfg(feature = "redis-bloom")]
197impl TDigestCommands for Client {}
198#[cfg(feature = "redis-time-series")]
199impl TimeSeriesCommands for Client {}
200impl TransactionCommands for Client {}
201#[cfg(feature = "redis-bloom")]
202impl TopKCommands for Client {}
203
204impl PubSubCommands for Client {
205 fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
206 where
207 C: Into<CommandArg> + Send + 'a,
208 CC: SingleArgOrCollection<C>,
209 {
210 self.inner_client.subscribe(channels)
211 }
212
213 fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
214 where
215 P: Into<CommandArg> + Send + 'a,
216 PP: SingleArgOrCollection<P>,
217 {
218 self.inner_client.psubscribe(patterns)
219 }
220
221 fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
222 where
223 C: Into<CommandArg> + Send + 'a,
224 CC: SingleArgOrCollection<C>,
225 {
226 self.inner_client.ssubscribe(shardchannels)
227 }
228}
229
230impl BlockingCommands for Client {
231 fn monitor(&mut self) -> Future<crate::MonitorStream> {
232 Box::pin(async move {
233 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
234 let (monitor_sender, monitor_receiver): (MonitorSender, MonitorReceiver) =
235 mpsc::unbounded();
236
237 let message = Message::monitor(cmd("MONITOR"), value_sender, monitor_sender);
238
239 self.inner_client.send_message(message)?;
240
241 let value = value_receiver.await?;
242 value.map_into_result(|_| MonitorStream::new(monitor_receiver, self.clone()))
243 })
244 }
245}