#[cfg(feature = "backend-file")]
use sea_streamer_file::FileMessage;
#[cfg(feature = "backend-kafka")]
use sea_streamer_kafka::KafkaMessage;
#[cfg(feature = "backend-redis")]
use sea_streamer_redis::RedisMessage;
#[cfg(feature = "backend-stdio")]
use sea_streamer_stdio::StdioMessage;
use crate::{Backend, SeaStreamerBackend};
use sea_streamer_types::{Message, Payload, SeqNo, ShardId, StreamKey, Timestamp};
#[derive(Debug)]
pub enum SeaMessage<'a> {
#[cfg(feature = "backend-kafka")]
Kafka(KafkaMessage<'a>),
#[cfg(feature = "backend-redis")]
Redis(RedisMessage),
#[cfg(feature = "backend-stdio")]
Stdio(StdioMessage),
#[cfg(feature = "backend-file")]
File(FileMessage),
#[cfg(not(feature = "backend-kafka"))]
None(std::marker::PhantomData<&'a ()>),
}
impl<'a> SeaStreamerBackend for SeaMessage<'a> {
#[cfg(feature = "backend-kafka")]
type Kafka = KafkaMessage<'a>;
#[cfg(feature = "backend-redis")]
type Redis = RedisMessage;
#[cfg(feature = "backend-stdio")]
type Stdio = StdioMessage;
#[cfg(feature = "backend-file")]
type File = FileMessage;
fn backend(&self) -> Backend {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(_) => Backend::Kafka,
#[cfg(feature = "backend-redis")]
Self::Redis(_) => Backend::Redis,
#[cfg(feature = "backend-stdio")]
Self::Stdio(_) => Backend::Stdio,
#[cfg(feature = "backend-file")]
Self::File(_) => Backend::File,
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
#[cfg(feature = "backend-kafka")]
fn get_kafka(&mut self) -> Option<&mut KafkaMessage<'a>> {
match self {
Self::Kafka(s) => Some(s),
#[cfg(feature = "backend-redis")]
Self::Redis(_) => None,
#[cfg(feature = "backend-stdio")]
Self::Stdio(_) => None,
#[cfg(feature = "backend-file")]
Self::File(_) => None,
}
}
#[cfg(feature = "backend-redis")]
fn get_redis(&mut self) -> Option<&mut RedisMessage> {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(_) => None,
Self::Redis(s) => Some(s),
#[cfg(feature = "backend-stdio")]
Self::Stdio(_) => None,
#[cfg(feature = "backend-file")]
Self::File(_) => None,
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => None,
}
}
#[cfg(feature = "backend-stdio")]
fn get_stdio(&mut self) -> Option<&mut StdioMessage> {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(_) => None,
#[cfg(feature = "backend-redis")]
Self::Redis(_) => None,
Self::Stdio(s) => Some(s),
#[cfg(feature = "backend-file")]
Self::File(_) => None,
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => None,
}
}
#[cfg(feature = "backend-file")]
fn get_file(&mut self) -> Option<&mut FileMessage> {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(_) => None,
#[cfg(feature = "backend-redis")]
Self::Redis(_) => None,
#[cfg(feature = "backend-stdio")]
Self::Stdio(_) => None,
Self::File(s) => Some(s),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => None,
}
}
}
impl<'a> Message for SeaMessage<'a> {
fn stream_key(&self) -> StreamKey {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(i) => i.stream_key(),
#[cfg(feature = "backend-redis")]
Self::Redis(i) => i.stream_key(),
#[cfg(feature = "backend-stdio")]
Self::Stdio(i) => i.stream_key(),
#[cfg(feature = "backend-file")]
Self::File(i) => i.stream_key(),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
fn shard_id(&self) -> ShardId {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(i) => i.shard_id(),
#[cfg(feature = "backend-redis")]
Self::Redis(i) => i.shard_id(),
#[cfg(feature = "backend-stdio")]
Self::Stdio(i) => i.shard_id(),
#[cfg(feature = "backend-file")]
Self::File(i) => i.shard_id(),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
fn sequence(&self) -> SeqNo {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(i) => i.sequence(),
#[cfg(feature = "backend-redis")]
Self::Redis(i) => i.sequence(),
#[cfg(feature = "backend-stdio")]
Self::Stdio(i) => i.sequence(),
#[cfg(feature = "backend-file")]
Self::File(i) => i.sequence(),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
fn timestamp(&self) -> Timestamp {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(i) => i.timestamp(),
#[cfg(feature = "backend-redis")]
Self::Redis(i) => i.timestamp(),
#[cfg(feature = "backend-stdio")]
Self::Stdio(i) => i.timestamp(),
#[cfg(feature = "backend-file")]
Self::File(i) => i.timestamp(),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
fn message(&self) -> Payload {
match self {
#[cfg(feature = "backend-kafka")]
Self::Kafka(i) => i.message(),
#[cfg(feature = "backend-redis")]
Self::Redis(i) => i.message(),
#[cfg(feature = "backend-stdio")]
Self::Stdio(i) => i.message(),
#[cfg(feature = "backend-file")]
Self::File(i) => i.message(),
#[cfg(not(feature = "backend-kafka"))]
Self::None(_) => unreachable!(),
}
}
}