use crate::Event;
use crate::kind::EventPayload;
use core::marker::PhantomData;
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
#[error("sink unavailable: {0}")]
Unavailable(String),
#[error("sink rejected batch: {0}")]
Rejected(String),
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
}
pub trait EventSink<P>: Send + Sync + 'static
where
P: EventPayload,
{
fn emit(
&self,
events: &[Event<P>],
) -> impl core::future::Future<Output = Result<(), SinkError>> + Send;
}
#[derive(Debug, Default)]
pub struct NoopEventSink<P>(PhantomData<fn() -> P>);
impl<P> NoopEventSink<P> {
pub const fn new() -> Self {
Self(PhantomData)
}
}
impl<P> EventSink<P> for NoopEventSink<P>
where
P: EventPayload,
{
async fn emit(&self, events: &[Event<P>]) -> Result<(), SinkError> {
tracing::trace!(
target: "axess::events::noop_sink",
count = events.len(),
"NoopEventSink: batch discarded",
);
Ok(())
}
}
#[derive(Debug)]
pub struct LogAndSwallow<S>(pub S);
impl<P, S> EventSink<P> for LogAndSwallow<S>
where
P: EventPayload,
S: EventSink<P>,
{
async fn emit(&self, events: &[Event<P>]) -> Result<(), SinkError> {
if let Err(e) = self.0.emit(events).await {
tracing::warn!(error = %e, count = events.len(), "event sink: emit failed");
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kind::KindTag;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Clone, Debug)]
struct DummyPayload;
impl EventPayload for DummyPayload {
fn kind_tag(&self) -> KindTag {
KindTag::new("test.dummy.v1")
}
#[cfg(feature = "serde")]
fn to_inner_json(&self) -> serde_json::Value {
serde_json::json!({})
}
}
struct RecorderSink {
calls: Arc<AtomicUsize>,
}
impl EventSink<DummyPayload> for RecorderSink {
async fn emit(&self, events: &[Event<DummyPayload>]) -> Result<(), SinkError> {
tracing::trace!(
target: "axess::events::recorder_sink",
count = events.len(),
"RecorderSink: call observed",
);
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[tokio::test]
async fn log_and_swallow_forwards_to_inner_sink() {
let calls = Arc::new(AtomicUsize::new(0));
let inner = RecorderSink {
calls: calls.clone(),
};
let sink = LogAndSwallow(inner);
let res = sink.emit(&[]).await;
assert!(res.is_ok());
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
}