use std::error::Error as StdError;
use std::future::Future;
/// An asynchronous destination for byte payloads.
///
/// Implementors must be `Send + Sync + 'static`, and every future returned by
/// the trait's methods is required to be `Send`.
pub trait Sink: Send + Sync + 'static {
    /// Sink-specific error type.
    ///
    /// It is wrapped by [`SinkError::Fatal`] in [`Sink::try_send`] results and
    /// returned directly by [`Sink::health_check`].
    type Error: StdError + Send + Sync + 'static;

    /// Attempts to hand `data` to the sink.
    ///
    /// # Errors
    ///
    /// The returned future resolves to a [`SinkError`] when the payload was
    /// not accepted; which variant is used is up to the implementor.
    fn try_send(
        &self,
        data: &[u8],
    ) -> impl Future<Output = Result<(), SinkError<Self::Error>>> + Send;

    /// Health probe for the sink.
    ///
    /// The provided implementation performs no check and resolves to `Ok(())`
    /// on the first poll. Implementors may override it.
    ///
    /// # Errors
    ///
    /// An overriding implementation may resolve to `Self::Error`; the provided
    /// one never does.
    fn health_check(&self) -> impl Future<Output = Result<(), Self::Error>> + Send {
        std::future::ready(Ok(()))
    }
}
/// Error produced by [`Sink::try_send`].
///
/// `Full` and `Unavailable` carry no payload; `Fatal` wraps the sink's own
/// error value of type `E`.
#[derive(Debug)]
pub enum SinkError<E> {
/// Displayed as "sink is full". Counted as retryable by
/// [`SinkError::is_retryable`].
Full,
/// Displayed as "sink is unavailable". Counted as retryable, and the only
/// variant for which [`SinkError::should_circuit_break`] returns `true`.
Unavailable,
/// Wraps an underlying error of type `E`. Not retryable; reported by
/// [`SinkError::is_fatal`].
Fatal(E),
}
impl<E: StdError> std::fmt::Display for SinkError<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full => write!(f, "sink is full"),
Self::Unavailable => write!(f, "sink is unavailable"),
Self::Fatal(e) => write!(f, "fatal sink error: {e}"),
}
}
}
/// Makes [`SinkError`] a standard error whenever the wrapped type is one.
impl<E: StdError + 'static> StdError for SinkError<E> {
    /// Returns the wrapped error for `Fatal`; `Full` and `Unavailable` have no
    /// underlying cause and yield `None`.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Self::Full | Self::Unavailable => None,
            Self::Fatal(cause) => Some(cause),
        }
    }
}
/// Classification helpers; none of them inspect the payload of `Fatal`.
impl<E> SinkError<E> {
    /// Returns `true` for `Full` and `Unavailable`, `false` for `Fatal`.
    #[must_use]
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Full | Self::Unavailable => true,
            Self::Fatal(_) => false,
        }
    }

    /// Returns `true` only for `Fatal`.
    #[must_use]
    pub fn is_fatal(&self) -> bool {
        match self {
            Self::Fatal(_) => true,
            Self::Full | Self::Unavailable => false,
        }
    }

    /// Returns `true` only for `Unavailable`.
    #[must_use]
    pub fn should_circuit_break(&self) -> bool {
        match self {
            Self::Unavailable => true,
            Self::Full | Self::Fatal(_) => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal error type plugged in as the test sink's associated `Error`.
    #[derive(Debug)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl StdError for TestError {}

    /// Sink double that records every `try_send` call in a shared counter.
    ///
    /// With `fail_after: Some(k)`, a call fails with `Unavailable` once at
    /// least `k` calls were made before it; with `None` it always succeeds.
    struct CountingSink {
        count: Arc<AtomicUsize>,
        fail_after: Option<usize>,
    }

    impl Sink for CountingSink {
        type Error = TestError;

        async fn try_send(&self, _data: &[u8]) -> Result<(), SinkError<Self::Error>> {
            // `fetch_add` returns the value held before the increment, i.e.
            // the number of calls that preceded this one.
            let earlier_calls = self.count.fetch_add(1, Ordering::SeqCst);
            match self.fail_after {
                Some(threshold) if earlier_calls >= threshold => Err(SinkError::Unavailable),
                _ => Ok(()),
            }
        }
    }

    #[tokio::test]
    async fn test_sink_success() {
        let calls = Arc::new(AtomicUsize::new(0));
        let sink = CountingSink {
            count: Arc::clone(&calls),
            fail_after: None,
        };

        let outcome = sink.try_send(b"test").await;

        outcome.unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_sink_unavailable() {
        // A threshold of zero makes the very first call fail.
        let sink = CountingSink {
            count: Arc::new(AtomicUsize::new(0)),
            fail_after: Some(0),
        };

        assert!(matches!(
            sink.try_send(b"test").await,
            Err(SinkError::Unavailable)
        ));
    }

    #[test]
    fn test_sink_error_properties() {
        // (is_retryable, is_fatal, should_circuit_break) for one error value.
        let classify = |error: &SinkError<TestError>| {
            (
                error.is_retryable(),
                error.is_fatal(),
                error.should_circuit_break(),
            )
        };

        assert_eq!(classify(&SinkError::Full), (true, false, false));
        assert_eq!(classify(&SinkError::Unavailable), (true, false, true));
        assert_eq!(
            classify(&SinkError::Fatal(TestError("oops".into()))),
            (false, true, false)
        );
    }
}