1use crate::{
2 network::{PubSubReceiver, PubSubSender},
3 resp::{cmd, CommandArg, Command, FromValue, ResultValueExt, SingleArgOrCollection, Value},
4 ClientPreparedCommand, Future, InternalPubSubCommands, IntoConfig, Message, MsgSender,
5 NetworkHandler, PreparedCommand, PubSubStream, Result, ValueReceiver, ValueSender, Pipeline, Cache,
6};
7use futures::channel::{mpsc, oneshot};
8use std::{
9 future::IntoFuture,
10 sync::Arc,
11};
12
13pub(crate) struct InnerClient {
14 msg_sender: Arc<MsgSender>,
15 cache: Cache,
16}
17
18impl Clone for InnerClient {
19 fn clone(&self) -> Self {
20 Self {
21 msg_sender: self.msg_sender.clone(),
22 cache: Cache::new(),
23 }
24 }
25}
26
27impl InnerClient {
28 pub async fn connect(config: impl IntoConfig) -> Result<Self> {
33 let msg_sender = NetworkHandler::connect(config.into_config()?).await?;
34
35 Ok(Self {
36 msg_sender: Arc::new(msg_sender),
37 cache: Cache::new(),
38 })
39 }
40
41 pub fn get_cache(&mut self) -> &mut Cache {
42 &mut self.cache
43 }
44
45 pub async fn send(&mut self, command: Command) -> Result<Value> {
46 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
47 let message = Message::single(command, value_sender);
48 self.send_message(message)?;
49 let value = value_receiver.await?;
50 value.into_result()
51 }
52
53 pub fn send_and_forget(&mut self, command: Command) -> Result<()> {
54 let message = Message::single_forget(command);
55 self.send_message(message)?;
56 Ok(())
57 }
58
59 pub async fn send_batch(&mut self, commands: Vec<Command>) -> Result<Value> {
60 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
61 let message = Message::batch(commands, value_sender);
62 self.send_message(message)?;
63 let value = value_receiver.await?;
64 value.into_result()
65 }
66
67 pub fn send_message(&mut self, message: Message) -> Result<()> {
68 self.msg_sender.unbounded_send(message)?;
69 Ok(())
70 }
71
72 pub fn create_pipeline(&mut self) -> Pipeline {
73 Pipeline::new(self.clone())
74 }
75
76 pub fn subscribe<'a, C, CC>(&'a mut self, channels: CC) -> Future<'a, PubSubStream>
77 where
78 C: Into<CommandArg> + Send + 'a,
79 CC: SingleArgOrCollection<C>,
80 {
81 let channels: Vec<String> = channels.into_iter().map(|c| c.into().to_string()).collect();
82
83 Box::pin(async move {
84 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
85 let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
86 mpsc::unbounded();
87
88 let pub_sub_senders = channels
89 .iter()
90 .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
91 .collect::<Vec<_>>();
92
93 let message = Message::pub_sub(
94 cmd("SUBSCRIBE").arg(channels.clone()),
95 value_sender,
96 pub_sub_senders,
97 );
98
99 self.send_message(message)?;
100
101 let value = value_receiver.await?;
102 value.map_into_result(|_| {
103 PubSubStream::from_channels(channels, pub_sub_receiver, self.clone())
104 })
105 })
106 }
107
108 pub fn psubscribe<'a, P, PP>(&'a mut self, patterns: PP) -> Future<'a, PubSubStream>
109 where
110 P: Into<CommandArg> + Send + 'a,
111 PP: SingleArgOrCollection<P>,
112 {
113 let patterns: Vec<String> = patterns.into_iter().map(|p| p.into().to_string()).collect();
114
115 Box::pin(async move {
116 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
117 let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
118 mpsc::unbounded();
119
120 let pub_sub_senders = patterns
121 .iter()
122 .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
123 .collect::<Vec<_>>();
124
125 let message = Message::pub_sub(
126 cmd("PSUBSCRIBE").arg(patterns.clone()),
127 value_sender,
128 pub_sub_senders,
129 );
130
131 self.send_message(message)?;
132
133 let value = value_receiver.await?;
134 value.map_into_result(|_| {
135 PubSubStream::from_patterns(patterns, pub_sub_receiver, self.clone())
136 })
137 })
138 }
139
140 pub fn ssubscribe<'a, C, CC>(&'a mut self, shardchannels: CC) -> Future<'a, PubSubStream>
141 where
142 C: Into<CommandArg> + Send + 'a,
143 CC: SingleArgOrCollection<C>,
144 {
145 let shardchannels: Vec<String> = shardchannels
146 .into_iter()
147 .map(|c| c.into().to_string())
148 .collect();
149
150 Box::pin(async move {
151 let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
152 let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) =
153 mpsc::unbounded();
154
155 let pub_sub_senders = shardchannels
156 .iter()
157 .map(|c| (c.as_bytes().to_vec(), pub_sub_sender.clone()))
158 .collect::<Vec<_>>();
159
160 let message = Message::pub_sub(
161 cmd("SSUBSCRIBE").arg(shardchannels.clone()),
162 value_sender,
163 pub_sub_senders,
164 );
165
166 self.send_message(message)?;
167
168 let value = value_receiver.await?;
169 value.map_into_result(|_| {
170 PubSubStream::from_shardchannels(shardchannels, pub_sub_receiver, self.clone())
171 })
172 })
173 }
174}
175
176impl<'a, R> ClientPreparedCommand<'a, R> for PreparedCommand<'a, InnerClient, R>
177where
178 R: FromValue + Send + 'a,
179{
180 fn forget(self) -> Result<()> {
185 self.executor.send_and_forget(self.command)
186 }
187}
188
189impl<'a, R> IntoFuture for PreparedCommand<'a, InnerClient, R>
190where
191 R: FromValue + Send + 'a,
192{
193 type Output = Result<R>;
194 type IntoFuture = Future<'a, R>;
195
196 fn into_future(self) -> Self::IntoFuture {
197 Box::pin(async move { self.executor.send(self.command).await?.into() })
198 }
199}
200
201impl InternalPubSubCommands for InnerClient {}