use super::error::TransportError;
use super::traits::CommitToken;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Identifies a transport backend by name.
///
/// With serde the value is written and read as the lowercase variant name
/// (`kafka`, `grpc`, `memory`, `file`, `pipe`, `http`, `redis`), and
/// [`TransportType::Kafka`] is the `Default`.
///
/// Every variant has a same-named optional section in `TransportConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TransportType {
/// Default variant, used when no transport type is given.
#[default]
Kafka,
Grpc,
Memory,
File,
Pipe,
Http,
Redis,
}
/// Formats the transport as its lowercase name, e.g. `kafka` or `grpc`.
impl std::fmt::Display for TransportType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the static name first so there is exactly one write call.
        let name = match self {
            Self::Kafka => "kafka",
            Self::Grpc => "grpc",
            Self::Memory => "memory",
            Self::File => "file",
            Self::Pipe => "pipe",
            Self::Http => "http",
            Self::Redis => "redis",
        };
        f.write_str(name)
    }
}
/// Encoding of a message payload.
///
/// With serde the value is written and read as the lowercase variant name
/// (`auto`, `json`, `msgpack`), and [`PayloadFormat::Auto`] is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PayloadFormat {
/// Default variant.
///
/// NOTE(review): presumably means "detect the format per message"; note that
/// `PayloadFormat::detect` itself never returns this variant.
#[default]
Auto,
/// JSON payload.
Json,
/// MessagePack payload.
MsgPack,
}
impl PayloadFormat {
    /// Guesses the payload encoding from the first byte of `payload`.
    ///
    /// Returns [`PayloadFormat::MsgPack`] when the first byte is a MessagePack
    /// map or array marker, and [`PayloadFormat::Json`] for everything else,
    /// including an empty payload. Only the first byte is inspected; the
    /// payload is never parsed, and `Auto` is never returned.
    #[must_use]
    pub fn detect(payload: &[u8]) -> Self {
        match payload.first().copied() {
            // fixmap (0x80..=0x8f) and fixarray (0x90..=0x9f)
            Some(0x80..=0x8f | 0x90..=0x9f) => Self::MsgPack,
            // array16, array32, map16, map32
            Some(0xdc | 0xdd | 0xde | 0xdf) => Self::MsgPack,
            // Any other leading byte, or no bytes at all.
            _ => Self::Json,
        }
    }
}
/// A single message together with the commit token of type `T` that
/// accompanies it.
#[derive(Debug, Clone)]
pub struct Message<T: CommitToken> {
/// Optional message key, held as a shared string.
pub key: Option<Arc<str>>,
/// Raw payload bytes.
pub payload: Vec<u8>,
/// Token of the `CommitToken` type `T` associated with this message.
pub token: T,
/// Optional timestamp.
/// NOTE(review): the name suggests milliseconds; the epoch/reference point
/// is not visible here — confirm against the producers.
pub timestamp_ms: Option<i64>,
/// Payload encoding; `Message::new` fills this from `PayloadFormat::detect`.
pub format: PayloadFormat,
}
impl<T: CommitToken> Message<T> {
    /// Builds a message from its parts.
    ///
    /// The `format` field is not supplied by the caller: it is set to whatever
    /// [`PayloadFormat::detect`] reports for `payload`.
    #[must_use]
    pub fn new(
        key: Option<Arc<str>>,
        payload: Vec<u8>,
        token: T,
        timestamp_ms: Option<i64>,
    ) -> Self {
        Self {
            // Listed first so the payload is inspected by reference before it
            // is moved into the struct by the `payload` field below.
            format: PayloadFormat::detect(&payload),
            key,
            payload,
            token,
            timestamp_ms,
        }
    }

    /// Returns `true` when the recorded format is [`PayloadFormat::Json`].
    #[must_use]
    pub fn is_json(&self) -> bool {
        matches!(self.format, PayloadFormat::Json)
    }

