use crate::{frame::MessageType, message::Messages};
#[cfg(feature = "cassandra")]
use cassandra_protocol::compression::Compression;
use core::fmt;
#[cfg(feature = "kafka")]
use kafka::KafkaCodecState;
use metrics::{Histogram, histogram};
use tokio_util::codec::{Decoder, Encoder};
#[cfg(feature = "cassandra")]
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "valkey")]
pub mod valkey;
/// Selects between the source side and the sink side.
///
/// Within this module the value is passed to [`CodecBuilder::new`] and is used by
/// [`message_latency`] to choose which latency histogram is returned.
///
/// `Debug` is derived so the value can be used with `{:?}`, in `assert_eq!`, and as a
/// field of types that derive `Debug` themselves.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    /// The source side.
    Source,
    /// The sink side.
    Sink,
}
/// Renders a [`Direction`] as its bare variant name (`"Source"` or `"Sink"`).
impl fmt::Display for Direction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it with a single write. `write_str` is used
        // (rather than `pad`) so width/alignment flags keep being ignored.
        let label = match self {
            Self::Source => "Source",
            Self::Sink => "Sink",
        };
        f.write_str(label)
    }
}
/// Returns the latency histogram that corresponds to `direction`.
///
/// * [`Direction::Sink`] yields `shotover_source_to_sink_latency_seconds`, labelled with
///   `sink = destination_name`.
/// * [`Direction::Source`] yields `shotover_sink_to_source_latency_seconds`, labelled with
///   `source = destination_name`.
///
/// `destination_name` is taken by value because it is moved into the metric label.
pub fn message_latency(direction: Direction, destination_name: String) -> Histogram {
    match direction {
        Direction::Sink => {
            histogram!("shotover_source_to_sink_latency_seconds", "sink" => destination_name)
        }
        Direction::Source => {
            histogram!("shotover_sink_to_source_latency_seconds", "source" => destination_name)
        }
    }
}
/// Codec state, with one variant per supported protocol.
///
/// Every variant except [`CodecState::Dummy`] is compiled in only when the cargo feature of
/// the same name is enabled, so the set of variants depends on the build configuration.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CodecState {
    /// Cassandra state; holds the [`Compression`] value returned by
    /// [`CodecState::as_cassandra`].
    #[cfg(feature = "cassandra")]
    Cassandra {
        compression: Compression,
    },
    /// Valkey state; carries no data.
    #[cfg(feature = "valkey")]
    Valkey,
    /// Kafka state; wraps the [`KafkaCodecState`] returned by [`CodecState::as_kafka`].
    #[cfg(feature = "kafka")]
    Kafka(KafkaCodecState),
    /// Data-less variant that exists regardless of which features are enabled.
    Dummy,
    /// OpenSearch state; carries no data.
    #[cfg(feature = "opensearch")]
    OpenSearch,
}
impl CodecState {
    /// Returns the compression value stored in a [`CodecState::Cassandra`].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant; the message includes the `Debug` rendering
    /// of the actual state.
    #[cfg(feature = "cassandra")]
    pub fn as_cassandra(&self) -> Compression {
        let Self::Cassandra { compression } = self else {
            panic!("This is a {self:?}, expected CodecState::Cassandra")
        };
        *compression
    }

    /// Returns a copy of the state wrapped by a [`CodecState::Kafka`].
    ///
    /// # Panics
    ///
    /// Panics if `self` is any other variant; the message includes the `Debug` rendering
    /// of the actual state.
    #[cfg(feature = "kafka")]
    pub fn as_kafka(&self) -> KafkaCodecState {
        let Self::Kafka(state) = self else {
            panic!("This is a {self:?}, expected CodecState::Kafka")
        };
        *state
    }
}
/// Error type of the read path: it is the `Error` type every [`DecoderHalf`] must use.
#[derive(Debug)]
pub enum CodecReadError {
    /// Wraps an [`anyhow::Error`].
    // NOTE(review): by its name this reports a failure to parse incoming data — confirm
    // against the protocol decoders.
    Parser(anyhow::Error),
    /// Wraps a [`std::io::Error`]; the `From<std::io::Error>` impl produces this variant.
    Io(std::io::Error),
    /// Carries a set of [`Messages`].
    // NOTE(review): the name suggests these messages are sent as a response before the
    // connection is closed; that handling is not visible in this module — confirm.
    RespondAndThenCloseConnection(Messages),
}
impl From<std::io::Error> for CodecReadError {
fn from(err: std::io::Error) -> Self {
CodecReadError::Io(err)
}
}
/// Error type of the write path: it is the `Error` type every [`EncoderHalf`] must use.
#[derive(Debug)]
pub enum CodecWriteError {
    /// Wraps an [`anyhow::Error`].
    // NOTE(review): by its name this reports a failure while encoding outgoing messages —
    // confirm against the protocol encoders.
    Encoder(anyhow::Error),
    /// Wraps a [`std::io::Error`]; the `From<std::io::Error>` impl produces this variant.
    Io(std::io::Error),
}
impl From<std::io::Error> for CodecWriteError {
fn from(err: std::io::Error) -> Self {
CodecWriteError::Io(err)
}
}
pub trait DecoderHalf: Decoder<Item = Messages, Error = CodecReadError> + Send {}
impl<T: Decoder<Item = Messages, Error = CodecReadError> + Send> DecoderHalf for T {}
pub trait EncoderHalf: Encoder<Messages, Error = CodecWriteError> + Send {}
impl<T: Encoder<Messages, Error = CodecWriteError> + Send> EncoderHalf for T {}
/// Factory for a matching decoder/encoder pair.
///
/// Implementors must be `Clone + Send`.
pub trait CodecBuilder: Clone + Send {
    /// Decoder type produced by [`CodecBuilder::build`].
    type Decoder: DecoderHalf;
    /// Encoder type produced by [`CodecBuilder::build`].
    type Encoder: EncoderHalf;

    /// Constructs a builder for the given `direction` and `destination_name`.
    // NOTE(review): `destination_name` presumably ends up as the metric label used by
    // `message_latency` — confirm against the implementors.
    fn new(direction: Direction, destination_name: String) -> Self;

    /// Creates a new decoder and encoder, returned as `(decoder, encoder)`.
    fn build(&self) -> (Self::Decoder, Self::Encoder);

    /// Returns the [`MessageType`] associated with this builder.
    fn protocol(&self) -> MessageType;
}