1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::{sync::Arc, time::Duration};

use crate::{
    create_consumer, create_producer, RedisCluster, RedisConsumer, RedisConsumerOptions, RedisErr,
    RedisProducer, RedisProducerOptions, RedisResult, REDIS_PORT,
};
use sea_streamer_types::{
    ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
};

#[derive(Debug, Clone)]
/// The Redis Streamer, from which you can create Producers and Consumers.
///
/// The connect options are held behind an [`Arc`], so cloning a streamer
/// shares them rather than copying them.
pub struct RedisStreamer {
    /// Node URIs; `connect` stores them rewritten as `scheme://host:port`.
    uri: StreamerUri,
    /// Connection options, handed to every `RedisCluster` built from this streamer.
    options: Arc<RedisConnectOptions>,
}

/// Options for connections, including credentials.
///
/// `Debug` is implemented by hand so that the password never ends up in logs
/// or panic messages; every other field is printed as usual.
#[derive(Default, Clone)]
pub struct RedisConnectOptions {
    /// Database number; defaults to 0.
    db: u32,
    /// Optional user name credential.
    username: Option<String>,
    /// Optional password credential. Redacted in `Debug` output.
    password: Option<String>,
    /// Timeout for network requests; `None` until explicitly set.
    timeout: Option<Duration>,
    /// Whether Redis Cluster support is enabled.
    enable_cluster: bool,
    /// Whether hostname verification is disabled (insecure).
    disable_hostname_verification: bool,
}

impl std::fmt::Debug for RedisConnectOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RedisConnectOptions")
            .field("db", &self.db)
            .field("username", &self.username)
            // Show only whether a password is present, never its value.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("timeout", &self.timeout)
            .field("enable_cluster", &self.enable_cluster)
            .field(
                "disable_hostname_verification",
                &self.disable_hostname_verification,
            )
            .finish()
    }
}

impl Streamer for RedisStreamer {
    type Error = RedisErr;
    type Producer = RedisProducer;
    type Consumer = RedisConsumer;
    type ConnectOptions = RedisConnectOptions;
    type ConsumerOptions = RedisConsumerOptions;
    type ProducerOptions = RedisProducerOptions;

    /// Validate and normalize `uri`, then try to connect to its nodes.
    ///
    /// Every node is rewritten to `scheme://host:port`, with the port falling
    /// back to [`REDIS_PORT`] when the node does not specify one. A temporary
    /// [`RedisCluster`] is built and `reconnect_all` is awaited on it before
    /// the streamer is returned; that cluster is not kept, and producers and
    /// consumers each build their own.
    ///
    /// # Errors
    ///
    /// - `StreamUrlErr::ProtocolRequired` if `uri` has no protocol.
    /// - `RedisErr::Unknown` if any node has no host.
    /// - Whatever `RedisCluster::new` or `reconnect_all` returns.
    async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult<Self> {
        if uri.protocol().is_none() {
            return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
        }

        // The URI comes from the caller, so a missing host is an input error
        // to report, not an invariant violation to panic on.
        let nodes: Vec<_> = uri.into_nodes().collect();
        if let Some(node) = nodes.iter().find(|u| u.host().is_none()) {
            return Err(StreamErr::Backend(RedisErr::Unknown(format!(
                "Redis node URI `{node}` has no host"
            ))));
        }

        let uri = StreamerUri::many(nodes.into_iter().map(|u| {
            // normalize uri
            format!(
                "{}://{}:{}",
                u.scheme(),
                u.host().expect("host presence was checked above"),
                u.port().unwrap_or(REDIS_PORT)
            )
            .parse()
            .expect("`scheme://host:port` rebuilt from a parsed URL must parse")
        }));

        let options = Arc::new(options);
        let mut cluster = RedisCluster::new(uri.clone(), options.clone())?;
        cluster.reconnect_all().await?;
        Ok(RedisStreamer { uri, options })
    }

    /// Not implemented: always returns `RedisErr::Unknown("Not implemented")`.
    async fn disconnect(self) -> RedisResult<()> {
        Err(StreamErr::Backend(RedisErr::Unknown(
            "Not implemented".to_owned(),
        )))
    }

    /// Create a producer backed by a fresh [`RedisCluster`] that uses this
    /// streamer's URI and connect options.
    async fn create_generic_producer(
        &self,
        options: Self::ProducerOptions,
    ) -> RedisResult<Self::Producer> {
        let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
        create_producer(cluster, options).await
    }

    /// Create a consumer for `streams`, backed by a fresh [`RedisCluster`]
    /// that uses this streamer's URI and connect options.
    async fn create_consumer(
        &self,
        streams: &[StreamKey],
        options: Self::ConsumerOptions,
    ) -> RedisResult<Self::Consumer> {
        let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
        create_consumer(cluster, options, streams.to_vec()).await
    }
}

impl ConnectOptions for RedisConnectOptions {
    type Error = RedisErr;

    /// The configured timeout, or `StreamErr::TimeoutNotSet` when none has
    /// been set on these options.
    fn timeout(&self) -> RedisResult<Duration> {
        match self.timeout {
            Some(duration) => Ok(duration),
            None => Err(StreamErr::TimeoutNotSet),
        }
    }

    /// Timeout for network requests. Defaults to [`crate::DEFAULT_TIMEOUT`].
    ///
    /// Always succeeds and hands back `self` so calls can be chained.
    fn set_timeout(&mut self, v: Duration) -> RedisResult<&mut Self> {
        let requested = Some(v);
        self.timeout = requested;
        Ok(self)
    }
}

impl RedisConnectOptions {
    /// The configured database number. Defaults to 0.
    pub fn db(&self) -> u32 {
        self.db
    }
    /// Set the database number. Returns `self` for chaining.
    pub fn set_db(&mut self, db: u32) -> &mut Self {
        self.db = db;
        self
    }

    /// The user name credential, if one was set.
    pub fn username(&self) -> Option<&str> {
        self.username.as_deref()
    }
    /// Set (or clear, with `None`) the user name credential.
    pub fn set_username(&mut self, username: Option<String>) -> &mut Self {
        self.username = username;
        self
    }

    /// The password credential, if one was set.
    pub fn password(&self) -> Option<&str> {
        self.password.as_deref()
    }
    /// Set (or clear, with `None`) the password credential.
    pub fn set_password(&mut self, password: Option<String>) -> &mut Self {
        self.password = password;
        self
    }

    /// Whether support for Redis Cluster is enabled. Defaults to `false`.
    pub fn enable_cluster(&self) -> bool {
        self.enable_cluster
    }
    /// Enable support for Redis Cluster.
    pub fn set_enable_cluster(&mut self, enable: bool) -> &mut Self {
        self.enable_cluster = enable;
        self
    }

    /// Whether hostname verification is disabled. Defaults to `false`.
    ///
    /// See [`Self::set_disable_hostname_verification`] for the security warning.
    pub fn disable_hostname_verification(&self) -> bool {
        self.disable_hostname_verification
    }
    /// # Warning
    ///
    /// Only relevant if TLS is enabled and connecting to `rediss://`.
    /// Trust self-signed certificates. This is insecure. Do not use in production.
    pub fn set_disable_hostname_verification(&mut self, disable: bool) -> &mut Self {
        self.disable_hostname_verification = disable;
        self
    }
}