    /// Returns `true` when the recorded format is [`PayloadFormat::MsgPack`].
    #[must_use]
    pub fn is_msgpack(&self) -> bool {
        matches!(self.format, PayloadFormat::MsgPack)
    }
}
/// Result value with four possible outcomes, one of which carries a
/// `TransportError`.
///
/// NOTE(review): the code that produces these values is not visible in this
/// file; the per-variant notes below are read off the names and should be
/// confirmed against the transport implementations.
#[derive(Debug)]
pub enum SendResult {
/// Success; the only variant for which `SendResult::is_ok` returns `true`.
Ok,
/// Presumably the transport could not accept more data at the moment — confirm.
Backpressured,
/// Failure carrying the `TransportError` that describes it.
Fatal(TransportError),
/// NOTE(review): the name suggests the message was filtered in relation to a
/// dead-letter queue — confirm the exact meaning with the senders.
FilteredDlq,
}
impl SendResult {
    /// Returns `true` only for [`SendResult::Ok`].
    #[must_use]
    pub fn is_ok(&self) -> bool {
        match self {
            Self::Ok => true,
            Self::Backpressured | Self::Fatal(_) | Self::FilteredDlq => false,
        }
    }

    /// Returns `true` only for [`SendResult::Backpressured`].
    #[must_use]
    pub fn is_backpressured(&self) -> bool {
        match self {
            Self::Backpressured => true,
            Self::Ok | Self::Fatal(_) | Self::FilteredDlq => false,
        }
    }

    /// Returns `true` only for [`SendResult::Fatal`], whatever error it carries.
    #[must_use]
    pub fn is_fatal(&self) -> bool {
        match self {
            Self::Fatal(_) => true,
            Self::Ok | Self::Backpressured | Self::FilteredDlq => false,
        }
    }

