Skip to main content

hyperi_rustlib/tiered_sink/
sink.rs

1// Project:   hyperi-rustlib
2// File:      src/tiered_sink/sink.rs
3// Purpose:   Sink trait for async message delivery
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Sink trait for async message delivery.
10
11use std::error::Error as StdError;
12use std::future::Future;
13
14/// A sink that can receive messages asynchronously.
15///
16/// Implement this trait for your message backend (Kafka, S3, HTTP, etc.)
17/// to use with `TieredSink`.
18///
19/// Async methods return `impl Future + Send` to ensure compatibility with
20/// `tokio::spawn`.
21///
22/// # Example
23///
24/// ```rust,ignore
25/// use hyperi_rustlib::tiered_sink::{Sink, SinkError};
26///
27/// struct MyKafkaSink {
28///     producer: KafkaProducer,
29/// }
30///
31/// impl Sink for MyKafkaSink {
32///     type Error = KafkaError;
33///
34///     async fn try_send(&self, data: &[u8]) -> Result<(), SinkError<Self::Error>> {
35///         match self.producer.send(data).await {
36///             Ok(()) => Ok(()),
37///             Err(e) if e.is_queue_full() => Err(SinkError::Full),
38///             Err(e) if e.is_broker_unavailable() => Err(SinkError::Unavailable),
39///             Err(e) => Err(SinkError::Fatal(e)),
40///         }
41///     }
42/// }
43/// ```
44pub trait Sink: Send + Sync + 'static {
45    /// The error type returned by this sink.
46    type Error: StdError + Send + Sync + 'static;
47
48    /// Try to send data to the sink.
49    ///
50    /// This should be non-blocking or have a short timeout.
51    /// Return appropriate `SinkError` variant based on the failure mode:
52    ///
53    /// - `SinkError::Full` - Sink is backpressuring, try again later
54    /// - `SinkError::Unavailable` - Sink is down, circuit break
55    /// - `SinkError::Fatal(e)` - Unrecoverable error, don't spool
56    fn try_send(
57        &self,
58        data: &[u8],
59    ) -> impl Future<Output = Result<(), SinkError<Self::Error>>> + Send;
60
61    /// Check if the sink is healthy.
62    ///
63    /// Used by circuit breaker to probe if sink has recovered.
64    /// Default implementation returns Ok (assumes healthy).
65    fn health_check(&self) -> impl Future<Output = Result<(), Self::Error>> + Send {
66        async { Ok(()) }
67    }
68}
69
70/// Error returned by `Sink::try_send`.
71///
72/// The error variant determines how `TieredSink` handles the failure:
73/// - `Full` and `Unavailable` → spool to disk
74/// - `Fatal` → propagate error to caller, don't spool
75#[derive(Debug)]
76pub enum SinkError<E> {
77    /// Sink is backpressuring (queue full, rate limited).
78    /// Message should be spooled and retried later.
79    Full,
80
81    /// Sink is unavailable (connection failed, broker down).
82    /// Message should be spooled, circuit breaker should open.
83    Unavailable,
84
85    /// Fatal error - do not retry, do not spool.
86    /// Examples: invalid message format, authentication failure.
87    Fatal(E),
88}
89
90impl<E: StdError> std::fmt::Display for SinkError<E> {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            Self::Full => write!(f, "sink is full"),
94            Self::Unavailable => write!(f, "sink is unavailable"),
95            Self::Fatal(e) => write!(f, "fatal sink error: {e}"),
96        }
97    }
98}
99
100impl<E: StdError + 'static> StdError for SinkError<E> {
101    fn source(&self) -> Option<&(dyn StdError + 'static)> {
102        match self {
103            Self::Fatal(e) => Some(e),
104            _ => None,
105        }
106    }
107}
108
109impl<E> SinkError<E> {
110    /// Returns true if this is a retryable error (Full or Unavailable).
111    #[must_use]
112    pub fn is_retryable(&self) -> bool {
113        matches!(self, Self::Full | Self::Unavailable)
114    }
115
116    /// Returns true if this is a fatal (non-retryable) error.
117    #[must_use]
118    pub fn is_fatal(&self) -> bool {
119        matches!(self, Self::Fatal(_))
120    }
121
122    /// Returns true if this error should trigger circuit breaker.
123    #[must_use]
124    pub fn should_circuit_break(&self) -> bool {
125        matches!(self, Self::Unavailable)
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132    use std::sync::Arc;
133    use std::sync::atomic::{AtomicUsize, Ordering};
134
135    #[derive(Debug)]
136    struct TestError(String);
137
138    impl std::fmt::Display for TestError {
139        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140            write!(f, "{}", self.0)
141        }
142    }
143
144    impl StdError for TestError {}
145
146    struct CountingSink {
147        count: Arc<AtomicUsize>,
148        fail_after: Option<usize>,
149    }
150
151    impl Sink for CountingSink {
152        type Error = TestError;
153
154        async fn try_send(&self, _data: &[u8]) -> Result<(), SinkError<Self::Error>> {
155            let n = self.count.fetch_add(1, Ordering::SeqCst);
156            if let Some(fail_after) = self.fail_after
157                && n >= fail_after
158            {
159                return Err(SinkError::Unavailable);
160            }
161            Ok(())
162        }
163    }
164
165    #[tokio::test]
166    async fn test_sink_success() {
167        let count = Arc::new(AtomicUsize::new(0));
168        let sink = CountingSink {
169            count: Arc::clone(&count),
170            fail_after: None,
171        };
172
173        sink.try_send(b"test").await.unwrap();
174        assert_eq!(count.load(Ordering::SeqCst), 1);
175    }
176
177    #[tokio::test]
178    async fn test_sink_unavailable() {
179        let sink = CountingSink {
180            count: Arc::new(AtomicUsize::new(0)),
181            fail_after: Some(0),
182        };
183
184        let result = sink.try_send(b"test").await;
185        assert!(matches!(result, Err(SinkError::Unavailable)));
186    }
187
188    #[test]
189    fn test_sink_error_properties() {
190        let full: SinkError<TestError> = SinkError::Full;
191        assert!(full.is_retryable());
192        assert!(!full.is_fatal());
193        assert!(!full.should_circuit_break());
194
195        let unavailable: SinkError<TestError> = SinkError::Unavailable;
196        assert!(unavailable.is_retryable());
197        assert!(!unavailable.is_fatal());
198        assert!(unavailable.should_circuit_break());
199
200        let fatal: SinkError<TestError> = SinkError::Fatal(TestError("oops".into()));
201        assert!(!fatal.is_retryable());
202        assert!(fatal.is_fatal());
203        assert!(!fatal.should_circuit_break());
204    }
205}