1use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use indexmap::IndexMap;
14use serde_json;
15use tokio::sync::Mutex;
16use tracing::{error, warn};
17pub use varpulis_connectors::sink::{Sink, SinkConnectorAdapter, SinkError};
18
19use crate::event::Event;
20
/// Sink that prints every event to standard output.
///
/// Two output formats are supported: a human-readable one-line summary
/// (selected by [`ConsoleSink::new`]) and a JSON document per event
/// (selected by [`ConsoleSink::compact`]).
#[derive(Debug)]
pub struct ConsoleSink {
    /// Identifier reported through `Sink::name`.
    name: String,
    /// `true` prints the human-readable summary; `false` prints JSON.
    pretty: bool,
}
27
28impl ConsoleSink {
29 pub fn new(name: impl Into<String>) -> Self {
30 Self {
31 name: name.into(),
32 pretty: true,
33 }
34 }
35
36 pub const fn compact(mut self) -> Self {
37 self.pretty = false;
38 self
39 }
40}
41
42#[async_trait]
43impl Sink for ConsoleSink {
44 fn name(&self) -> &str {
45 &self.name
46 }
47
48 async fn send(&self, event: &Event) -> Result<(), SinkError> {
49 if self.pretty {
50 println!(
51 "[{}] {} | {:?}",
52 event.timestamp.format("%H:%M:%S"),
53 event.event_type,
54 event.data
55 );
56 } else {
57 println!("{}", serde_json::to_string(event)?);
58 }
59 Ok(())
60 }
61
62 async fn flush(&self) -> Result<(), SinkError> {
63 Ok(())
64 }
65
66 async fn close(&self) -> Result<(), SinkError> {
67 Ok(())
68 }
69}
70
71#[derive(Debug)]
73pub struct FileSink {
74 name: String,
75 path: PathBuf,
76 file: Arc<Mutex<File>>,
77}
78
79impl FileSink {
80 pub const fn path(&self) -> &PathBuf {
82 &self.path
83 }
84
85 pub fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
86 let path = path.into();
87 let file = OpenOptions::new().create(true).append(true).open(&path)?;
88
89 Ok(Self {
90 name: name.into(),
91 path,
92 file: Arc::new(Mutex::new(file)),
93 })
94 }
95}
96
97#[async_trait]
98impl Sink for FileSink {
99 fn name(&self) -> &str {
100 &self.name
101 }
102
103 async fn send(&self, event: &Event) -> Result<(), SinkError> {
104 let buf = event.to_sink_payload();
105 let mut file = self.file.lock().await;
106 file.write_all(&buf)?;
107 file.write_all(b"\n")?;
108 Ok(())
109 }
110
111 async fn flush(&self) -> Result<(), SinkError> {
112 let mut file = self.file.lock().await;
113 file.flush()?;
114 Ok(())
115 }
116
117 async fn close(&self) -> Result<(), SinkError> {
118 self.flush().await
119 }
120}
121
122#[derive(Debug)]
134pub struct AsyncFileSink {
135 name: String,
136 path: PathBuf,
137 file: Arc<Mutex<tokio::fs::File>>,
138 buffer: Arc<Mutex<Vec<u8>>>,
139 buffer_size: usize,
140}
141
142impl AsyncFileSink {
143 pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
145
146 pub const fn path(&self) -> &PathBuf {
148 &self.path
149 }
150
151 pub async fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
153 Self::with_buffer_size(name, path, Self::DEFAULT_BUFFER_SIZE).await
154 }
155
156 pub async fn with_buffer_size(
158 name: impl Into<String>,
159 path: impl Into<PathBuf>,
160 buffer_size: usize,
161 ) -> Result<Self, SinkError> {
162 use tokio::fs::OpenOptions;
163
164 let path = path.into();
165 let file = OpenOptions::new()
166 .create(true)
167 .append(true)
168 .open(&path)
169 .await?;
170
171 Ok(Self {
172 name: name.into(),
173 path,
174 file: Arc::new(Mutex::new(file)),
175 buffer: Arc::new(Mutex::new(Vec::with_capacity(buffer_size))),
176 buffer_size,
177 })
178 }
179}
180
181#[async_trait]
182impl Sink for AsyncFileSink {
183 fn name(&self) -> &str {
184 &self.name
185 }
186
187 async fn send(&self, event: &Event) -> Result<(), SinkError> {
188 let buf = event.to_sink_payload();
189
190 let should_flush = {
191 let mut buffer = self.buffer.lock().await;
192 buffer.extend_from_slice(&buf);
193 buffer.push(b'\n');
194 buffer.len() >= self.buffer_size
195 };
196
197 if should_flush {
198 self.flush().await?;
199 }
200
201 Ok(())
202 }
203
204 async fn flush(&self) -> Result<(), SinkError> {
205 use tokio::io::AsyncWriteExt;
206
207 let data = {
208 let mut buffer = self.buffer.lock().await;
209 std::mem::take(&mut *buffer)
210 };
211
212 if !data.is_empty() {
213 let mut file = self.file.lock().await;
214 file.write_all(&data).await?;
215 file.flush().await?;
216 }
217
218 Ok(())
219 }
220
221 async fn close(&self) -> Result<(), SinkError> {
222 self.flush().await
223 }
224}
225
226#[derive(Debug)]
228pub struct HttpSink {
229 name: String,
230 url: String,
231 client: reqwest::Client,
232 headers: IndexMap<String, String>,
233}
234
235impl HttpSink {
236 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
237 Self {
238 name: name.into(),
239 url: url.into(),
240 client: reqwest::Client::new(),
241 headers: IndexMap::new(),
242 }
243 }
244
245 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
246 self.headers.insert(key.into(), value.into());
247 self
248 }
249}
250
251#[async_trait]
252impl Sink for HttpSink {
253 fn name(&self) -> &str {
254 &self.name
255 }
256
257 async fn send(&self, event: &Event) -> Result<(), SinkError> {
258 let mut req = self.client.post(&self.url);
259 for (k, v) in &self.headers {
260 req = req.header(k.as_str(), v.as_str());
261 }
262 req = req.header("Content-Type", "application/json");
263 req = req.body(event.to_sink_payload());
264
265 match req.send().await {
266 Ok(resp) => {
267 if !resp.status().is_success() {
268 error!("HTTP sink {} got status {}", self.name, resp.status());
269 }
270 }
271 Err(e) => {
272 error!("HTTP sink {} error: {}", self.name, e);
273 }
274 }
275 Ok(())
276 }
277
278 async fn flush(&self) -> Result<(), SinkError> {
279 Ok(())
280 }
281
282 async fn close(&self) -> Result<(), SinkError> {
283 Ok(())
284 }
285}
286
/// Retry and timeout settings for `HttpSinkWithRetry`.
#[derive(Debug, Clone)]
pub struct HttpRetryConfig {
    /// Number of retries allowed after the first attempt.
    pub max_retries: usize,
    /// Pause before the first retry; later pauses double each time.
    pub initial_delay: Duration,
    /// Upper bound applied to the doubled pause.
    pub max_delay: Duration,
    /// Timeout configured on the HTTP client.
    pub timeout: Duration,
}
299
300impl Default for HttpRetryConfig {
301 fn default() -> Self {
302 Self {
303 max_retries: 3,
304 initial_delay: Duration::from_millis(100),
305 max_delay: Duration::from_secs(5),
306 timeout: Duration::from_secs(30),
307 }
308 }
309}
310
311#[derive(Debug)]
328pub struct HttpSinkWithRetry {
329 name: String,
330 url: String,
331 client: reqwest::Client,
332 headers: IndexMap<String, String>,
333 retry_config: HttpRetryConfig,
334}
335
336impl HttpSinkWithRetry {
337 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
338 let config = HttpRetryConfig::default();
339 let client = reqwest::Client::builder()
340 .timeout(config.timeout)
341 .build()
342 .unwrap_or_else(|_| reqwest::Client::new());
343
344 Self {
345 name: name.into(),
346 url: url.into(),
347 client,
348 headers: IndexMap::new(),
349 retry_config: config,
350 }
351 }
352
353 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
354 self.headers.insert(key.into(), value.into());
355 self
356 }
357
358 pub fn with_retry_config(mut self, config: HttpRetryConfig) -> Self {
359 self.retry_config = config;
360 self.client = reqwest::Client::builder()
362 .timeout(self.retry_config.timeout)
363 .build()
364 .unwrap_or_else(|_| reqwest::Client::new());
365 self
366 }
367
368 async fn send_with_retry(&self, body: Vec<u8>) -> Result<(), SinkError> {
370 let mut attempt = 0;
371 let mut delay = self.retry_config.initial_delay;
372
373 loop {
374 let mut req = self.client.post(&self.url);
375 for (k, v) in &self.headers {
376 req = req.header(k.as_str(), v.as_str());
377 }
378 req = req.header("Content-Type", "application/json");
379 req = req.body(body.clone());
380
381 match req.send().await {
382 Ok(resp) => {
383 if resp.status().is_success() {
384 return Ok(());
385 } else if resp.status().is_server_error() {
386 if attempt >= self.retry_config.max_retries {
388 return Err(SinkError::other(format!(
389 "HTTP sink {} failed with status {} after {} retries",
390 self.name,
391 resp.status(),
392 attempt
393 )));
394 }
395 warn!(
396 "HTTP sink {} got {}, retrying ({}/{})",
397 self.name,
398 resp.status(),
399 attempt + 1,
400 self.retry_config.max_retries
401 );
402 } else {
403 return Err(SinkError::other(format!(
405 "HTTP sink {} got client error status {}",
406 self.name,
407 resp.status()
408 )));
409 }
410 }
411 Err(e) => {
412 if e.is_timeout() || e.is_connect() || e.is_request() {
414 if attempt >= self.retry_config.max_retries {
415 return Err(SinkError::other(format!(
416 "HTTP sink {} failed with error {} after {} retries",
417 self.name, e, attempt
418 )));
419 }
420 warn!(
421 "HTTP sink {} error: {}, retrying ({}/{})",
422 self.name,
423 e,
424 attempt + 1,
425 self.retry_config.max_retries
426 );
427 } else {
428 return Err(e.into());
430 }
431 }
432 }
433
434 attempt += 1;
436 tokio::time::sleep(delay).await;
437 delay = (delay * 2).min(self.retry_config.max_delay);
438 }
439 }
440}
441
442#[async_trait]
443impl Sink for HttpSinkWithRetry {
444 fn name(&self) -> &str {
445 &self.name
446 }
447
448 async fn send(&self, event: &Event) -> Result<(), SinkError> {
449 self.send_with_retry(event.to_sink_payload()).await
450 }
451
452 async fn flush(&self) -> Result<(), SinkError> {
453 Ok(())
454 }
455
456 async fn close(&self) -> Result<(), SinkError> {
457 Ok(())
458 }
459}
460
461pub struct MultiSink {
463 name: String,
464 sinks: Vec<Box<dyn Sink>>,
465}
466
467impl std::fmt::Debug for MultiSink {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 f.debug_struct("MultiSink")
470 .field("name", &self.name)
471 .field(
472 "sinks",
473 &self.sinks.iter().map(|s| s.name()).collect::<Vec<_>>(),
474 )
475 .finish()
476 }
477}
478
479impl MultiSink {
480 pub fn new(name: impl Into<String>) -> Self {
481 Self {
482 name: name.into(),
483 sinks: Vec::new(),
484 }
485 }
486
487 pub fn with_sink(mut self, sink: Box<dyn Sink>) -> Self {
488 self.sinks.push(sink);
489 self
490 }
491}
492
493#[async_trait]
494impl Sink for MultiSink {
495 fn name(&self) -> &str {
496 &self.name
497 }
498
499 async fn send(&self, event: &Event) -> Result<(), SinkError> {
500 for sink in &self.sinks {
501 if let Err(e) = sink.send(event).await {
502 error!("Sink {} error: {}", sink.name(), e);
503 }
504 }
505 Ok(())
506 }
507
508 async fn flush(&self) -> Result<(), SinkError> {
509 for sink in &self.sinks {
510 sink.flush().await?;
511 }
512 Ok(())
513 }
514
515 async fn close(&self) -> Result<(), SinkError> {
516 for sink in &self.sinks {
517 sink.close().await?;
518 }
519 Ok(())
520 }
521}
522
523pub struct ResilientSink {
532 inner: Arc<dyn Sink>,
533 cb: Arc<crate::circuit_breaker::CircuitBreaker>,
534 dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
535 metrics: Option<crate::metrics::Metrics>,
536}
537
538impl std::fmt::Debug for ResilientSink {
539 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540 f.debug_struct("ResilientSink")
541 .field("inner", &self.inner.name())
542 .field("cb", &self.cb)
543 .field("has_dlq", &self.dlq.is_some())
544 .field("has_metrics", &self.metrics.is_some())
545 .finish_non_exhaustive()
546 }
547}
548
549impl ResilientSink {
550 pub fn new(
552 inner: Arc<dyn Sink>,
553 cb: Arc<crate::circuit_breaker::CircuitBreaker>,
554 dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
555 metrics: Option<crate::metrics::Metrics>,
556 ) -> Self {
557 Self {
558 inner,
559 cb,
560 dlq,
561 metrics,
562 }
563 }
564
565 fn send_to_dlq(&self, error_msg: &str, events: &[Arc<Event>]) {
566 if let Some(ref dlq) = self.dlq {
567 dlq.write_batch(self.inner.name(), error_msg, events);
568 if let Some(ref metrics) = self.metrics {
569 metrics.dlq_events_total.inc_by(events.len() as f64);
570 }
571 }
572 }
573}
574
575#[async_trait]
576impl Sink for ResilientSink {
577 fn name(&self) -> &str {
578 self.inner.name()
579 }
580
581 async fn connect(&self) -> Result<(), SinkError> {
582 self.inner.connect().await
583 }
584
585 async fn send(&self, event: &Event) -> Result<(), SinkError> {
586 if !self.cb.allow_request() {
587 let arc_event = Arc::new(event.clone());
588 self.send_to_dlq("circuit breaker open", &[arc_event]);
589 return Err(SinkError::other(format!(
590 "circuit breaker open for sink '{}'",
591 self.name()
592 )));
593 }
594
595 match self.inner.send(event).await {
596 Ok(()) => {
597 self.cb.record_success();
598 Ok(())
599 }
600 Err(e) => {
601 self.cb.record_failure();
602 let error_msg = e.to_string();
603 let arc_event = Arc::new(event.clone());
604 self.send_to_dlq(&error_msg, &[arc_event]);
605 Err(e)
606 }
607 }
608 }
609
610 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
611 if !self.cb.allow_request() {
612 self.send_to_dlq("circuit breaker open", events);
613 return Err(SinkError::other(format!(
614 "circuit breaker open for sink '{}'",
615 self.name()
616 )));
617 }
618
619 match self.inner.send_batch(events).await {
620 Ok(()) => {
621 self.cb.record_success();
622 Ok(())
623 }
624 Err(e) => {
625 self.cb.record_failure();
626 self.send_to_dlq(&e.to_string(), events);
627 Err(e)
628 }
629 }
630 }
631
632 async fn flush(&self) -> Result<(), SinkError> {
633 self.inner.flush().await
634 }
635
636 async fn close(&self) -> Result<(), SinkError> {
637 self.inner.close().await
638 }
639}
640
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    use tempfile::NamedTempFile;

    use super::*;

    // ---------------------------------------------------------------------
    // ConsoleSink
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_console_sink() {
        let sink = ConsoleSink::new("test");
        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());
    }

    #[tokio::test]
    async fn test_console_sink_name() {
        let sink = ConsoleSink::new("my_console");
        assert_eq!(sink.name(), "my_console");
    }

    #[tokio::test]
    async fn test_console_sink_compact() {
        let sink = ConsoleSink::new("test").compact();
        assert!(!sink.pretty);
        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());
    }

    #[tokio::test]
    async fn test_console_sink_flush_close() {
        let sink = ConsoleSink::new("test");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ---------------------------------------------------------------------
    // FileSink
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_file_sink() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("test_file", temp_file.path()).unwrap();

        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());

        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"value\":42"));
    }

    #[tokio::test]
    async fn test_file_sink_name() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("my_file", temp_file.path()).unwrap();
        assert_eq!(sink.name(), "my_file");
    }

    // ---------------------------------------------------------------------
    // HttpSink
    // ---------------------------------------------------------------------

    #[test]
    fn test_http_sink_new() {
        let sink = HttpSink::new("http_test", "http://localhost:8080/webhook");
        assert_eq!(sink.name(), "http_test");
        assert_eq!(sink.url, "http://localhost:8080/webhook");
    }

    #[test]
    fn test_http_sink_with_header() {
        let sink = HttpSink::new("http_test", "http://localhost:8080")
            .with_header("Authorization", "Bearer token123")
            .with_header("X-Custom", "value");

        assert_eq!(sink.headers.len(), 2);
        assert_eq!(
            sink.headers.get("Authorization"),
            Some(&"Bearer token123".to_string())
        );
    }

    #[tokio::test]
    async fn test_http_sink_flush_close() {
        let sink = HttpSink::new("http_test", "http://localhost:8080");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ---------------------------------------------------------------------
    // MultiSink
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_multi_sink_empty() {
        let sink = MultiSink::new("multi");
        assert_eq!(sink.name(), "multi");

        let event = Event::new("Test");
        assert!(sink.send(&event).await.is_ok());
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_multi_sink_with_console() {
        let multi = MultiSink::new("multi")
            .with_sink(Box::new(ConsoleSink::new("console1")))
            .with_sink(Box::new(ConsoleSink::new("console2")));

        let event = Event::new("Test").with_field("x", 1i64);
        assert!(multi.send(&event).await.is_ok());

        assert!(multi.flush().await.is_ok());
        assert!(multi.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_multi_sink_with_file() {
        let temp_file = NamedTempFile::new().unwrap();
        let file_sink = FileSink::new("file", temp_file.path()).unwrap();

        let multi = MultiSink::new("multi").with_sink(Box::new(file_sink));

        let event = Event::new("MultiEvent").with_field("val", 100i64);
        assert!(multi.send(&event).await.is_ok());
        assert!(multi.flush().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"val\":100"));
    }

    // ---------------------------------------------------------------------
    // FileSink: path, multiple events
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_file_sink_path() {
        let temp_file = NamedTempFile::new().unwrap();
        let expected_path = temp_file.path().to_path_buf();
        let sink = FileSink::new("test", temp_file.path()).unwrap();

        assert_eq!(sink.path(), &expected_path);
    }

    #[tokio::test]
    async fn test_file_sink_multiple_events() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("test", temp_file.path()).unwrap();

        for i in 0..5_i64 {
            let event = Event::new("Event").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        sink.flush().await.unwrap();

        // One line per event is expected.
        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 5);
    }

    // ---------------------------------------------------------------------
    // MultiSink with mixed children
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_multi_sink_three_sinks() {
        let temp1 = NamedTempFile::new().unwrap();
        let temp2 = NamedTempFile::new().unwrap();

        let multi = MultiSink::new("triple")
            .with_sink(Box::new(ConsoleSink::new("console")))
            .with_sink(Box::new(FileSink::new("file1", temp1.path()).unwrap()))
            .with_sink(Box::new(FileSink::new("file2", temp2.path()).unwrap()));

        let event = Event::new("TripleEvent");
        multi.send(&event).await.unwrap();
        multi.flush().await.unwrap();
        multi.close().await.unwrap();

        let contents1 = std::fs::read_to_string(temp1.path()).unwrap();
        let contents2 = std::fs::read_to_string(temp2.path()).unwrap();
        assert!(contents1.contains("timestamp"));
        assert!(contents2.contains("timestamp"));
    }

    // ---------------------------------------------------------------------
    // Error paths
    // ---------------------------------------------------------------------

    #[test]
    fn test_file_sink_invalid_path() {
        let result = FileSink::new("test", "/nonexistent/path/file.json");
        assert!(result.is_err());
    }

    // ---------------------------------------------------------------------
    // AsyncFileSink
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_async_file_sink_basic() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("test_async", temp_file.path())
            .await
            .unwrap();

        let event = Event::new("AsyncTestEvent").with_field("value", 123i64);
        assert!(sink.send(&event).await.is_ok());
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"value\":123"));
    }

    #[tokio::test]
    async fn test_async_file_sink_name() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("my_async_file", temp_file.path())
            .await
            .unwrap();
        assert_eq!(sink.name(), "my_async_file");
    }

    #[tokio::test]
    async fn test_async_file_sink_path() {
        let temp_file = NamedTempFile::new().unwrap();
        let expected_path = temp_file.path().to_path_buf();
        let sink = AsyncFileSink::new("test", temp_file.path()).await.unwrap();
        assert_eq!(sink.path(), &expected_path);
    }

    #[tokio::test]
    async fn test_async_file_sink_multiple_events() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("test_async", temp_file.path())
            .await
            .unwrap();

        for i in 0..5_i64 {
            let event = Event::new("AsyncEvent").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        sink.flush().await.unwrap();

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 5);
    }

    #[tokio::test]
    async fn test_async_file_sink_custom_buffer_size() {
        let temp_file = NamedTempFile::new().unwrap();
        // A 50-byte threshold forces several automatic flushes during the loop.
        let sink = AsyncFileSink::with_buffer_size("test", temp_file.path(), 50)
            .await
            .unwrap();

        for i in 0..10_i64 {
            let event = Event::new("BufferTest").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        sink.flush().await.unwrap();

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 10);
    }

    #[tokio::test]
    async fn test_async_file_sink_invalid_path() {
        let result = AsyncFileSink::new("test", "/nonexistent/path/file.json").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_async_file_sink_in_multi_sink() {
        let temp = NamedTempFile::new().unwrap();
        let async_sink = AsyncFileSink::new("async", temp.path()).await.unwrap();

        let multi = MultiSink::new("multi_with_async").with_sink(Box::new(async_sink));

        let event = Event::new("MultiAsyncEvent");
        multi.send(&event).await.unwrap();
        multi.flush().await.unwrap();

        let contents = std::fs::read_to_string(temp.path()).unwrap();
        assert!(contents.contains("timestamp"));
    }

    // ---------------------------------------------------------------------
    // HttpSinkWithRetry
    // ---------------------------------------------------------------------

    #[test]
    fn test_http_retry_config_default() {
        let config = HttpRetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay, std::time::Duration::from_millis(100));
        assert_eq!(config.max_delay, std::time::Duration::from_secs(5));
        assert_eq!(config.timeout, std::time::Duration::from_secs(30));
    }

    #[test]
    fn test_http_sink_with_retry_creation() {
        let sink = HttpSinkWithRetry::new("retry_test", "http://localhost:8080/webhook");
        assert_eq!(sink.name(), "retry_test");
        assert_eq!(sink.url, "http://localhost:8080/webhook");
    }

    #[test]
    fn test_http_sink_with_retry_headers() {
        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080")
            .with_header("Authorization", "Bearer token123")
            .with_header("X-Custom", "value");

        assert_eq!(sink.headers.len(), 2);
        assert_eq!(
            sink.headers.get("Authorization"),
            Some(&"Bearer token123".to_string())
        );
    }

    #[test]
    fn test_http_sink_with_retry_custom_config() {
        let config = HttpRetryConfig {
            max_retries: 5,
            initial_delay: std::time::Duration::from_millis(200),
            max_delay: std::time::Duration::from_secs(10),
            timeout: std::time::Duration::from_secs(60),
        };
        let sink =
            HttpSinkWithRetry::new("test", "http://localhost:8080").with_retry_config(config);

        assert_eq!(sink.retry_config.max_retries, 5);
        assert_eq!(
            sink.retry_config.initial_delay,
            std::time::Duration::from_millis(200)
        );
    }

    #[tokio::test]
    async fn test_http_sink_with_retry_flush_close() {
        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ---------------------------------------------------------------------
    // ResilientSink
    // ---------------------------------------------------------------------

    /// Test double whose `send` can be switched between success and failure and
    /// which counts the successful sends.
    struct MockSink {
        name: String,
        fail: AtomicBool,
        send_count: AtomicU64,
    }

    impl MockSink {
        fn new(name: &str) -> Self {
            Self {
                name: name.to_string(),
                fail: AtomicBool::new(false),
                send_count: AtomicU64::new(0),
            }
        }

        fn set_fail(&self, fail: bool) {
            self.fail.store(fail, Ordering::Relaxed);
        }

        fn send_count(&self) -> u64 {
            self.send_count.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        fn name(&self) -> &str {
            &self.name
        }

        async fn send(&self, _event: &Event) -> Result<(), SinkError> {
            if self.fail.load(Ordering::Relaxed) {
                Err(SinkError::other("mock send failure"))
            } else {
                self.send_count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
        }

        async fn flush(&self) -> Result<(), SinkError> {
            Ok(())
        }

        async fn close(&self) -> Result<(), SinkError> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_resilient_sink_success_passthrough() {
        let mock = Arc::new(MockSink::new("test-sink"));
        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 3,
                reset_timeout: std::time::Duration::from_secs(60),
            },
        ));
        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);

        let event = Event::new("TestEvent");
        assert!(resilient.send(&event).await.is_ok());
        assert_eq!(mock.send_count(), 1);
        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
    }

    #[tokio::test]
    async fn test_resilient_sink_failure_opens_circuit() {
        let mock = Arc::new(MockSink::new("test-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 2,
                reset_timeout: std::time::Duration::from_secs(60),
            },
        ));

        // A private temp directory keeps concurrent test runs from sharing a DLQ
        // file and is removed automatically, even when an assertion below panics.
        let dlq_dir = tempfile::tempdir().unwrap();
        let dlq_path = dlq_dir.path().join("dlq.jsonl");
        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());

        let resilient = ResilientSink::new(mock.clone(), cb.clone(), Some(dlq.clone()), None);

        let event = Event::new("TestEvent");

        // First failure: the event is dead-lettered.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 1);

        // Second failure reaches the threshold and opens the circuit.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 2);
        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);

        // With the circuit open the event is still dead-lettered.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 3);
    }

    #[tokio::test]
    async fn test_resilient_sink_batch_with_dlq() {
        let mock = Arc::new(MockSink::new("batch-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig::default(),
        ));

        // See `test_resilient_sink_failure_opens_circuit` for why a temp dir is used.
        let dlq_dir = tempfile::tempdir().unwrap();
        let dlq_path = dlq_dir.path().join("dlq.jsonl");
        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());

        let resilient = ResilientSink::new(mock, cb, Some(dlq.clone()), None);

        let events: Vec<Arc<Event>> = (0..3)
            .map(|i| Arc::new(Event::new(format!("Event{i}"))))
            .collect();

        assert!(resilient.send_batch(&events).await.is_err());
        assert_eq!(dlq.count(), 3);
    }

    #[tokio::test]
    async fn test_resilient_sink_recovery() {
        let mock = Arc::new(MockSink::new("recover-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 1,
                reset_timeout: std::time::Duration::from_millis(10),
            },
        ));

        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);

        let event = Event::new("TestEvent");

        // A single failure opens the circuit (threshold is 1).
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);

        // Wait past the reset timeout, then let the sink succeed again.
        tokio::time::sleep(std::time::Duration::from_millis(15)).await;
        mock.set_fail(false);

        assert!(resilient.send(&event).await.is_ok());
        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
        assert_eq!(mock.send_count(), 1);
    }

    #[tokio::test]
    async fn test_resilient_sink_name_passthrough() {
        let mock = Arc::new(MockSink::new("my-kafka-sink"));
        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig::default(),
        ));
        let resilient = ResilientSink::new(mock, cb, None, None);
        assert_eq!(resilient.name(), "my-kafka-sink");
    }
}