1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use crate::{
    cmd,
    resp::{BulkString, Value, ResultValueExt},
    Command, Database, Message, PubSub, PubSubReceiver, PubSubSender, PubSubStream,
    Result, ServerEndPoint, ValueReceiver, ValueSender,
};
use futures::channel::{mpsc, oneshot};

/// Cloneable handle wrapping a single `ServerEndPoint`.
///
/// The `Database`, `PubSub` and `PubSubStream` values built from this type
/// are each given their own clone of the handle.
// NOTE(review): the name suggests several logical clients share one
// underlying connection; that depends on how `ServerEndPoint` implements
// `Clone`, which is not visible here — confirm.
#[derive(Clone)]
pub struct ConnectionMultiplexer {
    /// End point that every outgoing `Message` is handed to.
    server_end_point: ServerEndPoint,
}

impl ConnectionMultiplexer {
    pub async fn connect(addr: impl Into<String>) -> Result<ConnectionMultiplexer> {
        let server_end_point = ServerEndPoint::connect(addr).await?;

        println!("Connected to {}", server_end_point.get_addr());
        Ok(ConnectionMultiplexer { server_end_point })
    }

    pub fn get_database(&self, db: usize) -> Database {
        Database::new(self.clone(), db)
    }

    pub fn get_default_database(&self) -> Database {
        Database::new(self.clone(), 0)
    }

    pub fn get_pub_sub(&self) -> PubSub {
        PubSub::new(self.clone())
    }

    pub(crate) async fn send(&self, database: usize, command: Command) -> Result<Value> {
        let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();

        let message = Message::new(command)
            .database(database)
            .value_sender(value_sender);

        self.server_end_point.send(message)?;

        let value = value_receiver.await?;
        value.into_result()
    }

    pub(crate) async fn subscribe(&self, channel: BulkString) -> Result<PubSubStream> {
        let (value_sender, value_receiver): (ValueSender, ValueReceiver) = oneshot::channel();
        let (pub_sub_sender, pub_sub_receiver): (PubSubSender, PubSubReceiver) = mpsc::unbounded();

        let channel_name = channel.to_string();
        let message = Message::new(cmd("SUBSCRIBE").arg(channel))
            .value_sender(value_sender)
            .pub_sub_sender(pub_sub_sender);

        self.server_end_point.send(message)?;

        let value = value_receiver.await?;
        value.map_into_result(|_| PubSubStream::new(channel_name, pub_sub_receiver, self.clone()))
    }

    pub(crate) fn unsubscribe(&self, channel: BulkString) -> Result<()> {
        let message = Message::new(cmd("UNSUBSCRIBE").arg(channel));
        self.server_end_point.send(message)?;
        Ok(())
    }
}