1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use crate::{
cmd,
resp::{BulkString, Value, ResultValueExt},
Command, Database, Message, PubSub, PubSubReceiver, PubSubSender, PubSubStream,
Result, ServerEndPoint, ValueReceiver, ValueSender,
};
use futures::channel::{mpsc, oneshot};
/// Cloneable handle through which commands and pub/sub requests are sent
/// to a single [`ServerEndPoint`].
///
/// The handle itself only stores the end point; `Database`, `PubSub` and
/// `PubSubStream` values created from it each receive their own clone.
#[derive(Clone)]
pub struct ConnectionMultiplexer {
/// End point that every message built by this handle is sent through.
server_end_point: ServerEndPoint,
}
impl ConnectionMultiplexer {
pub async fn connect(addr: impl Into<String>) -> Result<ConnectionMultiplexer> {
let server_end_point = ServerEndPoint::connect(addr).await?;
println!("Connected to {}", server_end_point.get_addr());
Ok(ConnectionMultiplexer { server_end_point })
}
pub fn get_database(&self, db: usize) -> Database {
Database::new(self.clone(), db)
}
pub fn get_default_database(&self) -> Database {
Database::new(self.clone(), 0)
}
pub fn get_pub_sub(&self) -> PubSub {
PubSub::new(self.clone())
}
pub(crate) async fn send(&self, database: usize, command: Command) -> Result<Value> {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
let message = Message::new(command)
.database(database)
.value_sender(value_sender);
self.server_end_point.send(message)?;
let value = value_receiver.await?;
value.into_result()
}
pub(crate) async fn subscribe(&self, channel: BulkString) -> Result<PubSubStream> {
let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) = mpsc::unbounded();
let channel_name = channel.to_string();
let message = Message::new(cmd("SUBSCRIBE").arg(channel))
.value_sender(value_sender)
.pub_sub_sender(pub_sub_sender);
self.server_end_point.send(message)?;
let value = value_receiver.await?;
value.map_into_result(|_| PubSubStream::new(channel_name, pub_sub_receiver, self.clone()))
}
pub(crate) fn unsubscribe(&self, channel: BulkString) -> Result<()> {
let message = Message::new(cmd("UNSUBSCRIBE").arg(channel));
self.server_end_point.send(message)?;
Ok(())
}
}