pub mod fan_out;
pub mod file_jsonl;
pub mod stdout;
pub use fan_out::FanOutSinkAdapter;
pub use file_jsonl::{FileJsonlSink, FileJsonlSinkConfig};
pub use stdout::StdoutSink;
use crate::core::{Error, Event, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SinkDeliveryGuarantee {
#[default]
AtLeastOnce,
AtLeastOnceIdempotent,
EffectivelyOnce,
}
impl SinkDeliveryGuarantee {
fn strength(self) -> u8 {
match self {
Self::AtLeastOnce => 0,
Self::AtLeastOnceIdempotent => 1,
Self::EffectivelyOnce => 2,
}
}
pub fn weakest(self, other: Self) -> Self {
if self.strength() <= other.strength() {
self
} else {
other
}
}
pub fn as_label(self) -> &'static str {
match self {
Self::AtLeastOnce => "at_least_once",
Self::AtLeastOnceIdempotent => "at_least_once_idempotent",
Self::EffectivelyOnce => "effectively_once",
}
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SinkDeliveryMetrics {
pub events_sent: u64,
pub events_errored: u64,
pub events_retried: u64,
pub last_delivered_offset: Option<String>,
}
impl SinkDeliveryMetrics {
pub fn merge(&mut self, other: &Self) {
self.events_sent = self.events_sent.saturating_add(other.events_sent);
self.events_errored = self.events_errored.saturating_add(other.events_errored);
self.events_retried = self.events_retried.saturating_add(other.events_retried);
}
}
pub trait SinkAdapter: Send {
fn send(&mut self, event: &Event) -> impl std::future::Future<Output = Result<()>> + Send;
fn flush(&mut self) -> impl std::future::Future<Output = Result<()>> + Send;
fn close(&mut self) -> impl std::future::Future<Output = Result<()>> + Send;
fn name(&self) -> &str;
fn delivery_guarantee(&self) -> SinkDeliveryGuarantee {
SinkDeliveryGuarantee::AtLeastOnce
}
fn idempotent_delivery_capable(&self) -> bool {
false
}
fn transactional_checkpoint_barrier_capable(&self) -> bool {
false
}
fn queue_depth(&self) -> Option<usize> {
None
}
fn flush_tick_interval(&self) -> Option<std::time::Duration> {
None
}
fn begin_checkpoint_barrier(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
std::future::ready(Ok(()))
}
fn commit_checkpoint_barrier(
&mut self,
) -> impl std::future::Future<Output = Result<()>> + Send {
std::future::ready(Ok(()))
}
fn abort_checkpoint_barrier(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
std::future::ready(Ok(()))
}
fn preflight_check(&mut self) -> impl std::future::Future<Output = Result<()>> + Send {
std::future::ready(Ok(()))
}
fn close_with_timeout(
&mut self,
timeout_ms: u64,
) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), self.close())
.await
.map_err(|_| {
Error::TimeoutError(format!(
"sink '{}' close exceeded timeout ({} ms)",
self.name(),
timeout_ms,
))
})?
}
}
fn exported_events(&self) -> Option<&[Event]> {
None
}
fn delivery_metrics(&self) -> Option<SinkDeliveryMetrics> {
None
}
fn is_closed(&self) -> bool {
false
}
fn boxed(self) -> BoxedSink
where
Self: Sized + 'static,
{
BoxedSink::new(self)
}
}
type BoxFut<'a> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
trait ErasedSink: Send {
fn erased_send<'a>(&'a mut self, event: &'a Event) -> BoxFut<'a>;
fn erased_flush<'a>(&'a mut self) -> BoxFut<'a>;
fn erased_close<'a>(&'a mut self) -> BoxFut<'a>;
fn erased_name(&self) -> &str;
fn erased_delivery_guarantee(&self) -> SinkDeliveryGuarantee;
fn erased_idempotent_delivery_capable(&self) -> bool;
fn erased_transactional_checkpoint_barrier_capable(&self) -> bool;
fn erased_queue_depth(&self) -> Option<usize>;
fn erased_flush_tick_interval(&self) -> Option<std::time::Duration>;
fn erased_delivery_metrics(&self) -> Option<SinkDeliveryMetrics>;
fn erased_is_closed(&self) -> bool;
fn erased_exported_events(&self) -> Option<&[Event]>;
fn erased_begin_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a>;
fn erased_commit_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a>;
fn erased_abort_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a>;
fn erased_preflight_check<'a>(&'a mut self) -> BoxFut<'a>;
}
impl<T: SinkAdapter> ErasedSink for T {
fn erased_send<'a>(&'a mut self, event: &'a Event) -> BoxFut<'a> {
Box::pin(self.send(event))
}
fn erased_flush<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.flush())
}
fn erased_close<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.close())
}
fn erased_name(&self) -> &str {
self.name()
}
fn erased_delivery_guarantee(&self) -> SinkDeliveryGuarantee {
self.delivery_guarantee()
}
fn erased_idempotent_delivery_capable(&self) -> bool {
self.idempotent_delivery_capable()
}
fn erased_transactional_checkpoint_barrier_capable(&self) -> bool {
self.transactional_checkpoint_barrier_capable()
}
fn erased_queue_depth(&self) -> Option<usize> {
self.queue_depth()
}
fn erased_flush_tick_interval(&self) -> Option<std::time::Duration> {
self.flush_tick_interval()
}
fn erased_delivery_metrics(&self) -> Option<SinkDeliveryMetrics> {
self.delivery_metrics()
}
fn erased_is_closed(&self) -> bool {
self.is_closed()
}
fn erased_exported_events(&self) -> Option<&[Event]> {
self.exported_events()
}
fn erased_begin_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.begin_checkpoint_barrier())
}
fn erased_commit_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.commit_checkpoint_barrier())
}
fn erased_abort_checkpoint_barrier<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.abort_checkpoint_barrier())
}
fn erased_preflight_check<'a>(&'a mut self) -> BoxFut<'a> {
Box::pin(self.preflight_check())
}
}
pub struct BoxedSink(Box<dyn ErasedSink>);
impl BoxedSink {
pub fn new<S: SinkAdapter + 'static>(sink: S) -> Self {
Self(Box::new(sink))
}
}
impl std::fmt::Debug for BoxedSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BoxedSink")
.field("name", &self.0.erased_name())
.field("closed", &self.0.erased_is_closed())
.finish_non_exhaustive()
}
}
impl SinkAdapter for BoxedSink {
async fn send(&mut self, event: &Event) -> Result<()> {
self.0.erased_send(event).await
}
async fn flush(&mut self) -> Result<()> {
self.0.erased_flush().await
}
async fn close(&mut self) -> Result<()> {
self.0.erased_close().await
}
fn name(&self) -> &str {
self.0.erased_name()
}
fn delivery_guarantee(&self) -> SinkDeliveryGuarantee {
self.0.erased_delivery_guarantee()
}
fn idempotent_delivery_capable(&self) -> bool {
self.0.erased_idempotent_delivery_capable()
}
fn transactional_checkpoint_barrier_capable(&self) -> bool {
self.0.erased_transactional_checkpoint_barrier_capable()
}
fn queue_depth(&self) -> Option<usize> {
self.0.erased_queue_depth()
}
fn flush_tick_interval(&self) -> Option<std::time::Duration> {
self.0.erased_flush_tick_interval()
}
fn delivery_metrics(&self) -> Option<SinkDeliveryMetrics> {
self.0.erased_delivery_metrics()
}
fn is_closed(&self) -> bool {
self.0.erased_is_closed()
}
fn exported_events(&self) -> Option<&[Event]> {
self.0.erased_exported_events()
}
async fn begin_checkpoint_barrier(&mut self) -> Result<()> {
self.0.erased_begin_checkpoint_barrier().await
}
async fn commit_checkpoint_barrier(&mut self) -> Result<()> {
self.0.erased_commit_checkpoint_barrier().await
}
async fn abort_checkpoint_barrier(&mut self) -> Result<()> {
self.0.erased_abort_checkpoint_barrier().await
}
async fn preflight_check(&mut self) -> Result<()> {
self.0.erased_preflight_check().await
}
}
#[derive(Debug, Clone)]
pub struct MemorySinkAdapter {
name: String,
events: Vec<Event>,
closed: bool,
}
impl Default for MemorySinkAdapter {
fn default() -> Self {
Self::new("memory")
}
}
impl MemorySinkAdapter {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
events: Vec::new(),
closed: false,
}
}
pub fn events(&self) -> &[Event] {
&self.events
}
}
impl SinkAdapter for MemorySinkAdapter {
async fn send(&mut self, event: &Event) -> Result<()> {
if self.closed {
return Err(Error::StateError("adapter is closed".into()));
}
self.events.push(event.clone());
Ok(())
}
async fn flush(&mut self) -> Result<()> {
if self.closed {
return Err(Error::StateError("adapter is closed".into()));
}
Ok(())
}
async fn close(&mut self) -> Result<()> {
self.closed = true;
Ok(())
}
fn name(&self) -> &str {
&self.name
}
fn exported_events(&self) -> Option<&[Event]> {
Some(&self.events)
}
fn is_closed(&self) -> bool {
self.closed
}
}
#[derive(Debug, Clone)]
pub struct AdapterGoldenFixture {
pub name: String,
pub events: Vec<Event>,
}
impl AdapterGoldenFixture {
pub fn new(name: impl Into<String>, events: Vec<Event>) -> Self {
Self {
name: name.into(),
events,
}
}
pub fn single_event(event: Event) -> Self {
Self::new("single_event", vec![event])
}
pub fn batch(events: Vec<Event>) -> Self {
Self::new("batch", events)
}
pub fn ordering(events: Vec<Event>) -> Self {
Self::new("ordering", events)
}
pub fn crash_recovery(events: Vec<Event>) -> Self {
Self::new("crash_recovery", events)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestResult {
pub passed: bool,
pub errors: Vec<String>,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Default)]
pub struct BasicAdapterConformance;
impl BasicAdapterConformance {
fn pass() -> TestResult {
TestResult {
passed: true,
errors: Vec::new(),
duration_ms: 0,
}
}
fn exported_len<S: SinkAdapter>(adapter: &S) -> Option<usize> {
adapter.exported_events().map(|events| events.len())
}
pub async fn single_event<S: SinkAdapter>(
&self,
adapter: &mut S,
fixture: &AdapterGoldenFixture,
) -> Result<TestResult> {
let Some(first) = fixture.events.first() else {
return Err(Error::ConfigError(
"single_event fixture requires at least one event".into(),
));
};
let before_len = Self::exported_len(adapter);
adapter.send(first).await?;
adapter.flush().await?;
if let Some(before) = before_len {
let after_events = adapter.exported_events().ok_or_else(|| {
Error::StateError("adapter exported_events became unavailable mid-test".into())
})?;
let after = after_events.len();
if after != before + 1 {
return Err(Error::StateError(format!(
"single_event conformance expected +1 event, observed delta {}",
after.saturating_sub(before)
)));
}
if after_events.last() != Some(first) {
return Err(Error::StateError(
"single_event conformance expected last emitted event to match fixture".into(),
));
}
}
Ok(Self::pass())
}
pub async fn batch_send<S: SinkAdapter>(
&self,
adapter: &mut S,
fixture: &AdapterGoldenFixture,
) -> Result<TestResult> {
let before_len = Self::exported_len(adapter);
for event in &fixture.events {
adapter.send(event).await?;
}
adapter.flush().await?;
if let Some(before) = before_len {
let after_events = adapter.exported_events().ok_or_else(|| {
Error::StateError("adapter exported_events became unavailable mid-test".into())
})?;
let expected = fixture.events.len();
let after = after_events.len();
let observed = after.saturating_sub(before);
if observed != expected {
return Err(Error::StateError(format!(
"batch_send conformance expected {expected} new events, observed {observed}"
)));
}
if after_events[before..after] != fixture.events[..] {
return Err(Error::StateError(
"batch_send conformance expected emitted tail to match fixture order".into(),
));
}
}
Ok(Self::pass())
}
pub async fn ordering<S: SinkAdapter>(
&self,
adapter: &mut S,
fixture: &AdapterGoldenFixture,
) -> Result<TestResult> {
let before_len = Self::exported_len(adapter);
for event in &fixture.events {
adapter.send(event).await?;
}
adapter.flush().await?;
if let Some(before) = before_len {
let after_events = adapter.exported_events().ok_or_else(|| {
Error::StateError("adapter exported_events became unavailable mid-test".into())
})?;
let after = after_events.len();
if after < before || after - before != fixture.events.len() {
return Err(Error::StateError(
"ordering conformance observed unexpected emitted event count delta".into(),
));
}
if after_events[before..after] != fixture.events[..] {
return Err(Error::StateError(
"ordering conformance expected emitted sequence to preserve fixture order"
.into(),
));
}
}
Ok(Self::pass())
}
pub async fn crash_recovery<S: SinkAdapter>(
&self,
adapter: &mut S,
fixture: &AdapterGoldenFixture,
) -> Result<TestResult> {
let Some(first_event) = fixture.events.first() else {
return Err(Error::ConfigError(
"crash_recovery fixture requires at least one event".into(),
));
};
let before_len = Self::exported_len(adapter);
for event in &fixture.events {
adapter.send(event).await?;
}
adapter.flush().await?;
adapter.close().await?;
if !adapter.is_closed() {
return Err(Error::StateError(
"crash_recovery conformance expected adapter to report closed state".into(),
));
}
if let Some(before) = before_len {
let after_events = adapter.exported_events().ok_or_else(|| {
Error::StateError("adapter exported_events became unavailable mid-test".into())
})?;
let after = after_events.len();
let observed = after.saturating_sub(before);
if observed != fixture.events.len() {
return Err(Error::StateError(format!(
"crash_recovery conformance expected {} new events before close, observed {observed}",
fixture.events.len()
)));
}
}
if adapter.send(first_event).await.is_ok() {
return Err(Error::StateError(
"crash_recovery conformance expected send to fail after adapter close".into(),
));
}
Ok(Self::pass())
}
}
#[derive(Debug, Clone, Default)]
pub struct AdapterConformanceSuite {
harness: BasicAdapterConformance,
}
impl AdapterConformanceSuite {
pub fn new() -> Self {
Self::default()
}
pub async fn run_all<S: SinkAdapter>(
&self,
adapter: &mut S,
fixture: &AdapterGoldenFixture,
) -> Result<Vec<TestResult>> {
let mut results = Vec::with_capacity(4);
results.push(self.harness.single_event(adapter, fixture).await?);
results.push(self.harness.batch_send(adapter, fixture).await?);
results.push(self.harness.ordering(adapter, fixture).await?);
results.push(self.harness.crash_recovery(adapter, fixture).await?);
Ok(results)
}
}