use std::{fmt::Display, str::FromStr};
use crate::{
ConnectOptions, Consumer, ConsumerOptions, Producer, ProducerOptions, StreamKey, StreamResult,
StreamUrlErr,
};
use futures::{Future, FutureExt};
use url::Url;
/// The URI of a streamer, made up of zero or more node URLs.
///
/// Can be built with [`StreamerUri::zero`], [`StreamerUri::one`] or
/// [`StreamerUri::many`], or parsed from a string via `FromStr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamerUri {
/// The node URLs, kept in the order they were supplied or parsed.
nodes: Vec<Url>,
}
/// A streamer URI together with a list of stream keys.
///
/// Parsed via `FromStr` from strings shaped like
/// `protocol://node[,node...]/[stream[,stream...]]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamUrl {
/// The node(s) of the streamer.
streamer: StreamerUri,
/// The stream keys found after the last `/`; may be empty.
streams: Vec<StreamKey>,
}
/// Common interface of a streaming backend: connecting, disconnecting, and
/// creating producers and consumers.
///
/// Every operation returns a `Send` future resolving to a [`StreamResult`]
/// that carries the backend's own `Error` type.
pub trait Streamer: Sized {
/// Error type shared by the streamer, its producers and its consumers.
type Error: std::error::Error;
/// Producer type created by this streamer.
type Producer: Producer<Error = Self::Error>;
/// Consumer type created by this streamer.
type Consumer: Consumer<Error = Self::Error>;
/// Options accepted by [`Streamer::connect`].
type ConnectOptions: ConnectOptions;
/// Options accepted by [`Streamer::create_consumer`].
type ConsumerOptions: ConsumerOptions;
/// Options accepted by the producer-creating methods.
type ProducerOptions: ProducerOptions;
/// Builds a streamer instance from the given URI and connect options.
fn connect(
streamer: StreamerUri,
options: Self::ConnectOptions,
) -> impl Future<Output = StreamResult<Self, Self::Error>> + Send;
/// Consumes the streamer and disconnects it.
fn disconnect(self) -> impl Future<Output = StreamResult<(), Self::Error>> + Send;
/// Creates a producer from the given options without naming a stream.
///
/// NOTE(review): presumably the returned producer is not yet anchored to any
/// stream key -- confirm against the `Producer` trait.
fn create_generic_producer(
&self,
options: Self::ProducerOptions,
) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send;
/// Creates a producer and anchors it to `stream`.
///
/// The default implementation calls [`Streamer::create_generic_producer`]
/// and then `anchor(stream)` on the result; an error from either step is
/// returned.
fn create_producer(
&self,
stream: StreamKey,
options: Self::ProducerOptions,
) -> impl Future<Output = StreamResult<Self::Producer, Self::Error>> + Send {
// Post-process the future's output so the anchoring happens once the
// generic producer has been created.
self.create_generic_producer(options).map(|res| {
res.and_then(|mut producer| {
producer.anchor(stream)?;
Ok(producer)
})
})
}
/// Creates a consumer for the given stream keys with the supplied options.
fn create_consumer(
&self,
streams: &[StreamKey],
options: Self::ConsumerOptions,
) -> impl Future<Output = StreamResult<Self::Consumer, Self::Error>> + Send;
}
impl Display for StreamerUri {
    /// Renders the nodes as a parenthesised, comma-separated list,
    /// e.g. `(proto://a:1,proto://b:2)`; an empty URI renders as `()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "(")?;
        let mut remaining = self.nodes.iter();
        // The first node has no leading separator; every later one does.
        if let Some(head) = remaining.next() {
            write!(f, "{}", head)?;
            for node in remaining {
                write!(f, ",")?;
                write!(f, "{}", node)?;
            }
        }
        write!(f, ")")
    }
}
impl StreamerUri {
    /// Creates a URI with no nodes.
    pub fn zero() -> Self {
        Self { nodes: Vec::new() }
    }

    /// Creates a URI with exactly one node.
    pub fn one(url: Url) -> Self {
        Self { nodes: vec![url] }
    }

    /// Creates a URI from any number of nodes, preserving their order.
    ///
    /// Accepts anything that can be turned into an iterator of [`Url`]s, so
    /// both iterators and collections such as `Vec<Url>` can be passed.
    pub fn many(urls: impl IntoIterator<Item = Url>) -> Self {
        Self {
            nodes: urls.into_iter().collect(),
        }
    }

    /// Returns the part of the first node's URL that precedes `://`.
    ///
    /// Returns `None` when there are no nodes, or when the first node's
    /// string form does not contain `://`.
    pub fn protocol(&self) -> Option<&str> {
        let (scheme, _) = self.nodes.first()?.as_str().split_once("://")?;
        Some(scheme)
    }

