// redis_driver/clients/inner_client.rs

1use crate::{
2    network::{PubSubReceiver, PubSubSender},
3    resp::{cmd, CommandArg, Command, FromValue, ResultValueExt, SingleArgOrCollection, Value},
4    ClientPreparedCommand, Future, InternalPubSubCommands, IntoConfig, Message, MsgSender,
5    NetworkHandler, PreparedCommand, PubSubStream, Result, ValueReceiver, ValueSender, Pipeline, Cache,
6};
7use futures::channel::{mpsc, oneshot};
8use std::{
9    future::IntoFuture,
10    sync::Arc,
11};
12
13pub(crate) struct InnerClient {
14    msg_sender: Arc<MsgSender>,
15    cache: Cache,
16}
17
18impl Clone for InnerClient {
19    fn clone(&self) -> Self {
20        Self {
21            msg_sender: self.msg_sender.clone(),
22            cache: Cache::new(),
23        }
24    }
25}
26
27impl InnerClient {
28    /// Connects asynchronously to the Redis server.
29    ///
30    /// # Errors
31    /// Any Redis driver [`Error`](crate::Error) that occurs during the connection operation
32    pub async fn connect(config: impl IntoConfig) -> Result<Self> {
33        let msg_sender = NetworkHandler::connect(config.into_config()?).await?;
34
35        Ok(Self {
36            msg_sender: Arc::new(msg_sender),
37            cache: Cache::new(),
38        })
39    }
40
41    pub fn get_cache(&mut self) -> &mut Cache {
42        &mut self.cache
43    }
44
45    pub async fn send(&mut self, command: Command) -> Result<Value> {
46        let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
47        let message = Message::single(command, value_sender);
48        self.send_message(message)?;
49        let value = value_receiver.await?;
50        value.into_result()
51    }
52
53    pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
54        let message = Message::single_forget(command);
55        self.send_message(message)?;
56        Ok(())
57    }
58
59    pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
60        let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
61        let message = Message::batch(commands, value_sender);
62        self.send_message(message)?;
63        let value = value_receiver.await?;
64        value.into_result()
65    }
66
67    pub fn send_message(&mut self, message: Message) -> Result<()> {
68        self.msg_sender.unbounded_send(message)?;
69        Ok(())
70    }
71
72    pub fn create_pipeline(&mut self) -> Pipeline {
73        Pipeline::new(self.clone())
74    }
75
76    pub fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
77    where
78        C: Into<CommandArg> + Send + 'a,
79        CC: SingleArgOrCollection<C>,
80    {
81        let channels: Vec<String> = channels.into_iter().map(|c| c.into().to_string()).collect();
82
83        Box::pin(async move {
84            let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
85            let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
86                mpsc::unbounded();
87
88            let pub_sub_senders = channels
89                .iter()
90                .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
91                .collect::<Vec<_>>();
92
93            let message = Message::pub_sub(
94                cmd("SUBSCRIBE").arg(channels.clone()),
95                value_sender,
96                pub_sub_senders,
97            );
98
99            self.send_message(message)?;
100
101            let value = value_receiver.await?;
102            value.map_into_result(|_| {
103                PubSubStream::from_channels(channels, pub_sub_receiver, self.clone())
104            })
105        })
106    }
107
108    pub fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
109    where
110        P: Into<CommandArg> + Send + 'a,
111        PP: SingleArgOrCollection<P>,
112    {
113        let patterns: Vec<String> = patterns.into_iter().map(|p| p.into().to_string()).collect();
114
115        Box::pin(async move {
116            let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
117            let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
118                mpsc::unbounded();
119
120            let pub_sub_senders = patterns
121                .iter()
122                .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
123                .collect::<Vec<_>>();
124
125            let message = Message::pub_sub(
126                cmd("PSUBSCRIBE").arg(patterns.clone()),
127                value_sender,
128                pub_sub_senders,
129            );
130
131            self.send_message(message)?;
132
133            let value = value_receiver.await?;
134            value.map_into_result(|_| {
135                PubSubStream::from_patterns(patterns, pub_sub_receiver, self.clone())
136            })
137        })
138    }
139
140    pub fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
141    where
142        C: Into<CommandArg> + Send + 'a,
143        CC: SingleArgOrCollection<C>,
144    {
145        let shardchannels: Vec<String> = shardchannels
146            .into_iter()
147            .map(|c| c.into().to_string())
148            .collect();
149
150        Box::pin(async move {
151            let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
152            let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
153                mpsc::unbounded();
154
155            let pub_sub_senders = shardchannels
156                .iter()
157                .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
158                .collect::<Vec<_>>();
159
160            let message = Message::pub_sub(
161                cmd("SSUBSCRIBE").arg(shardchannels.clone()),
162                value_sender,
163                pub_sub_senders,
164            );
165
166            self.send_message(message)?;
167
168            let value = value_receiver.await?;
169            value.map_into_result(|_| {
170                PubSubStream::from_shardchannels(shardchannels, pub_sub_receiver, self.clone())
171            })
172        })
173    }
174}
175
176impl<'a, R> ClientPreparedCommand<'a, R> for PreparedCommand<'a, InnerClient, R>
177where
178    R: FromValue + Send + 'a,
179{
180    /// Send command and forget its response
181    ///
182    /// # Errors
183    /// Any Redis driver [`Error`](crate::Error) that occur during the send operation
184    fn forget(self) -> Result<()> {
185        self.executor.send_and_forget(self.command)
186    }
187}
188
189impl<'a, R> IntoFuture for PreparedCommand<'a, InnerClient, R>
190where
191    R: FromValue + Send + 'a,
192{
193    type Output = Result<R>;
194    type IntoFuture = Future<'a, R>;
195
196    fn into_future(self) -> Self::IntoFuture {
197        Box::pin(async move { self.executor.send(self.command).await?.into() })
198    }
199}
200
201impl InternalPubSubCommands for InnerClient {}