sea_streamer_types/
streamer.rs

1use std::{fmt::Display, str::FromStr};
2
3use crate::{
4    ConnectOptions, Consumer, ConsumerOptions, Producer, ProducerOptions, StreamKey, StreamResult,
5    StreamUrlErr,
6};
7use futures::{Future, FutureExt};
8use url::Url;
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11/// URI of Streaming Server. If this is a cluster, there can be multiple nodes.
12///
13/// Examples:
14///
15/// ```ignore
16/// stdio://
17/// redis://localhost
18/// kafka://node-a:1234,node-b:1234
19/// file://./path/to/stream
20/// ```
21pub struct StreamerUri {
22    nodes: Vec<Url>,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26/// Streamer URI with stream key(s).
27///
28/// Examples:
29///
30/// ```ignore
31/// stdio:///stream_a,stream_b
32/// redis://localhost/stream_a,stream_b
33/// kafka://node-a:1234,node-b:1234/stream_a,stream_b
34/// file://./path/to/stream/stream_a,stream_b
35/// ```
36pub struct StreamUrl {
37    streamer: StreamerUri,
38    streams: Vec<StreamKey>,
39}
40
41/// Common interface of streamer clients.
42pub trait Streamer: Sized {
43    type Error: std::error::Error;
44    type Producer: Producer<Error = Self::Error>;
45    type Consumer: Consumer<Error = Self::Error>;
46    type ConnectOptions: ConnectOptions;
47    type ConsumerOptions: ConsumerOptions;
48    type ProducerOptions: ProducerOptions;
49
50    /// Establish a connection to the streaming server.
51    fn connect(
52        streamer: StreamerUri,
53        options: Self::ConnectOptions,
54    ) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send;
55
56    /// Flush and disconnect from the streaming server.
57    fn disconnect(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
58
59    /// Create a producer that can stream to any stream key.
60    fn create_generic_producer(
61        &self,
62        options: Self::ProducerOptions,
63    ) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send;
64
65    /// Create a producer that streams to the specified stream.
66    fn create_producer(
67        &self,
68        stream: StreamKey,
69        options: Self::ProducerOptions,
70    ) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send {
71        self.create_generic_producer(options).map(|res| {
72            res.and_then(|mut producer| {
73                producer.anchor(stream)?;
74
75                Ok(producer)
76            })
77        })
78    }
79
80    /// Create a consumer subscribing to the specified streams.
81    fn create_consumer(
82        &self,
83        streams: &[StreamKey],
84        options: Self::ConsumerOptions,
85    ) -> impl Future<Output = StreamResult<Self::Consumer, Self::Error>> + Send;
86}
87
88impl Display for StreamerUri {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "(")?;
91        for (i, node) in self.nodes.iter().enumerate() {
92            if i > 0 {
93                write!(f, ",")?;
94            }
95            write!(f, "{}", node)?;
96        }
97        write!(f, ")")
98    }
99}
100
101impl From<Url> for StreamerUri {
102    fn from(value: Url) -> Self {
103        Self { nodes: vec![value] }
104    }
105}
106
107impl FromIterator<Url> for StreamerUri {
108    fn from_iter<T: IntoIterator<Item = Url>>(iter: T) -> Self {
109        Self {
110            nodes: iter.into_iter().collect(),
111        }
112    }
113}
114
115impl StreamerUri {
116    pub fn zero() -> Self {
117        Self { nodes: Vec::new() }
118    }
119
120    pub fn one(url: Url) -> Self {
121        Self { nodes: vec![url] }
122    }
123
124    pub fn many(urls: impl Iterator<Item = Url>) -> Self {
125        let nodes: Vec<Url> = urls.collect();
126        Self { nodes }
127    }
128
129    pub fn protocol(&self) -> Option<&str> {
130        match self.nodes.first() {
131            Some(node) => {
132                if let Some((front, _)) = node.as_str().split_once("://") {
133                    Some(front)
134                } else {
135                    None
136                }
137            }
138            None => None,
139        }
140    }
141
142    pub fn nodes(&self) -> &[Url] {
143        &self.nodes
144    }
145
146    pub fn into_nodes(self) -> impl Iterator<Item = Url> {
147        self.nodes.into_iter()
148    }
149}
150
151impl StreamUrl {
152    pub fn streamer(&self) -> StreamerUri {
153        self.streamer.to_owned()
154    }
155
156    pub fn streamer_ref(&self) -> &StreamerUri {
157        &self.streamer
158    }
159
160    pub fn stream_keys(&self) -> &[StreamKey] {
161        &self.streams
162    }
163
164    pub fn stream_key(&self) -> Result<StreamKey, StreamUrlErr> {
165        if self.streams.len() == 1 {
166            Ok(self.streams[0].to_owned())
167        } else {
168            Err(StreamUrlErr::NotOneStreamKey)
169        }
170    }
171}
172
173impl FromStr for StreamUrl {
174    type Err = StreamUrlErr;
175
176    fn from_str(mut urls: &str) -> Result<Self, Self::Err> {
177        let protocol = if let Some((front, remaining)) = urls.split_once("://") {
178            urls = remaining;
179            Some(front)
180        } else {
181            None
182        };
183        let streams = if let Some((front, remaining)) = urls.rsplit_once('/') {
184            urls = front;
185            if remaining.is_empty() {
186                None
187            } else {
188                Some(remaining)
189            }
190        } else {
191            return Err(StreamUrlErr::NoEndingSlash);
192        };
193        parse_url(protocol, urls, streams)
194    }
195}
196
197impl FromStr for StreamerUri {
198    type Err = StreamUrlErr;
199
200    fn from_str(mut urls: &str) -> Result<Self, Self::Err> {
201        let protocol = if let Some((front, remaining)) = urls.split_once("://") {
202            urls = remaining;
203            Some(front)
204        } else {
205            None
206        };
207        Ok(parse_url(protocol, urls, None)?.streamer)
208    }
209}
210
211fn parse_url(
212    protocol: Option<&str>,
213    urls: &str,
214    streams: Option<&str>,
215) -> Result<StreamUrl, StreamUrlErr> {
216    let urls: Vec<_> = if urls.is_empty() {
217        if let Some(protocol) = protocol {
218            vec![format!("{protocol}://.")
219                .as_str()
220                .parse::<Url>()
221                .map_err(Into::<StreamUrlErr>::into)?]
222        } else {
223            return Err(StreamUrlErr::ProtocolRequired);
224        }
225    } else {
226        urls.split(',')
227            .filter(|x| !x.is_empty())
228            .map(|s| {
229                if let Some(protocol) = protocol {
230                    FromStr::from_str(format!("{protocol}://{s}").as_str())
231                } else {
232                    FromStr::from_str(s)
233                }
234                .map_err(Into::into)
235            })
236            .collect::<Result<Vec<_>, StreamUrlErr>>()?
237    };
238
239    Ok(StreamUrl {
240        streamer: StreamerUri { nodes: urls },
241        streams: match streams {
242            None => Default::default(),
243            Some(streams) => streams
244                .split(',')
245                .filter(|x| !x.is_empty())
246                .map(|n| StreamKey::new(n).map_err(Into::into))
247                .collect::<Result<Vec<StreamKey>, StreamUrlErr>>()?,
248        },
249    })
250}
251
252#[cfg(test)]
253mod test {
254    use super::*;
255
256    #[test]
257    fn test_parse_stream_url() {
258        let stream_keys = vec![StreamKey::new("a").unwrap(), StreamKey::new("b").unwrap()];
259
260        let streamer: StreamerUri = "sea-ql.org:1234".parse().unwrap();
261        assert_eq!(streamer.protocol(), None);
262        assert_eq!(streamer.nodes(), &["sea-ql.org:1234".parse().unwrap()]);
263
264        assert!("proto://sea-ql.org:1234".parse::<StreamUrl>().is_err());
265
266        let stream: StreamUrl = "proto://sea-ql.org:1234/".parse().unwrap();
267        assert_eq!(stream.streamer.protocol(), Some("proto"));
268        assert_eq!(
269            stream.streamer.nodes(),
270            &["proto://sea-ql.org:1234".parse().unwrap()]
271        );
272        assert_eq!(stream.stream_keys(), &[]);
273
274        let stream: StreamUrl = "proto://sea-ql.org:1234/stream".parse().unwrap();
275        assert_eq!(stream.streamer.protocol(), Some("proto"));
276        assert_eq!(
277            stream.streamer.nodes(),
278            &["proto://sea-ql.org:1234".parse().unwrap()]
279        );
280        assert_eq!(stream.stream_keys(), &[StreamKey::new("stream").unwrap()]);
281
282        let stream: StreamUrl = "proto://sea-ql.org:1234/a,b".parse().unwrap();
283        assert_eq!(stream.streamer.protocol(), Some("proto"));
284        assert_eq!(
285            stream.streamer.nodes(),
286            &["proto://sea-ql.org:1234".parse().unwrap()]
287        );
288        assert_eq!(stream.stream_keys(), &stream_keys);
289
290        let nodes = [
291            "kafka://node-a:1234".parse().unwrap(),
292            "kafka://node-b:1234".parse().unwrap(),
293        ];
294        let stream: StreamUrl = "kafka://node-a:1234,node-b:1234/a,b".parse().unwrap();
295        assert_eq!(stream.streamer.protocol(), Some("kafka"));
296        assert_eq!(stream.streamer.nodes(), &nodes);
297        assert_eq!(stream.stream_keys(), &stream_keys);
298
299        let stream: StreamUrl = "stdio:///".parse().unwrap();
300        assert_eq!(stream.streamer.protocol(), Some("stdio"));
301        assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
302        assert_eq!(stream.stream_keys(), &[]);
303
304        let stream: StreamUrl = "redis://localhost/".parse().unwrap();
305        assert_eq!(stream.streamer.protocol(), Some("redis"));
306        assert_eq!(
307            stream.streamer.nodes(),
308            &["redis://localhost".parse().unwrap()]
309        );
310        assert_eq!(stream.stream_keys(), &[]);
311
312        let stream: StreamUrl = "redis://localhost/a,b".parse().unwrap();
313        assert_eq!(stream.streamer.protocol(), Some("redis"));
314        assert_eq!(
315            stream.streamer.nodes(),
316            &["redis://localhost".parse().unwrap()]
317        );
318        assert_eq!(stream.stream_keys(), &stream_keys);
319
320        let stream: StreamUrl = "stdio:///a,b".parse().unwrap();
321        assert_eq!(stream.streamer.protocol(), Some("stdio"));
322        assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
323        assert_eq!(stream.stream_keys(), &stream_keys);
324
325        let stream: StreamUrl = "file://./path/to/hi/a,b".parse().unwrap();
326        assert_eq!(stream.streamer.protocol(), Some("file"));
327        assert_eq!(
328            stream.streamer.nodes(),
329            &["file://./path/to/hi".parse().unwrap()]
330        );
331
332        let stream: StreamUrl = "file://./path/to/hi/".parse().unwrap();
333        assert_eq!(stream.streamer.protocol(), Some("file"));
334        assert_eq!(
335            stream.streamer.nodes(),
336            &["file://./path/to/hi".parse().unwrap()]
337        );
338        assert_eq!(stream.stream_keys(), &[]);
339    }
340
341    #[test]
342    fn test_parse_streamer_uri() {
343        let uri: StreamerUri = "kafka://localhost:9092".parse().unwrap();
344        assert_eq!(uri.protocol(), Some("kafka"));
345        assert_eq!(uri.nodes(), &["kafka://localhost:9092".parse().unwrap()]);
346
347        let uri: StreamerUri = "redis://localhost:6379".parse().unwrap();
348        assert_eq!(uri.protocol(), Some("redis"));
349        assert_eq!(uri.nodes(), &["redis://localhost:6379".parse().unwrap()]);
350
351        let uri: StreamerUri = "stdio://".parse().unwrap();
352        assert_eq!(uri.protocol(), Some("stdio"));
353        assert_eq!(uri.nodes(), &["stdio://.".parse().unwrap()]);
354
355        let uri: StreamerUri = "file://./path/to/hi".parse().unwrap();
356        assert_eq!(uri.protocol(), Some("file"));
357        assert_eq!(uri.nodes(), &["file://./path/to/hi".parse().unwrap()]);
358
359        let uri: StreamerUri = "file:///path/to/hi".parse().unwrap();
360        assert_eq!(uri.protocol(), Some("file"));
361        assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]);
362    }
363
364    #[test]
365    fn test_into_streamer_uri() {
366        let url: Url = "proto://sea-ql.org:1234".parse().unwrap();
367        let uri: StreamerUri = url.clone().into();
368        assert!(uri.nodes.len() == 1);
369        assert_eq!(url, uri.nodes.first().unwrap().clone());
370
371        let urls: [Url; 3] = [
372            "proto://sea-ql.org:1".parse().unwrap(),
373            "proto://sea-ql.org:2".parse().unwrap(),
374            "proto://sea-ql.org:3".parse().unwrap(),
375        ];
376        let uri: StreamerUri = StreamerUri::from_iter(urls.clone().into_iter());
377        assert!(uri.nodes.len() == 3);
378        assert!(uri.nodes.iter().eq(urls.iter()));
379    }
380
381    #[test]
382    fn test_parse_stream_url_err() {
383        use crate::StreamKeyErr;
384
385        assert!(matches!(
386            "proto://sea-ql.org:1234/stream?".parse::<StreamUrl>(),
387            Err(StreamUrlErr::StreamKeyErr(StreamKeyErr::InvalidStreamKey))
388        ));
389    }
390}