    /// Borrows the nodes as a slice.
    pub fn nodes(&self) -> &[Url] {
        &self.nodes
    }

    /// Consumes the URI and yields its nodes by value.
    pub fn into_nodes(self) -> impl Iterator<Item = Url> {
        self.nodes.into_iter()
    }
}
impl StreamUrl {
    /// Returns an owned copy of the streamer URI.
    pub fn streamer(&self) -> StreamerUri {
        self.streamer.clone()
    }

    /// Borrows the streamer URI.
    pub fn streamer_ref(&self) -> &StreamerUri {
        &self.streamer
    }

    /// Borrows all stream keys as a slice.
    pub fn stream_keys(&self) -> &[StreamKey] {
        self.streams.as_slice()
    }

    /// Returns the stream key when the URL holds exactly one.
    ///
    /// # Errors
    ///
    /// Returns [`StreamUrlErr::NotOneStreamKey`] when there are zero or more
    /// than one stream keys.
    pub fn stream_key(&self) -> Result<StreamKey, StreamUrlErr> {
        match self.streams.as_slice() {
            [only] => Ok(only.clone()),
            _ => Err(StreamUrlErr::NotOneStreamKey),
        }
    }
}
impl FromStr for StreamUrl {
    type Err = StreamUrlErr;

    /// Parses `[protocol://]node[,node...]/[stream[,stream...]]`.
    ///
    /// Everything before the first `://` is the protocol. In what follows,
    /// the last `/` separates the node list from the stream keys; nothing
    /// after that slash means no stream keys.
    ///
    /// # Errors
    ///
    /// Returns [`StreamUrlErr::NoEndingSlash`] when there is no `/` after the
    /// protocol, or whatever error `parse_url` reports.
    fn from_str(urls: &str) -> Result<Self, Self::Err> {
        let (protocol, rest) = match urls.split_once("://") {
            Some((scheme, tail)) => (Some(scheme), tail),
            None => (None, urls),
        };
        let Some((hosts, keys)) = rest.rsplit_once('/') else {
            return Err(StreamUrlErr::NoEndingSlash);
        };
        // A trailing slash with nothing after it means "no stream keys".
        let streams = (!keys.is_empty()).then_some(keys);
        parse_url(protocol, hosts, streams)
    }
}
impl FromStr for StreamerUri {
    type Err = StreamUrlErr;