    /// Returns `true` only for [`SendResult::FilteredDlq`].
    #[must_use]
    pub fn is_filtered_dlq(&self) -> bool {
        match self {
            Self::FilteredDlq => true,
            Self::Ok | Self::Backpressured | Self::Fatal(_) => false,
        }
    }
}
/// Transport configuration: the selected transport type, the payload format,
/// and one optional section per transport.
///
/// Each per-transport section is declared twice under opposite `cfg` gates.
/// With the matching `transport-*` cargo feature enabled the field holds that
/// transport's own config type and takes part in (de)serialization; with the
/// feature disabled the field still exists but is an `Option<()>` that serde
/// skips, so it is neither read nor written and keeps its default of `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TransportConfig {
/// Selected transport; (de)serialized under the key `type`, defaulting to
/// `TransportType::default()` when the key is absent.
#[serde(rename = "type", default)]
pub transport_type: TransportType,
/// Payload format; defaults to `PayloadFormat::default()` when absent.
#[serde(default)]
pub payload_format: PayloadFormat,
// --- Sections available when the matching feature is enabled ---
/// Kafka section (feature `transport-kafka`).
#[cfg(feature = "transport-kafka")]
#[serde(default)]
pub kafka: Option<super::kafka::KafkaConfig>,
/// gRPC section (feature `transport-grpc`).
#[cfg(feature = "transport-grpc")]
#[serde(default)]
pub grpc: Option<super::grpc::GrpcConfig>,
/// In-memory section (feature `transport-memory`).
#[cfg(feature = "transport-memory")]
#[serde(default)]
pub memory: Option<super::memory::MemoryConfig>,
/// Pipe section (feature `transport-pipe`).
#[cfg(feature = "transport-pipe")]
#[serde(default)]
pub pipe: Option<super::pipe::PipeTransportConfig>,
/// File section (feature `transport-file`).
#[cfg(feature = "transport-file")]
#[serde(default)]
pub file: Option<super::file::FileTransportConfig>,
/// HTTP section (feature `transport-http`).
#[cfg(feature = "transport-http")]
#[serde(default)]
pub http: Option<super::http::HttpTransportConfig>,
// --- Placeholders used when the matching feature is disabled ---
// Each keeps the field name present on the struct but is skipped by serde.
/// Placeholder for the Kafka section when `transport-kafka` is disabled.
#[cfg(not(feature = "transport-kafka"))]
#[serde(default, skip)]
pub kafka: Option<()>,
/// Placeholder for the gRPC section when `transport-grpc` is disabled.
#[cfg(not(feature = "transport-grpc"))]
#[serde(default, skip)]
pub grpc: Option<()>,
/// Placeholder for the in-memory section when `transport-memory` is disabled.
#[cfg(not(feature = "transport-memory"))]
#[serde(default, skip)]
pub memory: Option<()>,
/// Placeholder for the pipe section when `transport-pipe` is disabled.
#[cfg(not(feature = "transport-pipe"))]
#[serde(default, skip)]
pub pipe: Option<()>,
/// Placeholder for the file section when `transport-file` is disabled.
#[cfg(not(feature = "transport-file"))]
#[serde(default, skip)]
pub file: Option<()>,
/// Placeholder for the HTTP section when `transport-http` is disabled.
#[cfg(not(feature = "transport-http"))]
#[serde(default, skip)]
pub http: Option<()>,
/// Redis section (feature `transport-redis`).
#[cfg(feature = "transport-redis")]
#[serde(default)]
pub redis: Option<super::redis_transport::RedisTransportConfig>,
/// Placeholder for the Redis section when `transport-redis` is disabled.
#[cfg(not(feature = "transport-redis"))]
#[serde(default, skip)]
pub redis: Option<()>,
}
/// Unit tests for [`PayloadFormat::detect`].
///
/// Covers every MessagePack container marker the detector recognises (fixmap,
/// fixarray, array16, array32, map16, map32), the edges of the two fixed
/// ranges, and the JSON fallback for empty input and other leading bytes.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detect_json_object() {
        assert_eq!(PayloadFormat::detect(b"{\"foo\":1}"), PayloadFormat::Json);
    }

    #[test]
    fn detect_json_array() {
        assert_eq!(PayloadFormat::detect(b"[1,2,3]"), PayloadFormat::Json);
    }

    #[test]
    fn detect_msgpack_fixmap() {
        assert_eq!(PayloadFormat::detect(&[0x81, 0xa3]), PayloadFormat::MsgPack);
    }

    #[test]
    fn detect_msgpack_map16() {
        assert_eq!(
            PayloadFormat::detect(&[0xde, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_map32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdf, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_fixarray() {
        assert_eq!(
            PayloadFormat::detect(&[0x93, 0x01, 0x02, 0x03]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_array16_and_array32() {
        assert_eq!(
            PayloadFormat::detect(&[0xdc, 0x00, 0x10]),
            PayloadFormat::MsgPack
        );
        assert_eq!(
            PayloadFormat::detect(&[0xdd, 0x00, 0x00, 0x00, 0x01]),
            PayloadFormat::MsgPack
        );
    }

    #[test]
    fn detect_msgpack_fixed_range_boundaries() {
        // First and last byte of the fixmap and fixarray ranges.
        for &marker in &[0x80u8, 0x8f, 0x90, 0x9f] {
            assert_eq!(
                PayloadFormat::detect(&[marker]),
                PayloadFormat::MsgPack,
                "marker {:#04x}",
                marker
            );
        }
    }

    #[test]
    fn detect_other_leading_bytes_fall_back_to_json() {
        // Bytes adjacent to the recognised markers are not treated as MsgPack.
        for &byte in &[0x7fu8, 0xa0, 0xdb, 0xe0] {
            assert_eq!(
                PayloadFormat::detect(&[byte, 0x00]),
                PayloadFormat::Json,
                "byte {:#04x}",
                byte
            );
        }
    }

    #[test]
    fn detect_empty_defaults_json() {
        assert_eq!(PayloadFormat::detect(&[]), PayloadFormat::Json);
    }
}