1use std::{fmt::Display, str::FromStr};
2
3use crate::{
4 ConnectOptions, Consumer, ConsumerOptions, Producer, ProducerOptions, StreamKey, StreamResult,
5 StreamUrlErr,
6};
7use futures::{Future, FutureExt};
8use url::Url;
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11pub struct StreamerUri {
22 nodes: Vec<Url>,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub struct StreamUrl {
37 streamer: StreamerUri,
38 streams: Vec<StreamKey>,
39}
40
41pub trait Streamer: Sized {
43 type Error: std::error::Error;
44 type Producer: Producer<Error = Self::Error>;
45 type Consumer: Consumer<Error = Self::Error>;
46 type ConnectOptions: ConnectOptions;
47 type ConsumerOptions: ConsumerOptions;
48 type ProducerOptions: ProducerOptions;
49
50 fn connect(
52 streamer: StreamerUri,
53 options: Self::ConnectOptions,
54 ) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send;
55
56 fn disconnect(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
58
59 fn create_generic_producer(
61 &self,
62 options: Self::ProducerOptions,
63 ) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send;
64
65 fn create_producer(
67 &self,
68 stream: StreamKey,
69 options: Self::ProducerOptions,
70 ) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send {
71 self.create_generic_producer(options).map(|res| {
72 res.and_then(|mut producer| {
73 producer.anchor(stream)?;
74
75 Ok(producer)
76 })
77 })
78 }
79
80 fn create_consumer(
82 &self,
83 streams: &[StreamKey],
84 options: Self::ConsumerOptions,
85 ) -> impl Future<Output = StreamResult<Self::Consumer, Self::Error>> + Send;
86}
87
88impl Display for StreamerUri {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 write!(f, "(")?;
91 for (i, node) in self.nodes.iter().enumerate() {
92 if i > 0 {
93 write!(f, ",")?;
94 }
95 write!(f, "{}", node)?;
96 }
97 write!(f, ")")
98 }
99}
100
101impl From<Url> for StreamerUri {
102 fn from(value: Url) -> Self {
103 Self { nodes: vec![value] }
104 }
105}
106
107impl FromIterator<Url> for StreamerUri {
108 fn from_iter<T: IntoIterator<Item = Url>>(iter: T) -> Self {
109 Self {
110 nodes: iter.into_iter().collect(),
111 }
112 }
113}
114
115impl StreamerUri {
116 pub fn zero() -> Self {
117 Self { nodes: Vec::new() }
118 }
119
120 pub fn one(url: Url) -> Self {
121 Self { nodes: vec![url] }
122 }
123
124 pub fn many(urls: impl Iterator<Item = Url>) -> Self {
125 let nodes: Vec<Url> = urls.collect();
126 Self { nodes }
127 }
128
129 pub fn protocol(&self) -> Option<&str> {
130 match self.nodes.first() {
131 Some(node) => {
132 if let Some((front, _)) = node.as_str().split_once("://") {
133 Some(front)
134 } else {
135 None
136 }
137 }
138 None => None,
139 }
140 }
141
142 pub fn nodes(&self) -> &[Url] {
143 &self.nodes
144 }
145
146 pub fn into_nodes(self) -> impl Iterator<Item = Url> {
147 self.nodes.into_iter()
148 }
149}
150
151impl StreamUrl {
152 pub fn streamer(&self) -> StreamerUri {
153 self.streamer.to_owned()
154 }
155
156 pub fn streamer_ref(&self) -> &StreamerUri {
157 &self.streamer
158 }
159
160 pub fn stream_keys(&self) -> &[StreamKey] {
161 &self.streams
162 }
163
164 pub fn stream_key(&self) -> Result<StreamKey, StreamUrlErr> {
165 if self.streams.len() == 1 {
166 Ok(self.streams[0].to_owned())
167 } else {
168 Err(StreamUrlErr::NotOneStreamKey)
169 }
170 }
171}
172
173impl FromStr for StreamUrl {
174 type Err = StreamUrlErr;
175
176 fn from_str(mut urls: &str) -> Result<Self, Self::Err> {
177 let protocol = if let Some((front, remaining)) = urls.split_once("://") {
178 urls = remaining;
179 Some(front)
180 } else {
181 None
182 };
183 let streams = if let Some((front, remaining)) = urls.rsplit_once('/') {
184 urls = front;
185 if remaining.is_empty() {
186 None
187 } else {
188 Some(remaining)
189 }
190 } else {
191 return Err(StreamUrlErr::NoEndingSlash);
192 };
193 parse_url(protocol, urls, streams)
194 }
195}
196
197impl FromStr for StreamerUri {
198 type Err = StreamUrlErr;
199
200 fn from_str(mut urls: &str) -> Result<Self, Self::Err> {
201 let protocol = if let Some((front, remaining)) = urls.split_once("://") {
202 urls = remaining;
203 Some(front)
204 } else {
205 None
206 };
207 Ok(parse_url(protocol, urls, None)?.streamer)
208 }
209}
210
211fn parse_url(
212 protocol: Option<&str>,
213 urls: &str,
214 streams: Option<&str>,
215) -> Result<StreamUrl, StreamUrlErr> {
216 let urls: Vec<_> = if urls.is_empty() {
217 if let Some(protocol) = protocol {
218 vec![format!("{protocol}://.")
219 .as_str()
220 .parse::<Url>()
221 .map_err(Into::<StreamUrlErr>::into)?]
222 } else {
223 return Err(StreamUrlErr::ProtocolRequired);
224 }
225 } else {
226 urls.split(',')
227 .filter(|x| !x.is_empty())
228 .map(|s| {
229 if let Some(protocol) = protocol {
230 FromStr::from_str(format!("{protocol}://{s}").as_str())
231 } else {
232 FromStr::from_str(s)
233 }
234 .map_err(Into::into)
235 })
236 .collect::<Result<Vec<_>, StreamUrlErr>>()?
237 };
238
239 Ok(StreamUrl {
240 streamer: StreamerUri { nodes: urls },
241 streams: match streams {
242 None => Default::default(),
243 Some(streams) => streams
244 .split(',')
245 .filter(|x| !x.is_empty())
246 .map(|n| StreamKey::new(n).map_err(Into::into))
247 .collect::<Result<Vec<StreamKey>, StreamUrlErr>>()?,
248 },
249 })
250}
251
252#[cfg(test)]
253mod test {
254 use super::*;
255
256 #[test]
257 fn test_parse_stream_url() {
258 let stream_keys = vec![StreamKey::new("a").unwrap(), StreamKey::new("b").unwrap()];
259
260 let streamer: StreamerUri = "sea-ql.org:1234".parse().unwrap();
261 assert_eq!(streamer.protocol(), None);
262 assert_eq!(streamer.nodes(), &["sea-ql.org:1234".parse().unwrap()]);
263
264 assert!("proto://sea-ql.org:1234".parse::<StreamUrl>().is_err());
265
266 let stream: StreamUrl = "proto://sea-ql.org:1234/".parse().unwrap();
267 assert_eq!(stream.streamer.protocol(), Some("proto"));
268 assert_eq!(
269 stream.streamer.nodes(),
270 &["proto://sea-ql.org:1234".parse().unwrap()]
271 );
272 assert_eq!(stream.stream_keys(), &[]);
273
274 let stream: StreamUrl = "proto://sea-ql.org:1234/stream".parse().unwrap();
275 assert_eq!(stream.streamer.protocol(), Some("proto"));
276 assert_eq!(
277 stream.streamer.nodes(),
278 &["proto://sea-ql.org:1234".parse().unwrap()]
279 );
280 assert_eq!(stream.stream_keys(), &[StreamKey::new("stream").unwrap()]);
281
282 let stream: StreamUrl = "proto://sea-ql.org:1234/a,b".parse().unwrap();
283 assert_eq!(stream.streamer.protocol(), Some("proto"));
284 assert_eq!(
285 stream.streamer.nodes(),
286 &["proto://sea-ql.org:1234".parse().unwrap()]
287 );
288 assert_eq!(stream.stream_keys(), &stream_keys);
289
290 let nodes = [
291 "kafka://node-a:1234".parse().unwrap(),
292 "kafka://node-b:1234".parse().unwrap(),
293 ];
294 let stream: StreamUrl = "kafka://node-a:1234,node-b:1234/a,b".parse().unwrap();
295 assert_eq!(stream.streamer.protocol(), Some("kafka"));
296 assert_eq!(stream.streamer.nodes(), &nodes);
297 assert_eq!(stream.stream_keys(), &stream_keys);
298
299 let stream: StreamUrl = "stdio:///".parse().unwrap();
300 assert_eq!(stream.streamer.protocol(), Some("stdio"));
301 assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
302 assert_eq!(stream.stream_keys(), &[]);
303
304 let stream: StreamUrl = "redis://localhost/".parse().unwrap();
305 assert_eq!(stream.streamer.protocol(), Some("redis"));
306 assert_eq!(
307 stream.streamer.nodes(),
308 &["redis://localhost".parse().unwrap()]
309 );
310 assert_eq!(stream.stream_keys(), &[]);
311
312 let stream: StreamUrl = "redis://localhost/a,b".parse().unwrap();
313 assert_eq!(stream.streamer.protocol(), Some("redis"));
314 assert_eq!(
315 stream.streamer.nodes(),
316 &["redis://localhost".parse().unwrap()]
317 );
318 assert_eq!(stream.stream_keys(), &stream_keys);
319
320 let stream: StreamUrl = "stdio:///a,b".parse().unwrap();
321 assert_eq!(stream.streamer.protocol(), Some("stdio"));
322 assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
323 assert_eq!(stream.stream_keys(), &stream_keys);
324
325 let stream: StreamUrl = "file://./path/to/hi/a,b".parse().unwrap();
326 assert_eq!(stream.streamer.protocol(), Some("file"));
327 assert_eq!(
328 stream.streamer.nodes(),
329 &["file://./path/to/hi".parse().unwrap()]
330 );
331
332 let stream: StreamUrl = "file://./path/to/hi/".parse().unwrap();
333 assert_eq!(stream.streamer.protocol(), Some("file"));
334 assert_eq!(
335 stream.streamer.nodes(),
336 &["file://./path/to/hi".parse().unwrap()]
337 );
338 assert_eq!(stream.stream_keys(), &[]);
339 }
340
341 #[test]
342 fn test_parse_streamer_uri() {
343 let uri: StreamerUri = "kafka://localhost:9092".parse().unwrap();
344 assert_eq!(uri.protocol(), Some("kafka"));
345 assert_eq!(uri.nodes(), &["kafka://localhost:9092".parse().unwrap()]);
346
347 let uri: StreamerUri = "redis://localhost:6379".parse().unwrap();
348 assert_eq!(uri.protocol(), Some("redis"));
349 assert_eq!(uri.nodes(), &["redis://localhost:6379".parse().unwrap()]);
350
351 let uri: StreamerUri = "stdio://".parse().unwrap();
352 assert_eq!(uri.protocol(), Some("stdio"));
353 assert_eq!(uri.nodes(), &["stdio://.".parse().unwrap()]);
354
355 let uri: StreamerUri = "file://./path/to/hi".parse().unwrap();
356 assert_eq!(uri.protocol(), Some("file"));
357 assert_eq!(uri.nodes(), &["file://./path/to/hi".parse().unwrap()]);
358
359 let uri: StreamerUri = "file:///path/to/hi".parse().unwrap();
360 assert_eq!(uri.protocol(), Some("file"));
361 assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]);
362 }
363
364 #[test]
365 fn test_into_streamer_uri() {
366 let url: Url = "proto://sea-ql.org:1234".parse().unwrap();
367 let uri: StreamerUri = url.clone().into();
368 assert!(uri.nodes.len() == 1);
369 assert_eq!(url, uri.nodes.first().unwrap().clone());
370
371 let urls: [Url; 3] = [
372 "proto://sea-ql.org:1".parse().unwrap(),
373 "proto://sea-ql.org:2".parse().unwrap(),
374 "proto://sea-ql.org:3".parse().unwrap(),
375 ];
376 let uri: StreamerUri = StreamerUri::from_iter(urls.clone().into_iter());
377 assert!(uri.nodes.len() == 3);
378 assert!(uri.nodes.iter().eq(urls.iter()));
379 }
380
381 #[test]
382 fn test_parse_stream_url_err() {
383 use crate::StreamKeyErr;
384
385 assert!(matches!(
386 "proto://sea-ql.org:1234/stream?".parse::<StreamUrl>(),
387 Err(StreamUrlErr::StreamKeyErr(StreamKeyErr::InvalidStreamKey))
388 ));
389 }
390}