1use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use indexmap::IndexMap;
14use serde_json;
15use tokio::sync::Mutex;
16use tracing::{error, warn};
17pub use varpulis_connectors::sink::{Sink, SinkConnectorAdapter, SinkError};
18
19use crate::event::Event;
20
/// Sink that prints every event it receives to standard output.
///
/// Two renderings exist: a human-oriented line (the default) and a compact
/// form that serialises the event with `serde_json`.
#[derive(Debug)]
pub struct ConsoleSink {
    /// Identifier handed back by `Sink::name`.
    name: String,
    /// `true` selects the human-oriented line, `false` the JSON rendering.
    pretty: bool,
}

impl ConsoleSink {
    /// Creates a console sink called `name` with pretty output enabled.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self { name, pretty: true }
    }

    /// Consumes the sink and returns it with pretty output switched off,
    /// so that events are printed as JSON instead.
    pub const fn compact(mut self) -> Self {
        self.pretty = false;
        self
    }
}
41
42#[async_trait]
43impl Sink for ConsoleSink {
44 fn name(&self) -> &str {
45 &self.name
46 }
47
48 async fn send(&self, event: &Event) -> Result<(), SinkError> {
49 if self.pretty {
50 println!(
51 "[{}] {} | {:?}",
52 event.timestamp.format("%H:%M:%S"),
53 event.event_type,
54 event.data
55 );
56 } else {
57 println!("{}", serde_json::to_string(event)?);
58 }
59 Ok(())
60 }
61
62 async fn flush(&self) -> Result<(), SinkError> {
63 Ok(())
64 }
65
66 async fn close(&self) -> Result<(), SinkError> {
67 Ok(())
68 }
69}
70
71#[derive(Debug)]
73pub struct FileSink {
74 name: String,
75 path: PathBuf,
76 file: Arc<Mutex<File>>,
77}
78
79impl FileSink {
80 pub const fn path(&self) -> &PathBuf {
82 &self.path
83 }
84
85 pub fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
86 let path = path.into();
87 let file = OpenOptions::new().create(true).append(true).open(&path)?;
88
89 Ok(Self {
90 name: name.into(),
91 path,
92 file: Arc::new(Mutex::new(file)),
93 })
94 }
95}
96
97#[async_trait]
98impl Sink for FileSink {
99 fn name(&self) -> &str {
100 &self.name
101 }
102
103 async fn send(&self, event: &Event) -> Result<(), SinkError> {
104 let buf = event.to_sink_payload();
105 let mut file = self.file.lock().await;
106 file.write_all(&buf)?;
107 file.write_all(b"\n")?;
108 Ok(())
109 }
110
111 async fn flush(&self) -> Result<(), SinkError> {
112 let mut file = self.file.lock().await;
113 file.flush()?;
114 Ok(())
115 }
116
117 async fn close(&self) -> Result<(), SinkError> {
118 self.flush().await
119 }
120}
121
122#[derive(Debug)]
134pub struct AsyncFileSink {
135 name: String,
136 path: PathBuf,
137 file: Arc<Mutex<tokio::fs::File>>,
138 buffer: Arc<Mutex<Vec<u8>>>,
139 buffer_size: usize,
140}
141
142impl AsyncFileSink {
143 pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
145
146 pub const fn path(&self) -> &PathBuf {
148 &self.path
149 }
150
151 pub async fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
153 Self::with_buffer_size(name, path, Self::DEFAULT_BUFFER_SIZE).await
154 }
155
156 pub async fn with_buffer_size(
158 name: impl Into<String>,
159 path: impl Into<PathBuf>,
160 buffer_size: usize,
161 ) -> Result<Self, SinkError> {
162 use tokio::fs::OpenOptions;
163
164 let path = path.into();
165 let file = OpenOptions::new()
166 .create(true)
167 .append(true)
168 .open(&path)
169 .await?;
170
171 Ok(Self {
172 name: name.into(),
173 path,
174 file: Arc::new(Mutex::new(file)),
175 buffer: Arc::new(Mutex::new(Vec::with_capacity(buffer_size))),
176 buffer_size,
177 })
178 }
179}
180
181#[async_trait]
182impl Sink for AsyncFileSink {
183 fn name(&self) -> &str {
184 &self.name
185 }
186
187 async fn send(&self, event: &Event) -> Result<(), SinkError> {
188 let buf = event.to_sink_payload();
189
190 let should_flush = {
191 let mut buffer = self.buffer.lock().await;
192 buffer.extend_from_slice(&buf);
193 buffer.push(b'\n');
194 buffer.len() >= self.buffer_size
195 };
196
197 if should_flush {
198 self.flush().await?;
199 }
200
201 Ok(())
202 }
203
204 async fn flush(&self) -> Result<(), SinkError> {
205 use tokio::io::AsyncWriteExt;
206
207 let data = {
208 let mut buffer = self.buffer.lock().await;
209 std::mem::take(&mut *buffer)
210 };
211
212 if !data.is_empty() {
213 let mut file = self.file.lock().await;
214 file.write_all(&data).await?;
215 file.flush().await?;
216 }
217
218 Ok(())
219 }
220
221 async fn close(&self) -> Result<(), SinkError> {
222 self.flush().await
223 }
224}
225
226#[derive(Debug)]
228pub struct HttpSink {
229 name: String,
230 url: String,
231 client: reqwest::Client,
232 headers: IndexMap<String, String>,
233}
234
235impl HttpSink {
236 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
237 Self {
238 name: name.into(),
239 url: url.into(),
240 client: reqwest::Client::new(),
241 headers: IndexMap::new(),
242 }
243 }
244
245 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
246 self.headers.insert(key.into(), value.into());
247 self
248 }
249}
250
251#[async_trait]
252impl Sink for HttpSink {
253 fn name(&self) -> &str {
254 &self.name
255 }
256
257 async fn send(&self, event: &Event) -> Result<(), SinkError> {
258 let mut req = self.client.post(&self.url);
259 for (k, v) in &self.headers {
260 req = req.header(k.as_str(), v.as_str());
261 }
262 req = req.header("Content-Type", "application/json");
263 req = req.body(event.to_sink_payload());
264
265 match req.send().await {
266 Ok(resp) => {
267 if !resp.status().is_success() {
268 error!("HTTP sink {} got status {}", self.name, resp.status());
269 }
270 }
271 Err(e) => {
272 error!("HTTP sink {} error: {}", self.name, e);
273 }
274 }
275 Ok(())
276 }
277
278 async fn flush(&self) -> Result<(), SinkError> {
279 Ok(())
280 }
281
282 async fn close(&self) -> Result<(), SinkError> {
283 Ok(())
284 }
285}
286
/// Retry and timeout settings for [`HttpSinkWithRetry`].
#[derive(Debug, Clone)]
pub struct HttpRetryConfig {
    /// Number of retries allowed after the first attempt.
    pub max_retries: usize,
    /// Pause before the first retry; doubled after each retry.
    pub initial_delay: Duration,
    /// Upper bound applied to the doubled delay.
    pub max_delay: Duration,
    /// Timeout configured on the HTTP client.
    pub timeout: Duration,
}

