hyperi_rustlib/tiered_sink/
sink.rs1use std::error::Error as StdError;
12use std::future::Future;
13
14pub trait Sink: Send + Sync + 'static {
45 type Error: StdError + Send + Sync + 'static;
47
48 fn try_send(
57 &self,
58 data: &[u8],
59 ) -> impl Future<Output = Result<(), SinkError<Self::Error>>> + Send;
60
61 fn health_check(&self) -> impl Future<Output = Result<(), Self::Error>> + Send {
66 async { Ok(()) }
67 }
68}
69
/// Failure outcome of [`Sink::try_send`], generic over the sink's own error type `E`.
///
/// The predicate methods on this type (`is_retryable`, `is_fatal`,
/// `should_circuit_break`) classify each variant.
#[derive(Debug)]
pub enum SinkError<E> {
    /// Displayed as "sink is full". Classified as retryable; does not request
    /// circuit breaking.
    Full,

    /// Displayed as "sink is unavailable". Classified as retryable and is the
    /// only variant for which `should_circuit_break` returns `true`.
    Unavailable,

    /// Wraps the sink-specific error `E`. Classified as fatal (not retryable);
    /// the wrapped error is exposed through `std::error::Error::source`.
    Fatal(E),
}
89
90impl<E: StdError> std::fmt::Display for SinkError<E> {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 Self::Full => write!(f, "sink is full"),
94 Self::Unavailable => write!(f, "sink is unavailable"),
95 Self::Fatal(e) => write!(f, "fatal sink error: {e}"),
96 }
97 }
98}
99
100impl<E: StdError + 'static> StdError for SinkError<E> {
101 fn source(&self) -> Option<&(dyn StdError + 'static)> {
102 match self {
103 Self::Fatal(e) => Some(e),
104 _ => None,
105 }
106 }
107}
108
109impl<E> SinkError<E> {
110 #[must_use]
112 pub fn is_retryable(&self) -> bool {
113 matches!(self, Self::Full | Self::Unavailable)
114 }
115
116 #[must_use]
118 pub fn is_fatal(&self) -> bool {
119 matches!(self, Self::Fatal(_))
120 }
121
122 #[must_use]
124 pub fn should_circuit_break(&self) -> bool {
125 matches!(self, Self::Unavailable)
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Minimal error type used to instantiate `SinkError<E>` in these tests.
    #[derive(Debug)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl StdError for TestError {}

    /// Sink that counts `try_send` calls and can start failing at a threshold.
    struct CountingSink {
        /// Shared counter, incremented once per `try_send` call.
        count: Arc<AtomicUsize>,
        /// When `Some(k)`, every call whose zero-based index is `>= k` fails
        /// with `SinkError::Unavailable`; when `None`, every call succeeds.
        fail_after: Option<usize>,
    }

    impl Sink for CountingSink {
        type Error = TestError;

        async fn try_send(&self, _data: &[u8]) -> Result<(), SinkError<Self::Error>> {
            // `fetch_add` yields the value before the increment, i.e. this call's index.
            let call_index = self.count.fetch_add(1, Ordering::SeqCst);
            match self.fail_after {
                Some(threshold) if call_index >= threshold => Err(SinkError::Unavailable),
                _ => Ok(()),
            }
        }
    }

    /// A sink without a failure threshold accepts the payload and records the call.
    #[tokio::test]
    async fn test_sink_success() {
        let calls = Arc::new(AtomicUsize::new(0));
        let sink = CountingSink {
            count: Arc::clone(&calls),
            fail_after: None,
        };

        let outcome = sink.try_send(b"test").await;

        assert!(outcome.is_ok());
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    /// A threshold of zero makes the very first send fail with `Unavailable`.
    #[tokio::test]
    async fn test_sink_unavailable() {
        let sink = CountingSink {
            count: Arc::new(AtomicUsize::new(0)),
            fail_after: Some(0),
        };

        let outcome = sink.try_send(b"test").await;

        assert!(matches!(outcome, Err(SinkError::Unavailable)));
    }

    /// Checks the full classification table of the `SinkError` predicates.
    #[test]
    fn test_sink_error_properties() {
        // Columns: error, is_retryable, is_fatal, should_circuit_break.
        let expectations: [(SinkError<TestError>, bool, bool, bool); 3] = [
            (SinkError::Full, true, false, false),
            (SinkError::Unavailable, true, false, true),
            (SinkError::Fatal(TestError("oops".into())), false, true, false),
        ];

        for (err, retryable, fatal, circuit_break) in &expectations {
            assert_eq!(err.is_retryable(), *retryable);
            assert_eq!(err.is_fatal(), *fatal);
            assert_eq!(err.should_circuit_break(), *circuit_break);
        }
    }
}