pub mod event;
pub mod runtime;
pub use runtime::ConsumerRuntime;
use futures::future::BoxFuture;
use crate::core::engine::FrozenDiContainer;
use crate::observability::propagation::TraceContext;
#[derive(Clone, Debug)]
pub struct InboundMessage {
pub topic: String,
pub payload: serde_json::Value,
pub idempotency_key: String,
pub tenant: Option<String>,
pub traceparent: Option<String>,
}
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub trait MessageTransport: Send + Sync + 'static {
fn poll(&self, max: usize) -> BoxFuture<'_, Result<Vec<InboundMessage>, BoxError>>;
fn ack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), BoxError>>;
fn nack<'a>(&'a self, msg: &'a InboundMessage) -> BoxFuture<'a, Result<(), BoxError>>;
fn dead_letter<'a>(
&'a self,
msg: &'a InboundMessage,
reason: &'a str,
) -> BoxFuture<'a, Result<(), BoxError>>;
}
#[non_exhaustive]
pub struct EventContext {
pub message: InboundMessage,
pub container: &'static FrozenDiContainer,
pub trace: TraceContext,
pub tenant: Option<std::sync::Arc<crate::web::tenant::TenantConfig>>,
}
impl EventContext {
#[inline]
pub fn inject<T: Send + Sync + 'static>(&self) -> &'static T {
self.container.get::<T>()
}
#[inline]
pub fn try_inject<T: Send + Sync + 'static>(&self) -> Option<&'static T> {
self.container.try_get::<T>()
}
pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, EventError> {
serde::Deserialize::deserialize(&self.message.payload)
.map_err(|e| EventError::DeadLetter(format!("payload decode failed: {e}")))
}
pub fn traceparent(&self) -> String {
self.trace.to_traceparent()
}
}
#[derive(Debug)]
pub enum EventError {
Retry(String),
DeadLetter(String),
}
impl From<String> for EventError {
fn from(s: String) -> Self {
Self::Retry(s)
}
}
impl From<&str> for EventError {
fn from(s: &str) -> Self {
Self::Retry(s.to_owned())
}
}
impl std::fmt::Display for EventError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Retry(m) => write!(f, "retryable: {m}"),
Self::DeadLetter(m) => write!(f, "poison: {m}"),
}
}
}
impl std::error::Error for EventError {}
pub struct EventHandlerDescriptor {
pub topic: &'static str,
pub consumer: &'static str,
pub handler: fn(EventContext) -> BoxFuture<'static, Result<(), EventError>>,
}
inventory::collect!(&'static EventHandlerDescriptor);