sea_streamer_redis/
streamer.rs1use std::{sync::Arc, time::Duration};
2
3use crate::{
4 create_consumer, create_producer, RedisCluster, RedisConsumer, RedisConsumerOptions, RedisErr,
5 RedisProducer, RedisProducerOptions, RedisResult, MSG, REDIS_PORT,
6};
7use sea_streamer_types::{
8 ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
9};
10
11#[derive(Debug, Clone)]
12pub struct RedisStreamer {
14 uri: StreamerUri,
15 options: Arc<RedisConnectOptions>,
16}
17
18#[derive(Debug, Default, Clone)]
19pub struct RedisConnectOptions {
21 db: u32,
22 username: Option<String>,
23 password: Option<String>,
24 timeout: Option<Duration>,
25 enable_cluster: bool,
26 disable_hostname_verification: bool,
27 timestamp_format: TimestampFormat,
28 message_field: MessageField,
29}
30
/// Crate-internal newtype around the `'static` name of the message field.
///
/// Exists so the field name can have its own [`Default`] implementation.
#[derive(Clone, Copy, Debug)]
pub(crate) struct MessageField(
    /// The field name itself.
    pub &'static str,
);
33
34impl Default for MessageField {
35 fn default() -> Self {
36 Self(MSG)
37 }
38}
39
/// Supported timestamp formats.
///
/// The default is [`TimestampFormat::UnixTimestampMillis`].
#[derive(Clone, Copy, Debug, Default)]
pub enum TimestampFormat {
    /// Unix timestamp in milliseconds (the default variant).
    #[default]
    UnixTimestampMillis,
    /// Unix timestamp in nanoseconds.
    ///
    /// Only compiled in when the `nanosecond-timestamp` feature is enabled.
    #[cfg(feature = "nanosecond-timestamp")]
    UnixTimestampNanos,
}
47
48impl Streamer for RedisStreamer {
49 type Error = RedisErr;
50 type Producer = RedisProducer;
51 type Consumer = RedisConsumer;
52 type ConnectOptions = RedisConnectOptions;
53 type ConsumerOptions = RedisConsumerOptions;
54 type ProducerOptions = RedisProducerOptions;
55
56 async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> RedisResult<Self> {
57 if uri.protocol().is_none() {
58 return Err(StreamErr::StreamUrlErr(StreamUrlErr::ProtocolRequired));
59 }
60 let uri = StreamerUri::many(uri.into_nodes().map(|u| {
61 format!(
63 "{}://{}:{}",
64 u.scheme(),
65 u.host().expect("Should have host"),
66 u.port().unwrap_or(REDIS_PORT)
67 )
68 .parse()
69 .expect("Must not fail")
70 }));
71 let options = Arc::new(options);
72 let mut cluster = RedisCluster::new(uri.clone(), options.clone())?;
73 cluster.reconnect_all().await?;
74 Ok(RedisStreamer { uri, options })
75 }
76
77 async fn disconnect(self) -> RedisResult<()> {
78 Err(StreamErr::Backend(RedisErr::Unknown(
79 "Not implemented".to_owned(),
80 )))
81 }
82
83 async fn create_generic_producer(
84 &self,
85 mut options: Self::ProducerOptions,
86 ) -> RedisResult<Self::Producer> {
87 options.timestamp_format = self.options.timestamp_format;
88 options.message_field = self.options.message_field;
89 let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
90 create_producer(cluster, options).await
91 }
92
93 async fn create_consumer(
94 &self,
95 streams: &[StreamKey],
96 mut options: Self::ConsumerOptions,
97 ) -> RedisResult<Self::Consumer> {
98 options.timestamp_format = self.options.timestamp_format;
99 options.message_field = self.options.message_field;
100 let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
101 create_consumer(cluster, options, streams.to_vec()).await
102 }
103}
104
105impl ConnectOptions for RedisConnectOptions {
106 type Error = RedisErr;
107
108 fn timeout(&self) -> RedisResult<Duration> {
109 self.timeout.ok_or(StreamErr::TimeoutNotSet)
110 }
111
112 fn set_timeout(&mut self, v: Duration) -> RedisResult<&mut Self> {
114 self.timeout = Some(v);
115 Ok(self)
116 }
117}
118
119impl RedisConnectOptions {
120 pub fn db(&self) -> u32 {
122 self.db
123 }
124 pub fn set_db(&mut self, db: u32) -> &mut Self {
125 self.db = db;
126 self
127 }
128
129 pub fn username(&self) -> Option<&str> {
130 self.username.as_deref()
131 }
132 pub fn set_username(&mut self, username: Option<String>) -> &mut Self {
133 self.username = username;
134 self
135 }
136
137 pub fn password(&self) -> Option<&str> {
138 self.password.as_deref()
139 }
140 pub fn set_password(&mut self, password: Option<String>) -> &mut Self {
141 self.password = password;
142 self
143 }
144
145 pub fn enable_cluster(&self) -> bool {
146 self.enable_cluster
147 }
148 pub fn set_enable_cluster(&mut self, bool: bool) -> &mut Self {
150 self.enable_cluster = bool;
151 self
152 }
153
154 pub fn disable_hostname_verification(&self) -> bool {
155 self.disable_hostname_verification
156 }
157 pub fn set_disable_hostname_verification(&mut self, bool: bool) -> &mut Self {
162 self.disable_hostname_verification = bool;
163 self
164 }
165
166 pub fn timestamp_format(&self) -> TimestampFormat {
167 self.timestamp_format
168 }
169 pub fn set_timestamp_format(&mut self, fmt: TimestampFormat) -> &mut Self {
172 self.timestamp_format = fmt;
173 self
174 }
175
176 pub fn message_field(&self) -> &'static str {
177 self.message_field.0
178 }
179 pub fn set_message_field(&mut self, field: &'static str) -> &mut Self {
181 self.message_field = MessageField(field);
182 self
183 }
184}