    /// Parses `[protocol://]node[,node...]` into a streamer URI.
    ///
    /// Everything before the first `://` is the protocol; the remainder is
    /// handed to `parse_url` as the node list, with no stream keys.
    fn from_str(urls: &str) -> Result<Self, Self::Err> {
        let (protocol, hosts) = match urls.split_once("://") {
            Some((scheme, tail)) => (Some(scheme), tail),
            None => (None, urls),
        };
        parse_url(protocol, hosts, None).map(|parsed| parsed.streamer)
    }
}
fn parse_url(
protocol: Option<&str>,
urls: &str,
streams: Option<&str>,
) -> Result<StreamUrl, StreamUrlErr> {
let urls: Vec<_> = if urls.is_empty() {
if let Some(protocol) = protocol {
vec![format!("{protocol}://.")
.as_str()
.parse::<Url>()
.map_err(Into::<StreamUrlErr>::into)?]
} else {
return Err(StreamUrlErr::ProtocolRequired);
}
} else {
urls.split(',')
.filter(|x| !x.is_empty())
.map(|s| {
if let Some(protocol) = protocol {
FromStr::from_str(format!("{protocol}://{s}").as_str())
} else {
FromStr::from_str(s)
}
.map_err(Into::into)
})
.collect::<Result<Vec<_>, StreamUrlErr>>()?
};
Ok(StreamUrl {
streamer: StreamerUri { nodes: urls },
streams: match streams {
None => Default::default(),
Some(streams) => streams
.split(',')
.filter(|x| !x.is_empty())
.map(|n| StreamKey::new(n).map_err(Into::into))
.collect::<Result<Vec<StreamKey>, StreamUrlErr>>()?,
},
})
}
// Unit tests for parsing `StreamUrl` and `StreamerUri` from strings.
#[cfg(test)]
mod test {
use super::*;
// Covers the accepted shapes of a stream URL: with and without stream keys,
// single and multiple nodes, and host-less protocols such as `stdio`.
#[test]
fn test_parse_stream_url() {
let stream_keys = vec![StreamKey::new("a").unwrap(), StreamKey::new("b").unwrap()];
// A streamer URI without `://` reports no protocol.
let streamer: StreamerUri = "sea-ql.org:1234".parse().unwrap();
assert_eq!(streamer.protocol(), None);
assert_eq!(streamer.nodes(), &["sea-ql.org:1234".parse().unwrap()]);
// A stream URL needs a `/` after the node list.
assert!("proto://sea-ql.org:1234".parse::<StreamUrl>().is_err());
// Trailing slash with nothing after it: no stream keys.
let stream: StreamUrl = "proto://sea-ql.org:1234/".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("proto"));
assert_eq!(
stream.streamer.nodes(),
&["proto://sea-ql.org:1234".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &[]);
// A single stream key.
let stream: StreamUrl = "proto://sea-ql.org:1234/stream".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("proto"));
assert_eq!(
stream.streamer.nodes(),
&["proto://sea-ql.org:1234".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &[StreamKey::new("stream").unwrap()]);
// Comma-separated stream keys.
let stream: StreamUrl = "proto://sea-ql.org:1234/a,b".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("proto"));
assert_eq!(
stream.streamer.nodes(),
&["proto://sea-ql.org:1234".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &stream_keys);
// Comma-separated nodes each receive the protocol prefix.
let nodes = [
"kafka://node-a:1234".parse().unwrap(),
"kafka://node-b:1234".parse().unwrap(),
];
let stream: StreamUrl = "kafka://node-a:1234,node-b:1234/a,b".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("kafka"));
assert_eq!(stream.streamer.nodes(), &nodes);
assert_eq!(stream.stream_keys(), &stream_keys);
// An empty node list with a protocol becomes the placeholder node `protocol://.`.
let stream: StreamUrl = "stdio:///".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("stdio"));
assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
assert_eq!(stream.stream_keys(), &[]);
// Host without a port.
let stream: StreamUrl = "redis://localhost/".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("redis"));
assert_eq!(
stream.streamer.nodes(),
&["redis://localhost".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &[]);
let stream: StreamUrl = "redis://localhost/a,b".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("redis"));
assert_eq!(
stream.streamer.nodes(),
&["redis://localhost".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &stream_keys);
// Placeholder node combined with stream keys.
let stream: StreamUrl = "stdio:///a,b".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("stdio"));
assert_eq!(stream.streamer.nodes(), &["stdio://.".parse().unwrap()]);
assert_eq!(stream.stream_keys(), &stream_keys);
// Only the last `/` separates the stream keys, so file paths stay intact.
let stream: StreamUrl = "file://./path/to/hi/a,b".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("file"));
assert_eq!(
stream.streamer.nodes(),
&["file://./path/to/hi".parse().unwrap()]
);
let stream: StreamUrl = "file://./path/to/hi/".parse().unwrap();
assert_eq!(stream.streamer.protocol(), Some("file"));
assert_eq!(
stream.streamer.nodes(),
&["file://./path/to/hi".parse().unwrap()]
);
assert_eq!(stream.stream_keys(), &[]);
}
// A streamer URI has no stream-key part, so the whole remainder after
// `://` is treated as the node list.
#[test]
fn test_parse_streamer_uri() {
let uri: StreamerUri = "kafka://localhost:9092".parse().unwrap();
assert_eq!(uri.protocol(), Some("kafka"));
assert_eq!(uri.nodes(), &["kafka://localhost:9092".parse().unwrap()]);
let uri: StreamerUri = "redis://localhost:6379".parse().unwrap();
assert_eq!(uri.protocol(), Some("redis"));
assert_eq!(uri.nodes(), &["redis://localhost:6379".parse().unwrap()]);
// Protocol only: falls back to the placeholder node.
let uri: StreamerUri = "stdio://".parse().unwrap();
assert_eq!(uri.protocol(), Some("stdio"));
assert_eq!(uri.nodes(), &["stdio://.".parse().unwrap()]);
let uri: StreamerUri = "file://./path/to/hi".parse().unwrap();
assert_eq!(uri.protocol(), Some("file"));
assert_eq!(uri.nodes(), &["file://./path/to/hi".parse().unwrap()]);
let uri: StreamerUri = "file:///path/to/hi".parse().unwrap();
assert_eq!(uri.protocol(), Some("file"));
assert_eq!(uri.nodes(), &["file:///path/to/hi".parse().unwrap()]);
}
// A stream key rejected by `StreamKey::new` surfaces as `StreamUrlErr::StreamKeyErr`.
#[test]
fn test_parse_stream_url_err() {
use crate::StreamKeyErr;
assert!(matches!(
"proto://sea-ql.org:1234/stream?".parse::<StreamUrl>(),
Err(StreamUrlErr::StreamKeyErr(StreamKeyErr::InvalidStreamKey))
));
}
}