sea_streamer_redis/
streamer.rs

1use std::{sync::Arc, time::Duration};
2
3use crate::{
4    create_consumer, create_producer, RedisCluster, RedisConsumer, RedisConsumerOptions, RedisErr,
5    RedisProducer, RedisProducerOptions, RedisResult, MSG, REDIS_PORT,
6};
7use sea_streamer_types::{
8    ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
9};
10
11#[derive(Debug, Clone)]
12/// The Redis Streamer, from which you can create Producers and Consumers.
13pub struct RedisStreamer {
14    uri: StreamerUri,
15    options: Arc<RedisConnectOptions>,
16}
17
18#[derive(Debug, Default, Clone)]
19/// Options for connections, including credentials.
20pub struct RedisConnectOptions {
21    db: u32,
22    username: Option<String>,
23    password: Option<String>,
24    timeout: Option<Duration>,
25    enable_cluster: bool,
26    disable_hostname_verification: bool,
27    timestamp_format: TimestampFormat,
28    message_field: MessageField,
29}
30
31#[derive(Debug, Copy, Clone)]
32pub(crate) struct MessageField(pub &'static str);
33
34impl Default for MessageField {
35    fn default() -> Self {
36        Self(MSG)
37    }
38}
39
40#[derive(Debug, Default, Clone, Copy)]
41pub enum TimestampFormat {
42    #[default]
43    UnixTimestampMillis,
44    #[cfg(feature = "nanosecond-timestamp")]
45    UnixTimestampNanos,
46}
47
48impl Streamer for RedisStreamer {
49    type Error = RedisErr;
50    type Producer = RedisProducer;
51    type Consumer = RedisConsumer;
52    type ConnectOptions = RedisConnectOptions;
53    type ConsumerOptions = RedisConsumerOptions;
54    type ProducerOptions = RedisProducerOptions;
55
56    async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult<Self> {
57        if uri.protocol().is_none() {
58            return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
59        }
60        let uri = StreamerUri::many(uri.into_nodes().map(|u| {
61            // normalize uri
62            format!(
63                "{}://{}:{}",
64                u.scheme(),
65                u.host().expect("Should have host"),
66                u.port().unwrap_or(REDIS_PORT)
67            )
68            .parse()
69            .expect("Must not fail")
70        }));
71        let options = Arc::new(options);
72        let mut cluster = RedisCluster::new(uri.clone(), options.clone())?;
73        cluster.reconnect_all().await?;
74        Ok(RedisStreamer { uri, options })
75    }
76
77    async fn disconnect(self) -> RedisResult<()> {
78        Err(StreamErr::Backend(RedisErr::Unknown(
79            "Not implemented".to_owned(),
80        )))
81    }
82
83    async fn create_generic_producer(
84        &self,
85        mut options: Self::ProducerOptions,
86    ) -> RedisResult<Self::Producer> {
87        options.timestamp_format = self.options.timestamp_format;
88        options.message_field = self.options.message_field;
89        let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
90        create_producer(cluster, options).await
91    }
92
93    async fn create_consumer(
94        &self,
95        streams: &[StreamKey],
96        mut options: Self::ConsumerOptions,
97    ) -> RedisResult<Self::Consumer> {
98        options.timestamp_format = self.options.timestamp_format;
99        options.message_field = self.options.message_field;
100        let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
101        create_consumer(cluster, options, streams.to_vec()).await
102    }
103}
104
105impl ConnectOptions for RedisConnectOptions {
106    type Error = RedisErr;
107
108    fn timeout(&self) -> RedisResult<Duration> {
109        self.timeout.ok_or(StreamErr::TimeoutNotSet)
110    }
111
112    /// Timeout for network requests. Defaults to [`crate::DEFAULT_TIMEOUT`].
113    fn set_timeout(&mut self, v: Duration) -> RedisResult<&mut Self> {
114        self.timeout = Some(v);
115        Ok(self)
116    }
117}
118
119impl RedisConnectOptions {
120    /// Defaults to 0.
121    pub fn db(&self) -> u32 {
122        self.db
123    }
124    pub fn set_db(&mut self, db: u32) -> &mut Self {
125        self.db = db;
126        self
127    }
128
129    pub fn username(&self) -> Option<&str> {
130        self.username.as_deref()
131    }
132    pub fn set_username(&mut self, username: Option<String>) -> &mut Self {
133        self.username = username;
134        self
135    }
136
137    pub fn password(&self) -> Option<&str> {
138        self.password.as_deref()
139    }
140    pub fn set_password(&mut self, password: Option<String>) -> &mut Self {
141        self.password = password;
142        self
143    }
144
145    pub fn enable_cluster(&self) -> bool {
146        self.enable_cluster
147    }
148    /// Enable support for Redis Cluster.
149    pub fn set_enable_cluster(&mut self, bool: bool) -> &mut Self {
150        self.enable_cluster = bool;
151        self
152    }
153
154    pub fn disable_hostname_verification(&self) -> bool {
155        self.disable_hostname_verification
156    }
157    /// # Warning
158    ///
159    /// Only relevant if TLS is enabled and connecting to `rediss://`.
160    /// Trust self-signed certificates. This is insecure. Do not use in production.
161    pub fn set_disable_hostname_verification(&mut self, bool: bool) -> &mut Self {
162        self.disable_hostname_verification = bool;
163        self
164    }
165
166    pub fn timestamp_format(&self) -> TimestampFormat {
167        self.timestamp_format
168    }
169    /// Set timestamp format. i.e. the default timestamp format is milliseconds,
170    /// which is recommended by Redis.
171    pub fn set_timestamp_format(&mut self, fmt: TimestampFormat) -> &mut Self {
172        self.timestamp_format = fmt;
173        self
174    }
175
176    pub fn message_field(&self) -> &'static str {
177        self.message_field.0
178    }
179    /// The field used to hold the message. Defaults to [`crate::MSG`].
180    pub fn set_message_field(&mut self, field: &'static str) -> &mut Self {
181        self.message_field = MessageField(field);
182        self
183    }
184}