use super::error::TransportResult;
use super::filter::FilteredDlqEntry;
use super::types::{Message, SendResult};
use super::work_batch::{Record, WorkBatch};
use std::fmt::{Debug, Display};
use std::future::Future;
/// Token type that parameterises `Message` and `WorkBatch` and is later passed
/// back to [`TransportReceiver::commit`].
///
/// Implementors must be cloneable, shareable across threads, printable via
/// both [`Debug`] and [`Display`], and free of non-`'static` borrows.
pub trait CommitToken: Clone + Send + Sync + Debug + Display + 'static {
    /// Renders the token through its [`Display`] implementation.
    ///
    /// Despite the `as_` prefix this allocates and returns an owned `String`;
    /// the name is part of the public interface and is kept as is.
    fn as_str(&self) -> String {
        self.to_string()
    }
}
/// Result of a receive operation: the received messages plus a separate list
/// of `FilteredDlqEntry` values.
///
/// NOTE(review): `dlq` presumably stands for "dead-letter queue", i.e. records
/// rejected by a filter — confirm against `super::filter`.
#[derive(Debug)]
pub struct RecvBatch<T: CommitToken> {
/// Messages carried by this batch, parameterised by the commit-token type `T`.
pub messages: Vec<Message<T>>,
/// Filter-produced entries that accompany the batch; not counted by
/// `RecvBatch::len` or `RecvBatch::is_empty`.
pub dlq_entries: Vec<FilteredDlqEntry>,
}
/// Constructors and size queries for [`RecvBatch`].
impl<T: CommitToken> RecvBatch<T> {
    /// Builds a batch that contains neither messages nor DLQ entries.
    #[must_use]
    pub fn empty() -> Self {
        Self::from_messages(Vec::new())
    }

    /// Wraps `messages` in a batch whose `dlq_entries` list starts out empty.
    #[must_use]
    pub fn from_messages(messages: Vec<Message<T>>) -> Self {
        let dlq_entries = Vec::new();
        Self {
            messages,
            dlq_entries,
        }
    }

    /// Reports whether the batch holds no messages.
    ///
    /// Only `messages` is inspected; a batch that carries nothing but
    /// `dlq_entries` is still considered empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    /// Number of messages in the batch; `dlq_entries` are not counted.
    #[must_use]
    pub fn len(&self) -> usize {
        self.messages.len()
    }
}
/// Operations shared by every transport, independent of direction.
///
/// Both `TransportSender` and `TransportReceiver` use this as their
/// supertrait. Implementors must be `Send + Sync`.
pub trait TransportBase: Send + Sync {
/// Asynchronously closes the transport.
///
/// The returned future is `Send` and resolves to a `TransportResult<()>`.
/// What "closed" entails is defined by the implementor.
fn close(&self) -> impl Future<Output = TransportResult<()>> + Send;
/// Synchronous health indicator; the criteria are defined by the implementor.
fn is_healthy(&self) -> bool;
/// Static name for this transport.
// NOTE(review): presumably used for logging/metrics labels — confirm with callers.
fn name(&self) -> &'static str;
}
/// Sending half of a transport.
pub trait TransportSender: TransportBase {
    /// Sends a single `payload` under `key`.
    ///
    /// The returned future is `Send` and resolves to the transport's
    /// `SendResult`.
    fn send(&self, key: &str, payload: bytes::Bytes) -> impl Future<Output = SendResult> + Send;

    /// Sends `records` one after another through [`TransportSender::send`].
    ///
    /// This default implementation is strictly sequential: each record is
    /// awaited before the next one is sent. A record without a key is sent
    /// with the empty string as its key. The first result for which `is_ok()`
    /// is false is returned immediately and the remaining records are not
    /// attempted; records sent before the failure are not rolled back. If
    /// every send succeeds (or `records` is empty) the result is
    /// `SendResult::Ok`.
    fn send_batch(&self, records: &[Record]) -> impl Future<Output = SendResult> + Send {
        async move {
            for entry in records {
                let outcome = self
                    .send(entry.key.as_deref().unwrap_or(""), entry.payload.clone())
                    .await;
                // Stop at the first failure and surface it unchanged.
                if !outcome.is_ok() {
                    return outcome;
                }
            }
            SendResult::Ok
        }
    }
}
/// Limits passed to `TransportReceiver::recv_limited`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvLimits {
/// Record-count limit; the default `recv_limited` forwards this value to `recv`.
pub max_records: usize,
/// Byte limit for a receive call.
// NOTE(review): the default `recv_limited` does not read this field; whether and
// how it is enforced depends on the implementor — confirm per transport.
pub max_bytes: u64,
}
pub trait TransportReceiver: TransportBase {
type Token: CommitToken;
fn recv(
&self,
max: usize,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
fn recv_limited(
&self,
limits: RecvLimits,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send {
self.recv(limits.max_records)
}
fn commit(&self, tokens: &[Self::Token]) -> impl Future<Output = TransportResult<()>> + Send;
}
/// Umbrella trait for types that can both send and receive.
///
/// It adds no methods of its own and is never implemented by hand: the
/// blanket implementation below covers every type that implements both
/// [`TransportSender`] and [`TransportReceiver`].
pub trait Transport: TransportSender + TransportReceiver {}

/// Blanket implementation: any sender that is also a receiver is a `Transport`.
impl<T> Transport for T where T: TransportSender + TransportReceiver {}
/// Types that can be built from a configuration key, falling back to their
/// [`Default`] value.
///
/// NOTE(review): "cascade" presumably refers to a layered configuration
/// source behind `crate::config` — confirm against that module.
pub trait FromCascade: Default + serde::Serialize + serde::de::DeserializeOwned + 'static {
    /// Builds `Self` from the configuration entry registered under `key`.
    ///
    /// With the `config` feature enabled, the global configuration is
    /// consulted via `crate::config::try_get()`. If it is available and `key`
    /// unmarshals successfully into `Self`, that value is returned. In every
    /// other case — feature disabled, no configuration available, or an
    /// unmarshal error — the result is `Self::default()`. Unmarshal errors
    /// are discarded silently.
    #[must_use]
    fn from_cascade_key(key: &str) -> Self {
        // Without the `config` feature there is nothing to look up; touch
        // `key` so the parameter does not trigger an unused-variable warning.
        #[cfg(not(feature = "config"))]
        let _ = key;

        #[cfg(feature = "config")]
        {
            if let Some(settings) = crate::config::try_get()
                && let Ok(resolved) = settings.unmarshal_key_registered::<Self>(key)
            {
                return resolved;
            }
        }

        Self::default()
    }
}