Skip to main content

varpulis_runtime/
sink.rs

1//! Sink implementations for outputting processed events
2//!
3//! The `Sink` trait and `SinkError` are defined in `varpulis-connectors` and
4//! re-exported here. This module provides the built-in sink implementations.
5
6use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use indexmap::IndexMap;
14use serde_json;
15use tokio::sync::Mutex;
16use tracing::{error, warn};
17pub use varpulis_connectors::sink::{Sink, SinkConnectorAdapter, SinkError};
18
19use crate::event::Event;
20
21/// Console sink - prints to stdout
22#[derive(Debug)]
23pub struct ConsoleSink {
24    name: String,
25    pretty: bool,
26}
27
28impl ConsoleSink {
29    pub fn new(name: impl Into<String>) -> Self {
30        Self {
31            name: name.into(),
32            pretty: true,
33        }
34    }
35
36    pub const fn compact(mut self) -> Self {
37        self.pretty = false;
38        self
39    }
40}
41
42#[async_trait]
43impl Sink for ConsoleSink {
44    fn name(&self) -> &str {
45        &self.name
46    }
47
48    async fn send(&self, event: &Event) -> Result<(), SinkError> {
49        if self.pretty {
50            println!(
51                "[{}] {} | {:?}",
52                event.timestamp.format("%H:%M:%S"),
53                event.event_type,
54                event.data
55            );
56        } else {
57            println!("{}", serde_json::to_string(event)?);
58        }
59        Ok(())
60    }
61
62    async fn flush(&self) -> Result<(), SinkError> {
63        Ok(())
64    }
65
66    async fn close(&self) -> Result<(), SinkError> {
67        Ok(())
68    }
69}
70
71/// File sink - writes JSON lines to a file
72#[derive(Debug)]
73pub struct FileSink {
74    name: String,
75    path: PathBuf,
76    file: Arc<Mutex<File>>,
77}
78
79impl FileSink {
80    /// Get the file path
81    pub const fn path(&self) -> &PathBuf {
82        &self.path
83    }
84
85    pub fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
86        let path = path.into();
87        let file = OpenOptions::new().create(true).append(true).open(&path)?;
88
89        Ok(Self {
90            name: name.into(),
91            path,
92            file: Arc::new(Mutex::new(file)),
93        })
94    }
95}
96
97#[async_trait]
98impl Sink for FileSink {
99    fn name(&self) -> &str {
100        &self.name
101    }
102
103    async fn send(&self, event: &Event) -> Result<(), SinkError> {
104        let buf = event.to_sink_payload();
105        let mut file = self.file.lock().await;
106        file.write_all(&buf)?;
107        file.write_all(b"\n")?;
108        Ok(())
109    }
110
111    async fn flush(&self) -> Result<(), SinkError> {
112        let mut file = self.file.lock().await;
113        file.flush()?;
114        Ok(())
115    }
116
117    async fn close(&self) -> Result<(), SinkError> {
118        self.flush().await
119    }
120}
121
122/// Async file sink - writes JSON lines using tokio::fs (non-blocking)
123///
124/// Unlike `FileSink`, this implementation uses async I/O and does not block
125/// the tokio runtime. It also includes buffering for better throughput.
126///
127/// # Example
128/// ```text
129/// let sink = AsyncFileSink::new("output", "/tmp/events.jsonl").await?;
130/// sink.send(&event).await?;
131/// sink.flush().await?;
132/// ```
133#[derive(Debug)]
134pub struct AsyncFileSink {
135    name: String,
136    path: PathBuf,
137    file: Arc<Mutex<tokio::fs::File>>,
138    buffer: Arc<Mutex<Vec<u8>>>,
139    buffer_size: usize,
140}
141
142impl AsyncFileSink {
143    /// Default buffer size (64KB)
144    pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
145
146    /// Get the file path
147    pub const fn path(&self) -> &PathBuf {
148        &self.path
149    }
150
151    /// Create a new async file sink with default buffer size
152    pub async fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
153        Self::with_buffer_size(name, path, Self::DEFAULT_BUFFER_SIZE).await
154    }
155
156    /// Create a new async file sink with custom buffer size
157    pub async fn with_buffer_size(
158        name: impl Into<String>,
159        path: impl Into<PathBuf>,
160        buffer_size: usize,
161    ) -> Result<Self, SinkError> {
162        use tokio::fs::OpenOptions;
163
164        let path = path.into();
165        let file = OpenOptions::new()
166            .create(true)
167            .append(true)
168            .open(&path)
169            .await?;
170
171        Ok(Self {
172            name: name.into(),
173            path,
174            file: Arc::new(Mutex::new(file)),
175            buffer: Arc::new(Mutex::new(Vec::with_capacity(buffer_size))),
176            buffer_size,
177        })
178    }
179}
180
181#[async_trait]
182impl Sink for AsyncFileSink {
183    fn name(&self) -> &str {
184        &self.name
185    }
186
187    async fn send(&self, event: &Event) -> Result<(), SinkError> {
188        let buf = event.to_sink_payload();
189
190        let should_flush = {
191            let mut buffer = self.buffer.lock().await;
192            buffer.extend_from_slice(&buf);
193            buffer.push(b'\n');
194            buffer.len() >= self.buffer_size
195        };
196
197        if should_flush {
198            self.flush().await?;
199        }
200
201        Ok(())
202    }
203
204    async fn flush(&self) -> Result<(), SinkError> {
205        use tokio::io::AsyncWriteExt;
206
207        let data = {
208            let mut buffer = self.buffer.lock().await;
209            std::mem::take(&mut *buffer)
210        };
211
212        if !data.is_empty() {
213            let mut file = self.file.lock().await;
214            file.write_all(&data).await?;
215            file.flush().await?;
216        }
217
218        Ok(())
219    }
220
221    async fn close(&self) -> Result<(), SinkError> {
222        self.flush().await
223    }
224}
225
226/// HTTP webhook sink
227#[derive(Debug)]
228pub struct HttpSink {
229    name: String,
230    url: String,
231    client: reqwest::Client,
232    headers: IndexMap<String, String>,
233}
234
235impl HttpSink {
236    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
237        Self {
238            name: name.into(),
239            url: url.into(),
240            client: reqwest::Client::new(),
241            headers: IndexMap::new(),
242        }
243    }
244
245    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
246        self.headers.insert(key.into(), value.into());
247        self
248    }
249}
250
251#[async_trait]
252impl Sink for HttpSink {
253    fn name(&self) -> &str {
254        &self.name
255    }
256
257    async fn send(&self, event: &Event) -> Result<(), SinkError> {
258        let mut req = self.client.post(&self.url);
259        for (k, v) in &self.headers {
260            req = req.header(k.as_str(), v.as_str());
261        }
262        req = req.header("Content-Type", "application/json");
263        req = req.body(event.to_sink_payload());
264
265        match req.send().await {
266            Ok(resp) => {
267                if !resp.status().is_success() {
268                    error!("HTTP sink {} got status {}", self.name, resp.status());
269                }
270            }
271            Err(e) => {
272                error!("HTTP sink {} error: {}", self.name, e);
273            }
274        }
275        Ok(())
276    }
277
278    async fn flush(&self) -> Result<(), SinkError> {
279        Ok(())
280    }
281
282    async fn close(&self) -> Result<(), SinkError> {
283        Ok(())
284    }
285}
286
287/// Configuration for HTTP sink retry behavior
288#[derive(Debug, Clone)]
289pub struct HttpRetryConfig {
290    /// Maximum number of retry attempts (0 = no retries)
291    pub max_retries: usize,
292    /// Initial delay between retries (doubles each attempt)
293    pub initial_delay: Duration,
294    /// Maximum delay between retries
295    pub max_delay: Duration,
296    /// Request timeout
297    pub timeout: Duration,
298}
299
300impl Default for HttpRetryConfig {
301    fn default() -> Self {
302        Self {
303            max_retries: 3,
304            initial_delay: Duration::from_millis(100),
305            max_delay: Duration::from_secs(5),
306            timeout: Duration::from_secs(30),
307        }
308    }
309}
310
311/// HTTP webhook sink with retry logic
312///
313/// Unlike `HttpSink`, this implementation retries failed requests with exponential
314/// backoff. It distinguishes between retryable errors (5xx, timeouts, network errors)
315/// and non-retryable errors (4xx client errors).
316///
317/// # Example
318/// ```text
319/// let sink = HttpSinkWithRetry::new("webhook", "https://api.example.com/events")
320///     .with_header("Authorization", "Bearer token123")
321///     .with_retry_config(HttpRetryConfig {
322///         max_retries: 5,
323///         ..Default::default()
324///     });
325/// sink.send(&event).await?;
326/// ```
327#[derive(Debug)]
328pub struct HttpSinkWithRetry {
329    name: String,
330    url: String,
331    client: reqwest::Client,
332    headers: IndexMap<String, String>,
333    retry_config: HttpRetryConfig,
334}
335
336impl HttpSinkWithRetry {
337    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
338        let config = HttpRetryConfig::default();
339        let client = reqwest::Client::builder()
340            .timeout(config.timeout)
341            .build()
342            .unwrap_or_else(|_| reqwest::Client::new());
343
344        Self {
345            name: name.into(),
346            url: url.into(),
347            client,
348            headers: IndexMap::new(),
349            retry_config: config,
350        }
351    }
352
353    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
354        self.headers.insert(key.into(), value.into());
355        self
356    }
357
358    pub fn with_retry_config(mut self, config: HttpRetryConfig) -> Self {
359        self.retry_config = config;
360        // Rebuild client with new timeout
361        self.client = reqwest::Client::builder()
362            .timeout(self.retry_config.timeout)
363            .build()
364            .unwrap_or_else(|_| reqwest::Client::new());
365        self
366    }
367
368    /// Send with retry logic
369    async fn send_with_retry(&self, body: Vec<u8>) -> Result<(), SinkError> {
370        let mut attempt = 0;
371        let mut delay = self.retry_config.initial_delay;
372
373        loop {
374            let mut req = self.client.post(&self.url);
375            for (k, v) in &self.headers {
376                req = req.header(k.as_str(), v.as_str());
377            }
378            req = req.header("Content-Type", "application/json");
379            req = req.body(body.clone());
380
381            match req.send().await {
382                Ok(resp) => {
383                    if resp.status().is_success() {
384                        return Ok(());
385                    } else if resp.status().is_server_error() {
386                        // 5xx: retryable
387                        if attempt >= self.retry_config.max_retries {
388                            return Err(SinkError::other(format!(
389                                "HTTP sink {} failed with status {} after {} retries",
390                                self.name,
391                                resp.status(),
392                                attempt
393                            )));
394                        }
395                        warn!(
396                            "HTTP sink {} got {}, retrying ({}/{})",
397                            self.name,
398                            resp.status(),
399                            attempt + 1,
400                            self.retry_config.max_retries
401                        );
402                    } else {
403                        // 4xx: not retryable (client error)
404                        return Err(SinkError::other(format!(
405                            "HTTP sink {} got client error status {}",
406                            self.name,
407                            resp.status()
408                        )));
409                    }
410                }
411                Err(e) => {
412                    // Network errors and timeouts are retryable
413                    if e.is_timeout() || e.is_connect() || e.is_request() {
414                        if attempt >= self.retry_config.max_retries {
415                            return Err(SinkError::other(format!(
416                                "HTTP sink {} failed with error {} after {} retries",
417                                self.name, e, attempt
418                            )));
419                        }
420                        warn!(
421                            "HTTP sink {} error: {}, retrying ({}/{})",
422                            self.name,
423                            e,
424                            attempt + 1,
425                            self.retry_config.max_retries
426                        );
427                    } else {
428                        // Other errors (e.g., serialization) are not retryable
429                        return Err(e.into());
430                    }
431                }
432            }
433
434            // Exponential backoff
435            attempt += 1;
436            tokio::time::sleep(delay).await;
437            delay = (delay * 2).min(self.retry_config.max_delay);
438        }
439    }
440}
441
442#[async_trait]
443impl Sink for HttpSinkWithRetry {
444    fn name(&self) -> &str {
445        &self.name
446    }
447
448    async fn send(&self, event: &Event) -> Result<(), SinkError> {
449        self.send_with_retry(event.to_sink_payload()).await
450    }
451
452    async fn flush(&self) -> Result<(), SinkError> {
453        Ok(())
454    }
455
456    async fn close(&self) -> Result<(), SinkError> {
457        Ok(())
458    }
459}
460
461/// Multi-sink that broadcasts to multiple sinks
462pub struct MultiSink {
463    name: String,
464    sinks: Vec<Box<dyn Sink>>,
465}
466
467impl std::fmt::Debug for MultiSink {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        f.debug_struct("MultiSink")
470            .field("name", &self.name)
471            .field(
472                "sinks",
473                &self.sinks.iter().map(|s| s.name()).collect::<Vec<_>>(),
474            )
475            .finish()
476    }
477}
478
479impl MultiSink {
480    pub fn new(name: impl Into<String>) -> Self {
481        Self {
482            name: name.into(),
483            sinks: Vec::new(),
484        }
485    }
486
487    pub fn with_sink(mut self, sink: Box<dyn Sink>) -> Self {
488        self.sinks.push(sink);
489        self
490    }
491}
492
493#[async_trait]
494impl Sink for MultiSink {
495    fn name(&self) -> &str {
496        &self.name
497    }
498
499    async fn send(&self, event: &Event) -> Result<(), SinkError> {
500        for sink in &self.sinks {
501            if let Err(e) = sink.send(event).await {
502                error!("Sink {} error: {}", sink.name(), e);
503            }
504        }
505        Ok(())
506    }
507
508    async fn flush(&self) -> Result<(), SinkError> {
509        for sink in &self.sinks {
510            sink.flush().await?;
511        }
512        Ok(())
513    }
514
515    async fn close(&self) -> Result<(), SinkError> {
516        for sink in &self.sinks {
517            sink.close().await?;
518        }
519        Ok(())
520    }
521}
522
523/// A resilient sink wrapper that adds circuit breaker and dead letter queue.
524///
525/// Wraps any [`Sink`] implementation with:
526/// - **Circuit breaker**: rejects sends immediately when the downstream is unhealthy
527/// - **Dead letter queue**: routes failed events to a DLQ file instead of dropping them
528///
529/// When the circuit is open or a send fails, events are written to the DLQ.
530/// When the circuit recovers (half-open → closed), normal delivery resumes.
531pub struct ResilientSink {
532    inner: Arc<dyn Sink>,
533    cb: Arc<crate::circuit_breaker::CircuitBreaker>,
534    dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
535    metrics: Option<crate::metrics::Metrics>,
536}
537
538impl std::fmt::Debug for ResilientSink {
539    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540        f.debug_struct("ResilientSink")
541            .field("inner", &self.inner.name())
542            .field("cb", &self.cb)
543            .field("has_dlq", &self.dlq.is_some())
544            .field("has_metrics", &self.metrics.is_some())
545            .finish_non_exhaustive()
546    }
547}
548
549impl ResilientSink {
550    /// Wrap a sink with circuit breaker protection.
551    pub fn new(
552        inner: Arc<dyn Sink>,
553        cb: Arc<crate::circuit_breaker::CircuitBreaker>,
554        dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
555        metrics: Option<crate::metrics::Metrics>,
556    ) -> Self {
557        Self {
558            inner,
559            cb,
560            dlq,
561            metrics,
562        }
563    }
564
565    fn send_to_dlq(&self, error_msg: &str, events: &[Arc<Event>]) {
566        if let Some(ref dlq) = self.dlq {
567            dlq.write_batch(self.inner.name(), error_msg, events);
568            if let Some(ref metrics) = self.metrics {
569                metrics.dlq_events_total.inc_by(events.len() as f64);
570            }
571        }
572    }
573}
574
575#[async_trait]
576impl Sink for ResilientSink {
577    fn name(&self) -> &str {
578        self.inner.name()
579    }
580
581    async fn connect(&self) -> Result<(), SinkError> {
582        self.inner.connect().await
583    }
584
585    async fn send(&self, event: &Event) -> Result<(), SinkError> {
586        if !self.cb.allow_request() {
587            let arc_event = Arc::new(event.clone());
588            self.send_to_dlq("circuit breaker open", &[arc_event]);
589            return Err(SinkError::other(format!(
590                "circuit breaker open for sink '{}'",
591                self.name()
592            )));
593        }
594
595        match self.inner.send(event).await {
596            Ok(()) => {
597                self.cb.record_success();
598                Ok(())
599            }
600            Err(e) => {
601                self.cb.record_failure();
602                let error_msg = e.to_string();
603                let arc_event = Arc::new(event.clone());
604                self.send_to_dlq(&error_msg, &[arc_event]);
605                Err(e)
606            }
607        }
608    }
609
610    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
611        if !self.cb.allow_request() {
612            self.send_to_dlq("circuit breaker open", events);
613            return Err(SinkError::other(format!(
614                "circuit breaker open for sink '{}'",
615                self.name()
616            )));
617        }
618
619        match self.inner.send_batch(events).await {
620            Ok(()) => {
621                self.cb.record_success();
622                Ok(())
623            }
624            Err(e) => {
625                self.cb.record_failure();
626                self.send_to_dlq(&e.to_string(), events);
627                Err(e)
628            }
629        }
630    }
631
632    async fn send_batch_to_topic(
633        &self,
634        events: &[Arc<Event>],
635        topic: &str,
636    ) -> Result<(), SinkError> {
637        if !self.cb.allow_request() {
638            self.send_to_dlq("circuit breaker open", events);
639            return Err(SinkError::other(format!(
640                "circuit breaker open for sink '{}'",
641                self.name()
642            )));
643        }
644
645        match self.inner.send_batch_to_topic(events, topic).await {
646            Ok(()) => {
647                self.cb.record_success();
648                Ok(())
649            }
650            Err(e) => {
651                self.cb.record_failure();
652                self.send_to_dlq(&e.to_string(), events);
653                Err(e)
654            }
655        }
656    }
657
658    async fn flush(&self) -> Result<(), SinkError> {
659        self.inner.flush().await
660    }
661
662    async fn close(&self) -> Result<(), SinkError> {
663        self.inner.close().await
664    }
665}
666
667#[cfg(test)]
668mod tests {
669    use tempfile::NamedTempFile;
670
671    use super::*;
672
673    // ==========================================================================
674    // ConsoleSink Tests
675    // ==========================================================================
676
677    #[tokio::test]
678    async fn test_console_sink() {
679        let sink = ConsoleSink::new("test");
680        let event = Event::new("TestEvent").with_field("value", 42i64);
681        assert!(sink.send(&event).await.is_ok());
682    }
683
684    #[tokio::test]
685    async fn test_console_sink_name() {
686        let sink = ConsoleSink::new("my_console");
687        assert_eq!(sink.name(), "my_console");
688    }
689
690    #[tokio::test]
691    async fn test_console_sink_compact() {
692        let sink = ConsoleSink::new("test").compact();
693        assert!(!sink.pretty);
694        let event = Event::new("TestEvent").with_field("value", 42i64);
695        assert!(sink.send(&event).await.is_ok());
696    }
697
698    #[tokio::test]
699    async fn test_console_sink_flush_close() {
700        let sink = ConsoleSink::new("test");
701        assert!(sink.flush().await.is_ok());
702        assert!(sink.close().await.is_ok());
703    }
704
705    // ==========================================================================
706    // FileSink Tests
707    // ==========================================================================
708
709    #[tokio::test]
710    async fn test_file_sink() {
711        let temp_file = NamedTempFile::new().unwrap();
712        let sink = FileSink::new("test_file", temp_file.path()).unwrap();
713
714        let event = Event::new("TestEvent").with_field("value", 42i64);
715        assert!(sink.send(&event).await.is_ok());
716
717        assert!(sink.flush().await.is_ok());
718        assert!(sink.close().await.is_ok());
719
720        // Verify file contains the event
721        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
722        assert!(contents.contains("\"value\":42"));
723    }
724
725    #[tokio::test]
726    async fn test_file_sink_name() {
727        let temp_file = NamedTempFile::new().unwrap();
728        let sink = FileSink::new("my_file", temp_file.path()).unwrap();
729        assert_eq!(sink.name(), "my_file");
730    }
731
732    // ==========================================================================
733    // HttpSink Tests (no actual network calls)
734    // ==========================================================================
735
736    #[test]
737    fn test_http_sink_new() {
738        let sink = HttpSink::new("http_test", "http://localhost:8080/webhook");
739        assert_eq!(sink.name(), "http_test");
740        assert_eq!(sink.url, "http://localhost:8080/webhook");
741    }
742
743    #[test]
744    fn test_http_sink_with_header() {
745        let sink = HttpSink::new("http_test", "http://localhost:8080")
746            .with_header("Authorization", "Bearer token123")
747            .with_header("X-Custom", "value");
748
749        assert_eq!(sink.headers.len(), 2);
750        assert_eq!(
751            sink.headers.get("Authorization"),
752            Some(&"Bearer token123".to_string())
753        );
754    }
755
756    #[tokio::test]
757    async fn test_http_sink_flush_close() {
758        let sink = HttpSink::new("http_test", "http://localhost:8080");
759        assert!(sink.flush().await.is_ok());
760        assert!(sink.close().await.is_ok());
761    }
762
763    // ==========================================================================
764    // MultiSink Tests
765    // ==========================================================================
766
767    #[tokio::test]
768    async fn test_multi_sink_empty() {
769        let sink = MultiSink::new("multi");
770        assert_eq!(sink.name(), "multi");
771
772        let event = Event::new("Test");
773        assert!(sink.send(&event).await.is_ok());
774        assert!(sink.flush().await.is_ok());
775        assert!(sink.close().await.is_ok());
776    }
777
778    #[tokio::test]
779    async fn test_multi_sink_with_console() {
780        let multi = MultiSink::new("multi")
781            .with_sink(Box::new(ConsoleSink::new("console1")))
782            .with_sink(Box::new(ConsoleSink::new("console2")));
783
784        let event = Event::new("Test").with_field("x", 1i64);
785        assert!(multi.send(&event).await.is_ok());
786
787        assert!(multi.flush().await.is_ok());
788        assert!(multi.close().await.is_ok());
789    }
790
791    #[tokio::test]
792    async fn test_multi_sink_with_file() {
793        let temp_file = NamedTempFile::new().unwrap();
794        let file_sink = FileSink::new("file", temp_file.path()).unwrap();
795
796        let multi = MultiSink::new("multi").with_sink(Box::new(file_sink));
797
798        let event = Event::new("MultiEvent").with_field("val", 100i64);
799        assert!(multi.send(&event).await.is_ok());
800        assert!(multi.flush().await.is_ok());
801
802        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
803        assert!(contents.contains("\"val\":100"));
804    }
805
806    // ==========================================================================
807    // Additional FileSink Tests
808    // ==========================================================================
809
810    #[tokio::test]
811    async fn test_file_sink_path() {
812        let temp_file = NamedTempFile::new().unwrap();
813        let expected_path = temp_file.path().to_path_buf();
814        let sink = FileSink::new("test", temp_file.path()).unwrap();
815
816        assert_eq!(sink.path(), &expected_path);
817    }
818
819    #[tokio::test]
820    async fn test_file_sink_multiple_events() {
821        let temp_file = NamedTempFile::new().unwrap();
822        let sink = FileSink::new("test", temp_file.path()).unwrap();
823
824        // Write multiple events
825        for i in 0..5 {
826            let event = Event::new("Event").with_field("id", i as i64);
827            sink.send(&event).await.unwrap();
828        }
829        sink.flush().await.unwrap();
830
831        // Read and verify all events are written
832        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
833        let lines: Vec<&str> = contents.lines().collect();
834        assert_eq!(lines.len(), 5);
835    }
836
837    // ==========================================================================
838    // Additional MultiSink Tests
839    // ==========================================================================
840
841    #[tokio::test]
842    async fn test_multi_sink_three_sinks() {
843        let temp1 = NamedTempFile::new().unwrap();
844        let temp2 = NamedTempFile::new().unwrap();
845
846        let multi = MultiSink::new("triple")
847            .with_sink(Box::new(ConsoleSink::new("console")))
848            .with_sink(Box::new(FileSink::new("file1", temp1.path()).unwrap()))
849            .with_sink(Box::new(FileSink::new("file2", temp2.path()).unwrap()));
850
851        let event = Event::new("TripleEvent");
852        multi.send(&event).await.unwrap();
853        multi.flush().await.unwrap();
854        multi.close().await.unwrap();
855
856        // Verify both files got the event
857        let contents1 = std::fs::read_to_string(temp1.path()).unwrap();
858        let contents2 = std::fs::read_to_string(temp2.path()).unwrap();
859        assert!(contents1.contains("timestamp"));
860        assert!(contents2.contains("timestamp"));
861    }
862
863    // ==========================================================================
864    // Error Handling Tests
865    // ==========================================================================
866
867    #[test]
868    fn test_file_sink_invalid_path() {
869        // Trying to create a file in a non-existent directory should fail
870        let result = FileSink::new("test", "/nonexistent/path/file.json");
871        assert!(result.is_err());
872    }
873
874    // ==========================================================================
875    // AsyncFileSink Tests
876    // ==========================================================================
877
878    #[tokio::test]
879    async fn test_async_file_sink_basic() {
880        let temp_file = NamedTempFile::new().unwrap();
881        let sink = AsyncFileSink::new("test_async", temp_file.path())
882            .await
883            .unwrap();
884
885        let event = Event::new("AsyncTestEvent").with_field("value", 123i64);
886        assert!(sink.send(&event).await.is_ok());
887        assert!(sink.flush().await.is_ok());
888        assert!(sink.close().await.is_ok());
889
890        // Verify file contains the event
891        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
892        assert!(contents.contains("\"value\":123"));
893        assert!(contents.contains("123"));
894    }
895
896    #[tokio::test]
897    async fn test_async_file_sink_name() {
898        let temp_file = NamedTempFile::new().unwrap();
899        let sink = AsyncFileSink::new("my_async_file", temp_file.path())
900            .await
901            .unwrap();
902        assert_eq!(sink.name(), "my_async_file");
903    }
904
905    #[tokio::test]
906    async fn test_async_file_sink_path() {
907        let temp_file = NamedTempFile::new().unwrap();
908        let expected_path = temp_file.path().to_path_buf();
909        let sink = AsyncFileSink::new("test", temp_file.path()).await.unwrap();
910        assert_eq!(sink.path(), &expected_path);
911    }
912
913    #[tokio::test]
914    async fn test_async_file_sink_multiple_events() {
915        let temp_file = NamedTempFile::new().unwrap();
916        let sink = AsyncFileSink::new("test_async", temp_file.path())
917            .await
918            .unwrap();
919
920        // Write multiple events
921        for i in 0..5 {
922            let event = Event::new("AsyncEvent").with_field("id", i as i64);
923            sink.send(&event).await.unwrap();
924        }
925        sink.flush().await.unwrap();
926
927        // Read and verify all events are written
928        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
929        let lines: Vec<&str> = contents.lines().collect();
930        assert_eq!(lines.len(), 5);
931    }
932
933    #[tokio::test]
934    async fn test_async_file_sink_custom_buffer_size() {
935        let temp_file = NamedTempFile::new().unwrap();
936        // Use a small buffer to trigger auto-flush
937        let sink = AsyncFileSink::with_buffer_size("test", temp_file.path(), 50)
938            .await
939            .unwrap();
940
941        // Write events that should trigger buffer flush
942        for i in 0..10 {
943            let event = Event::new("BufferTest").with_field("id", i as i64);
944            sink.send(&event).await.unwrap();
945        }
946        // Final flush to ensure all data is written
947        sink.flush().await.unwrap();
948
949        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
950        let lines: Vec<&str> = contents.lines().collect();
951        assert_eq!(lines.len(), 10);
952    }
953
954    #[tokio::test]
955    async fn test_async_file_sink_invalid_path() {
956        let result = AsyncFileSink::new("test", "/nonexistent/path/file.json").await;
957        assert!(result.is_err());
958    }
959
960    #[tokio::test]
961    async fn test_async_file_sink_in_multi_sink() {
962        let temp = NamedTempFile::new().unwrap();
963        let async_sink = AsyncFileSink::new("async", temp.path()).await.unwrap();
964
965        let multi = MultiSink::new("multi_with_async").with_sink(Box::new(async_sink));
966
967        let event = Event::new("MultiAsyncEvent");
968        multi.send(&event).await.unwrap();
969        multi.flush().await.unwrap();
970
971        let contents = std::fs::read_to_string(temp.path()).unwrap();
972        assert!(contents.contains("timestamp"));
973    }
974
975    // ==========================================================================
976    // HttpSinkWithRetry Tests
977    // ==========================================================================
978
979    #[test]
980    fn test_http_retry_config_default() {
981        let config = HttpRetryConfig::default();
982        assert_eq!(config.max_retries, 3);
983        assert_eq!(config.initial_delay, std::time::Duration::from_millis(100));
984        assert_eq!(config.max_delay, std::time::Duration::from_secs(5));
985        assert_eq!(config.timeout, std::time::Duration::from_secs(30));
986    }
987
988    #[test]
989    fn test_http_sink_with_retry_creation() {
990        let sink = HttpSinkWithRetry::new("retry_test", "http://localhost:8080/webhook");
991        assert_eq!(sink.name(), "retry_test");
992        assert_eq!(sink.url, "http://localhost:8080/webhook");
993    }
994
995    #[test]
996    fn test_http_sink_with_retry_headers() {
997        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080")
998            .with_header("Authorization", "Bearer token123")
999            .with_header("X-Custom", "value");
1000
1001        assert_eq!(sink.headers.len(), 2);
1002        assert_eq!(
1003            sink.headers.get("Authorization"),
1004            Some(&"Bearer token123".to_string())
1005        );
1006    }
1007
1008    #[test]
1009    fn test_http_sink_with_retry_custom_config() {
1010        let config = HttpRetryConfig {
1011            max_retries: 5,
1012            initial_delay: std::time::Duration::from_millis(200),
1013            max_delay: std::time::Duration::from_secs(10),
1014            timeout: std::time::Duration::from_secs(60),
1015        };
1016        let sink =
1017            HttpSinkWithRetry::new("test", "http://localhost:8080").with_retry_config(config);
1018
1019        assert_eq!(sink.retry_config.max_retries, 5);
1020        assert_eq!(
1021            sink.retry_config.initial_delay,
1022            std::time::Duration::from_millis(200)
1023        );
1024    }
1025
1026    #[tokio::test]
1027    async fn test_http_sink_with_retry_flush_close() {
1028        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080");
1029        assert!(sink.flush().await.is_ok());
1030        assert!(sink.close().await.is_ok());
1031    }
1032
1033    // ==========================================================================
1034    // ResilientSink Tests
1035    // ==========================================================================
1036
1037    /// A mock sink that fails on demand.
1038    struct MockSink {
1039        name: String,
1040        fail: std::sync::atomic::AtomicBool,
1041        send_count: std::sync::atomic::AtomicU64,
1042    }
1043
1044    impl MockSink {
1045        fn new(name: &str) -> Self {
1046            Self {
1047                name: name.to_string(),
1048                fail: std::sync::atomic::AtomicBool::new(false),
1049                send_count: std::sync::atomic::AtomicU64::new(0),
1050            }
1051        }
1052
1053        fn set_fail(&self, fail: bool) {
1054            self.fail.store(fail, std::sync::atomic::Ordering::Relaxed);
1055        }
1056
1057        fn send_count(&self) -> u64 {
1058            self.send_count.load(std::sync::atomic::Ordering::Relaxed)
1059        }
1060    }
1061
1062    #[async_trait]
1063    impl Sink for MockSink {
1064        fn name(&self) -> &str {
1065            &self.name
1066        }
1067
1068        async fn send(&self, _event: &Event) -> Result<(), SinkError> {
1069            if self.fail.load(std::sync::atomic::Ordering::Relaxed) {
1070                Err(SinkError::other("mock send failure"))
1071            } else {
1072                self.send_count
1073                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1074                Ok(())
1075            }
1076        }
1077
1078        async fn flush(&self) -> Result<(), SinkError> {
1079            Ok(())
1080        }
1081        async fn close(&self) -> Result<(), SinkError> {
1082            Ok(())
1083        }
1084    }
1085
1086    #[tokio::test]
1087    async fn test_resilient_sink_success_passthrough() {
1088        let mock = Arc::new(MockSink::new("test-sink"));
1089        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1090            crate::circuit_breaker::CircuitBreakerConfig {
1091                failure_threshold: 3,
1092                reset_timeout: std::time::Duration::from_secs(60),
1093            },
1094        ));
1095        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);
1096
1097        let event = Event::new("TestEvent");
1098        assert!(resilient.send(&event).await.is_ok());
1099        assert_eq!(mock.send_count(), 1);
1100        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
1101    }
1102
1103    #[tokio::test]
1104    async fn test_resilient_sink_failure_opens_circuit() {
1105        let mock = Arc::new(MockSink::new("test-sink"));
1106        mock.set_fail(true);
1107
1108        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1109            crate::circuit_breaker::CircuitBreakerConfig {
1110                failure_threshold: 2,
1111                reset_timeout: std::time::Duration::from_secs(60),
1112            },
1113        ));
1114
1115        let dlq_path = std::env::temp_dir().join("varpulis_resilient_test_dlq.jsonl");
1116        let _ = std::fs::remove_file(&dlq_path);
1117        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());
1118
1119        let resilient = ResilientSink::new(mock.clone(), cb.clone(), Some(dlq.clone()), None);
1120
1121        let event = Event::new("TestEvent");
1122
1123        // First failure
1124        assert!(resilient.send(&event).await.is_err());
1125        assert_eq!(dlq.count(), 1);
1126
1127        // Second failure → opens circuit
1128        assert!(resilient.send(&event).await.is_err());
1129        assert_eq!(dlq.count(), 2);
1130        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);
1131
1132        // Third attempt: circuit open, rejected immediately (no send attempt)
1133        assert!(resilient.send(&event).await.is_err());
1134        assert_eq!(dlq.count(), 3);
1135
1136        let _ = std::fs::remove_file(&dlq_path);
1137    }
1138
1139    #[tokio::test]
1140    async fn test_resilient_sink_batch_with_dlq() {
1141        let mock = Arc::new(MockSink::new("batch-sink"));
1142        mock.set_fail(true);
1143
1144        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1145            crate::circuit_breaker::CircuitBreakerConfig::default(),
1146        ));
1147
1148        let dlq_path = std::env::temp_dir().join("varpulis_resilient_batch_dlq.jsonl");
1149        let _ = std::fs::remove_file(&dlq_path);
1150        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());
1151
1152        let resilient = ResilientSink::new(mock, cb, Some(dlq.clone()), None);
1153
1154        let events: Vec<Arc<Event>> = (0..3)
1155            .map(|i| Arc::new(Event::new(format!("Event{i}"))))
1156            .collect();
1157
1158        assert!(resilient.send_batch(&events).await.is_err());
1159        assert_eq!(dlq.count(), 3); // All 3 events written to DLQ
1160
1161        let _ = std::fs::remove_file(&dlq_path);
1162    }
1163
1164    #[tokio::test]
1165    async fn test_resilient_sink_recovery() {
1166        let mock = Arc::new(MockSink::new("recover-sink"));
1167        mock.set_fail(true);
1168
1169        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1170            crate::circuit_breaker::CircuitBreakerConfig {
1171                failure_threshold: 1,
1172                reset_timeout: std::time::Duration::from_millis(10),
1173            },
1174        ));
1175
1176        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);
1177
1178        let event = Event::new("TestEvent");
1179
1180        // Fail → opens circuit
1181        assert!(resilient.send(&event).await.is_err());
1182        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);
1183
1184        // Wait for reset timeout
1185        tokio::time::sleep(std::time::Duration::from_millis(15)).await;
1186
1187        // Fix the sink
1188        mock.set_fail(false);
1189
1190        // Next request: half-open probe succeeds → closes
1191        assert!(resilient.send(&event).await.is_ok());
1192        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
1193        assert_eq!(mock.send_count(), 1);
1194    }
1195
1196    #[tokio::test]
1197    async fn test_resilient_sink_name_passthrough() {
1198        let mock = Arc::new(MockSink::new("my-kafka-sink"));
1199        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1200            crate::circuit_breaker::CircuitBreakerConfig::default(),
1201        ));
1202        let resilient = ResilientSink::new(mock, cb, None, None);
1203        assert_eq!(resilient.name(), "my-kafka-sink");
1204    }
1205}