impl Default for HttpRetryConfig {
    /// Three retries, starting at 100 ms and capped at 5 s, with a 30 s
    /// client timeout.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(5);
        let timeout = Duration::from_secs(30);

        Self {
            max_retries: 3,
            initial_delay,
            max_delay,
            timeout,
        }
    }
}
310
311#[derive(Debug)]
328pub struct HttpSinkWithRetry {
329 name: String,
330 url: String,
331 client: reqwest::Client,
332 headers: IndexMap<String, String>,
333 retry_config: HttpRetryConfig,
334}
335
336impl HttpSinkWithRetry {
337 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
338 let config = HttpRetryConfig::default();
339 let client = reqwest::Client::builder()
340 .timeout(config.timeout)
341 .build()
342 .unwrap_or_else(|_| reqwest::Client::new());
343
344 Self {
345 name: name.into(),
346 url: url.into(),
347 client,
348 headers: IndexMap::new(),
349 retry_config: config,
350 }
351 }
352
353 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
354 self.headers.insert(key.into(), value.into());
355 self
356 }
357
358 pub fn with_retry_config(mut self, config: HttpRetryConfig) -> Self {
359 self.retry_config = config;
360 self.client = reqwest::Client::builder()
362 .timeout(self.retry_config.timeout)
363 .build()
364 .unwrap_or_else(|_| reqwest::Client::new());
365 self
366 }
367
368 async fn send_with_retry(&self, body: Vec<u8>) -> Result<(), SinkError> {
370 let mut attempt = 0;
371 let mut delay = self.retry_config.initial_delay;
372
373 loop {
374 let mut req = self.client.post(&self.url);
375 for (k, v) in &self.headers {
376 req = req.header(k.as_str(), v.as_str());
377 }
378 req = req.header("Content-Type", "application/json");
379 req = req.body(body.clone());
380
381 match req.send().await {
382 Ok(resp) => {
383 if resp.status().is_success() {
384 return Ok(());
385 } else if resp.status().is_server_error() {
386 if attempt >= self.retry_config.max_retries {
388 return Err(SinkError::other(format!(
389 "HTTP sink {} failed with status {} after {} retries",
390 self.name,
391 resp.status(),
392 attempt
393 )));
394 }
395 warn!(
396 "HTTP sink {} got {}, retrying ({}/{})",
397 self.name,
398 resp.status(),
399 attempt + 1,
400 self.retry_config.max_retries
401 );
402 } else {
403 return Err(SinkError::other(format!(
405 "HTTP sink {} got client error status {}",
406 self.name,
407 resp.status()
408 )));
409 }
410 }
411 Err(e) => {
412 if e.is_timeout() || e.is_connect() || e.is_request() {
414 if attempt >= self.retry_config.max_retries {
415 return Err(SinkError::other(format!(
416 "HTTP sink {} failed with error {} after {} retries",
417 self.name, e, attempt
418 )));
419 }
420 warn!(
421 "HTTP sink {} error: {}, retrying ({}/{})",
422 self.name,
423 e,
424 attempt + 1,
425 self.retry_config.max_retries
426 );
427 } else {
428 return Err(e.into());
430 }
431 }
432 }
433
434 attempt += 1;
436 tokio::time::sleep(delay).await;
437 delay = (delay * 2).min(self.retry_config.max_delay);
438 }
439 }
440}
441
442#[async_trait]
443impl Sink for HttpSinkWithRetry {
444 fn name(&self) -> &str {
445 &self.name
446 }
447
448 async fn send(&self, event: &Event) -> Result<(), SinkError> {
449 self.send_with_retry(event.to_sink_payload()).await
450 }
451
452 async fn flush(&self) -> Result<(), SinkError> {
453 Ok(())
454 }
455
456 async fn close(&self) -> Result<(), SinkError> {
457 Ok(())
458 }
459}
460
461pub struct MultiSink {
463 name: String,
464 sinks: Vec<Box<dyn Sink>>,
465}
466
467impl std::fmt::Debug for MultiSink {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 f.debug_struct("MultiSink")
470 .field("name", &self.name)
471 .field(
472 "sinks",
473 &self.sinks.iter().map(|s| s.name()).collect::<Vec<_>>(),
474 )
475 .finish()
476 }
477}
478
479impl MultiSink {
480 pub fn new(name: impl Into<String>) -> Self {
481 Self {
482 name: name.into(),
483 sinks: Vec::new(),
484 }
485 }
486
487 pub fn with_sink(mut self, sink: Box<dyn Sink>) -> Self {
488 self.sinks.push(sink);
489 self
490 }
491}
492
493#[async_trait]
494impl Sink for MultiSink {
495 fn name(&self) -> &str {
496 &self.name
497 }
498
499 async fn send(&self, event: &Event) -> Result<(), SinkError> {
500 for sink in &self.sinks {
501 if let Err(e) = sink.send(event).await {
502 error!("Sink {} error: {}", sink.name(), e);
503 }
504 }
505 Ok(())
506 }
507
508 async fn flush(&self) -> Result<(), SinkError> {
509 for sink in &self.sinks {
510 sink.flush().await?;
511 }
512 Ok(())
513 }
514
515 async fn close(&self) -> Result<(), SinkError> {
516 for sink in &self.sinks {
517 sink.close().await?;
518 }
519 Ok(())
520 }
521}
522
523pub struct ResilientSink {
532 inner: Arc<dyn Sink>,
533 cb: Arc<crate::circuit_breaker::CircuitBreaker>,
534 dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
535 metrics: Option<crate::metrics::Metrics>,
536}
537
538impl std::fmt::Debug for ResilientSink {
539 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540 f.debug_struct("ResilientSink")
541 .field("inner", &self.inner.name())
542 .field("cb", &self.cb)
543 .field("has_dlq", &self.dlq.is_some())
544 .field("has_metrics", &self.metrics.is_some())
545 .finish_non_exhaustive()
546 }
547}
548
549impl ResilientSink {
550 pub fn new(
552 inner: Arc<dyn Sink>,
553 cb: Arc<crate::circuit_breaker::CircuitBreaker>,
554 dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
555 metrics: Option<crate::metrics::Metrics>,
556 ) -> Self {
557 Self {
558 inner,
559 cb,
560 dlq,
561 metrics,
562 }
563 }
564
565 fn send_to_dlq(&self, error_msg: &str, events: &[Arc<Event>]) {
566 if let Some(ref dlq) = self.dlq {
567 dlq.write_batch(self.inner.name(), error_msg, events);
568 if let Some(ref metrics) = self.metrics {
569 metrics.dlq_events_total.inc_by(events.len() as f64);
570 }
571 }
572 }
573}
574
575#[async_trait]
576impl Sink for ResilientSink {
577 fn name(&self) -> &str {
578 self.inner.name()
579 }
580
581 async fn connect(&self) -> Result<(), SinkError> {
582 self.inner.connect().await
583 }
584
585 async fn send(&self, event: &Event) -> Result<(), SinkError> {
586 if !self.cb.allow_request() {
587 let arc_event = Arc::new(event.clone());
588 self.send_to_dlq("circuit breaker open", &[arc_event]);
589 return Err(SinkError::other(format!(
590 "circuit breaker open for sink '{}'",
591 self.name()
592 )));
593 }
594
595 match self.inner.send(event).await {
596 Ok(()) => {
597 self.cb.record_success();
598 Ok(())
599 }
600 Err(e) => {
601 self.cb.record_failure();
602 let error_msg = e.to_string();
603 let arc_event = Arc::new(event.clone());
604 self.send_to_dlq(&error_msg, &[arc_event]);
605 Err(e)
606 }
607 }
608 }
609
610 async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
611 if !self.cb.allow_request() {
612 self.send_to_dlq("circuit breaker open", events);
613 return Err(SinkError::other(format!(
614 "circuit breaker open for sink '{}'",
615 self.name()
616 )));
617 }
618
619 match self.inner.send_batch(events).await {
620 Ok(()) => {
621 self.cb.record_success();
622 Ok(())
623 }
624 Err(e) => {
625 self.cb.record_failure();
626 self.send_to_dlq(&e.to_string(), events);
627 Err(e)
628 }
629 }
630 }
631
632 async fn send_batch_to_topic(
633 &self,
634 events: &[Arc<Event>],
635 topic: &str,
636 ) -> Result<(), SinkError> {
637 if !self.cb.allow_request() {
638 self.send_to_dlq("circuit breaker open", events);
639 return Err(SinkError::other(format!(
640 "circuit breaker open for sink '{}'",
641 self.name()
642 )));
643 }
644
645 match self.inner.send_batch_to_topic(events, topic).await {
646 Ok(()) => {
647 self.cb.record_success();
648 Ok(())
649 }
650 Err(e) => {
651 self.cb.record_failure();
652 self.send_to_dlq(&e.to_string(), events);
653 Err(e)
654 }
655 }
656 }
657
658 async fn flush(&self) -> Result<(), SinkError> {
659 self.inner.flush().await
660 }
661
662 async fn close(&self) -> Result<(), SinkError> {
663 self.inner.close().await
664 }
665}
666
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    use tempfile::NamedTempFile;

    use super::*;

    /// Opens a dead-letter queue inside a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for the duration of the
    /// test; dropping it removes the directory. Using a unique directory
    /// per test avoids collisions between concurrent test runs and cleans
    /// up even when an assertion panics.
    fn temp_dlq() -> (
        tempfile::TempDir,
        Arc<crate::dead_letter::DeadLetterQueue>,
    ) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dlq.jsonl");
        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&path).unwrap());
        (dir, dlq)
    }

    // ------------------------------------------------------------------
    // ConsoleSink
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_console_sink() {
        let sink = ConsoleSink::new("test");
        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());
    }

    #[tokio::test]
    async fn test_console_sink_name() {
        let sink = ConsoleSink::new("my_console");
        assert_eq!(sink.name(), "my_console");
    }

    #[tokio::test]
    async fn test_console_sink_compact() {
        let sink = ConsoleSink::new("test").compact();
        assert!(!sink.pretty);
        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());
    }

    #[tokio::test]
    async fn test_console_sink_flush_close() {
        let sink = ConsoleSink::new("test");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ------------------------------------------------------------------
    // FileSink
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_file_sink() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("test_file", temp_file.path()).unwrap();

        let event = Event::new("TestEvent").with_field("value", 42i64);
        assert!(sink.send(&event).await.is_ok());

        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"value\":42"));
    }

    #[tokio::test]
    async fn test_file_sink_name() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("my_file", temp_file.path()).unwrap();
        assert_eq!(sink.name(), "my_file");
    }

    // ------------------------------------------------------------------
    // HttpSink
    // ------------------------------------------------------------------

    #[test]
    fn test_http_sink_new() {
        let sink = HttpSink::new("http_test", "http://localhost:8080/webhook");
        assert_eq!(sink.name(), "http_test");
        assert_eq!(sink.url, "http://localhost:8080/webhook");
    }

    #[test]
    fn test_http_sink_with_header() {
        let sink = HttpSink::new("http_test", "http://localhost:8080")
            .with_header("Authorization", "Bearer token123")
            .with_header("X-Custom", "value");

        assert_eq!(sink.headers.len(), 2);
        assert_eq!(
            sink.headers.get("Authorization"),
            Some(&"Bearer token123".to_string())
        );
    }

    #[tokio::test]
    async fn test_http_sink_flush_close() {
        let sink = HttpSink::new("http_test", "http://localhost:8080");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ------------------------------------------------------------------
    // MultiSink
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_multi_sink_empty() {
        let sink = MultiSink::new("multi");
        assert_eq!(sink.name(), "multi");

        let event = Event::new("Test");
        assert!(sink.send(&event).await.is_ok());
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_multi_sink_with_console() {
        let multi = MultiSink::new("multi")
            .with_sink(Box::new(ConsoleSink::new("console1")))
            .with_sink(Box::new(ConsoleSink::new("console2")));

        let event = Event::new("Test").with_field("x", 1i64);
        assert!(multi.send(&event).await.is_ok());

        assert!(multi.flush().await.is_ok());
        assert!(multi.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_multi_sink_with_file() {
        let temp_file = NamedTempFile::new().unwrap();
        let file_sink = FileSink::new("file", temp_file.path()).unwrap();

        let multi = MultiSink::new("multi").with_sink(Box::new(file_sink));

        let event = Event::new("MultiEvent").with_field("val", 100i64);
        assert!(multi.send(&event).await.is_ok());
        assert!(multi.flush().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"val\":100"));
    }

    // ------------------------------------------------------------------
    // FileSink, additional coverage
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_file_sink_path() {
        let temp_file = NamedTempFile::new().unwrap();
        let expected_path = temp_file.path().to_path_buf();
        let sink = FileSink::new("test", temp_file.path()).unwrap();

        assert_eq!(sink.path(), &expected_path);
    }

    #[tokio::test]
    async fn test_file_sink_multiple_events() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = FileSink::new("test", temp_file.path()).unwrap();

        for i in 0..5_i64 {
            let event = Event::new("Event").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        sink.flush().await.unwrap();

        // One line per event.
        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 5);
    }

    // ------------------------------------------------------------------
    // MultiSink, additional coverage
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_multi_sink_three_sinks() {
        let temp1 = NamedTempFile::new().unwrap();
        let temp2 = NamedTempFile::new().unwrap();

        let multi = MultiSink::new("triple")
            .with_sink(Box::new(ConsoleSink::new("console")))
            .with_sink(Box::new(FileSink::new("file1", temp1.path()).unwrap()))
            .with_sink(Box::new(FileSink::new("file2", temp2.path()).unwrap()));

        let event = Event::new("TripleEvent");
        multi.send(&event).await.unwrap();
        multi.flush().await.unwrap();
        multi.close().await.unwrap();

        // Both file sinks must have received the event.
        let contents1 = std::fs::read_to_string(temp1.path()).unwrap();
        let contents2 = std::fs::read_to_string(temp2.path()).unwrap();
        assert!(contents1.contains("timestamp"));
        assert!(contents2.contains("timestamp"));
    }

    // ------------------------------------------------------------------
    // Error handling
    // ------------------------------------------------------------------

    #[test]
    fn test_file_sink_invalid_path() {
        let result = FileSink::new("test", "/nonexistent/path/file.json");
        assert!(result.is_err());
    }

    // ------------------------------------------------------------------
    // AsyncFileSink
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_async_file_sink_basic() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("test_async", temp_file.path())
            .await
            .unwrap();

        let event = Event::new("AsyncTestEvent").with_field("value", 123i64);
        assert!(sink.send(&event).await.is_ok());
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert!(contents.contains("\"value\":123"));
    }

    #[tokio::test]
    async fn test_async_file_sink_name() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("my_async_file", temp_file.path())
            .await
            .unwrap();
        assert_eq!(sink.name(), "my_async_file");
    }

    #[tokio::test]
    async fn test_async_file_sink_path() {
        let temp_file = NamedTempFile::new().unwrap();
        let expected_path = temp_file.path().to_path_buf();
        let sink = AsyncFileSink::new("test", temp_file.path()).await.unwrap();
        assert_eq!(sink.path(), &expected_path);
    }

    #[tokio::test]
    async fn test_async_file_sink_multiple_events() {
        let temp_file = NamedTempFile::new().unwrap();
        let sink = AsyncFileSink::new("test_async", temp_file.path())
            .await
            .unwrap();

        for i in 0..5_i64 {
            let event = Event::new("AsyncEvent").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        sink.flush().await.unwrap();

        // One line per event.
        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 5);
    }

    #[tokio::test]
    async fn test_async_file_sink_custom_buffer_size() {
        let temp_file = NamedTempFile::new().unwrap();
        // A small threshold so that `send` itself triggers flushes.
        let sink = AsyncFileSink::with_buffer_size("test", temp_file.path(), 50)
            .await
            .unwrap();

        for i in 0..10_i64 {
            let event = Event::new("BufferTest").with_field("id", i);
            sink.send(&event).await.unwrap();
        }
        // Write out whatever is still buffered.
        sink.flush().await.unwrap();

        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
        assert_eq!(contents.lines().count(), 10);
    }

    #[tokio::test]
    async fn test_async_file_sink_invalid_path() {
        let result = AsyncFileSink::new("test", "/nonexistent/path/file.json").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_async_file_sink_in_multi_sink() {
        let temp = NamedTempFile::new().unwrap();
        let async_sink = AsyncFileSink::new("async", temp.path()).await.unwrap();

        let multi = MultiSink::new("multi_with_async").with_sink(Box::new(async_sink));

        let event = Event::new("MultiAsyncEvent");
        multi.send(&event).await.unwrap();
        multi.flush().await.unwrap();

        let contents = std::fs::read_to_string(temp.path()).unwrap();
        assert!(contents.contains("timestamp"));
    }

    // ------------------------------------------------------------------
    // HttpSinkWithRetry
    // ------------------------------------------------------------------

    #[test]
    fn test_http_retry_config_default() {
        let config = HttpRetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(5));
        assert_eq!(config.timeout, Duration::from_secs(30));
    }

    #[test]
    fn test_http_sink_with_retry_creation() {
        let sink = HttpSinkWithRetry::new("retry_test", "http://localhost:8080/webhook");
        assert_eq!(sink.name(), "retry_test");
        assert_eq!(sink.url, "http://localhost:8080/webhook");
    }

    #[test]
    fn test_http_sink_with_retry_headers() {
        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080")
            .with_header("Authorization", "Bearer token123")
            .with_header("X-Custom", "value");

        assert_eq!(sink.headers.len(), 2);
        assert_eq!(
            sink.headers.get("Authorization"),
            Some(&"Bearer token123".to_string())
        );
    }

    #[test]
    fn test_http_sink_with_retry_custom_config() {
        let config = HttpRetryConfig {
            max_retries: 5,
            initial_delay: Duration::from_millis(200),
            max_delay: Duration::from_secs(10),
            timeout: Duration::from_secs(60),
        };
        let sink =
            HttpSinkWithRetry::new("test", "http://localhost:8080").with_retry_config(config);

        assert_eq!(sink.retry_config.max_retries, 5);
        assert_eq!(sink.retry_config.initial_delay, Duration::from_millis(200));
    }

    #[tokio::test]
    async fn test_http_sink_with_retry_flush_close() {
        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080");
        assert!(sink.flush().await.is_ok());
        assert!(sink.close().await.is_ok());
    }

    // ------------------------------------------------------------------
    // ResilientSink
    // ------------------------------------------------------------------

    /// Test double whose `send` can be switched between failing and
    /// succeeding, and which counts successful sends.
    struct MockSink {
        name: String,
        fail: AtomicBool,
        send_count: AtomicU64,
    }

    impl MockSink {
        fn new(name: &str) -> Self {
            Self {
                name: name.to_string(),
                fail: AtomicBool::new(false),
                send_count: AtomicU64::new(0),
            }
        }

        fn set_fail(&self, fail: bool) {
            self.fail.store(fail, Ordering::Relaxed);
        }

        fn send_count(&self) -> u64 {
            self.send_count.load(Ordering::Relaxed)
        }
    }

    #[async_trait]
    impl Sink for MockSink {
        fn name(&self) -> &str {
            &self.name
        }

        async fn send(&self, _event: &Event) -> Result<(), SinkError> {
            if self.fail.load(Ordering::Relaxed) {
                Err(SinkError::other("mock send failure"))
            } else {
                self.send_count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
        }

        async fn flush(&self) -> Result<(), SinkError> {
            Ok(())
        }

        async fn close(&self) -> Result<(), SinkError> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_resilient_sink_success_passthrough() {
        let mock = Arc::new(MockSink::new("test-sink"));
        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 3,
                reset_timeout: Duration::from_secs(60),
            },
        ));
        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);

        let event = Event::new("TestEvent");
        assert!(resilient.send(&event).await.is_ok());
        assert_eq!(mock.send_count(), 1);
        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
    }

    #[tokio::test]
    async fn test_resilient_sink_failure_opens_circuit() {
        let mock = Arc::new(MockSink::new("test-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 2,
                reset_timeout: Duration::from_secs(60),
            },
        ));

        let (_dlq_dir, dlq) = temp_dlq();

        let resilient = ResilientSink::new(mock.clone(), cb.clone(), Some(dlq.clone()), None);

        let event = Event::new("TestEvent");

        // First failure: event is dead-lettered.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 1);

        // Second failure reaches the threshold and opens the circuit.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 2);
        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);

        // With the circuit open the event is still dead-lettered.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(dlq.count(), 3);
    }

    #[tokio::test]
    async fn test_resilient_sink_batch_with_dlq() {
        let mock = Arc::new(MockSink::new("batch-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig::default(),
        ));

        let (_dlq_dir, dlq) = temp_dlq();

        let resilient = ResilientSink::new(mock, cb, Some(dlq.clone()), None);

        let events: Vec<Arc<Event>> = (0..3)
            .map(|i| Arc::new(Event::new(format!("Event{i}"))))
            .collect();

        // The whole batch fails, so all three events are dead-lettered.
        assert!(resilient.send_batch(&events).await.is_err());
        assert_eq!(dlq.count(), 3);
    }

    #[tokio::test]
    async fn test_resilient_sink_recovery() {
        let mock = Arc::new(MockSink::new("recover-sink"));
        mock.set_fail(true);

        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig {
                failure_threshold: 1,
                reset_timeout: Duration::from_millis(10),
            },
        ));

        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);

        let event = Event::new("TestEvent");

        // A single failure opens the circuit.
        assert!(resilient.send(&event).await.is_err());
        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);

        // Wait past the reset timeout.
        tokio::time::sleep(Duration::from_millis(15)).await;

        // Let the inner sink succeed again.
        mock.set_fail(false);

        // The next send goes through and closes the circuit.
        assert!(resilient.send(&event).await.is_ok());
        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
        assert_eq!(mock.send_count(), 1);
    }

    #[tokio::test]
    async fn test_resilient_sink_name_passthrough() {
        let mock = Arc::new(MockSink::new("my-kafka-sink"));
        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
            crate::circuit_breaker::CircuitBreakerConfig::default(),
        ));
        let resilient = ResilientSink::new(mock, cb, None, None);
        assert_eq!(resilient.name(), "my-kafka-sink");
    }
}