Skip to main content

varpulis_runtime/
sink.rs

1//! Sink implementations for outputting processed events
2//!
3//! The `Sink` trait and `SinkError` are defined in `varpulis-connectors` and
4//! re-exported here. This module provides the built-in sink implementations.
5
6use std::fs::{File, OpenOptions};
7use std::io::Write;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use indexmap::IndexMap;
14use serde_json;
15use tokio::sync::Mutex;
16use tracing::{error, warn};
17pub use varpulis_connectors::sink::{Sink, SinkConnectorAdapter, SinkError};
18
19use crate::event::Event;
20
/// Sink that prints every event to standard output.
///
/// Two renderings exist: a human-oriented single line (the default) and a
/// JSON line, selected with [`ConsoleSink::compact`].
#[derive(Debug)]
pub struct ConsoleSink {
    /// Identifier reported through `Sink::name`.
    name: String,
    /// `true` selects the human-oriented rendering, `false` the JSON one.
    pretty: bool,
}

impl ConsoleSink {
    /// Creates a console sink called `name` that uses the pretty rendering.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self { name, pretty: true }
    }

    /// Builder-style switch to the compact (JSON) rendering.
    pub const fn compact(self) -> Self {
        let mut sink = self;
        sink.pretty = false;
        sink
    }
}
41
42#[async_trait]
43impl Sink for ConsoleSink {
44    fn name(&self) -> &str {
45        &self.name
46    }
47
48    async fn send(&self, event: &Event) -> Result<(), SinkError> {
49        if self.pretty {
50            println!(
51                "[{}] {} | {:?}",
52                event.timestamp.format("%H:%M:%S"),
53                event.event_type,
54                event.data
55            );
56        } else {
57            println!("{}", serde_json::to_string(event)?);
58        }
59        Ok(())
60    }
61
62    async fn flush(&self) -> Result<(), SinkError> {
63        Ok(())
64    }
65
66    async fn close(&self) -> Result<(), SinkError> {
67        Ok(())
68    }
69}
70
71/// File sink - writes JSON lines to a file
72#[derive(Debug)]
73pub struct FileSink {
74    name: String,
75    path: PathBuf,
76    file: Arc<Mutex<File>>,
77}
78
79impl FileSink {
80    /// Get the file path
81    pub const fn path(&self) -> &PathBuf {
82        &self.path
83    }
84
85    pub fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
86        let path = path.into();
87        let file = OpenOptions::new().create(true).append(true).open(&path)?;
88
89        Ok(Self {
90            name: name.into(),
91            path,
92            file: Arc::new(Mutex::new(file)),
93        })
94    }
95}
96
97#[async_trait]
98impl Sink for FileSink {
99    fn name(&self) -> &str {
100        &self.name
101    }
102
103    async fn send(&self, event: &Event) -> Result<(), SinkError> {
104        let buf = event.to_sink_payload();
105        let mut file = self.file.lock().await;
106        file.write_all(&buf)?;
107        file.write_all(b"\n")?;
108        Ok(())
109    }
110
111    async fn flush(&self) -> Result<(), SinkError> {
112        let mut file = self.file.lock().await;
113        file.flush()?;
114        Ok(())
115    }
116
117    async fn close(&self) -> Result<(), SinkError> {
118        self.flush().await
119    }
120}
121
122/// Async file sink - writes JSON lines using tokio::fs (non-blocking)
123///
124/// Unlike `FileSink`, this implementation uses async I/O and does not block
125/// the tokio runtime. It also includes buffering for better throughput.
126///
127/// # Example
128/// ```text
129/// let sink = AsyncFileSink::new("output", "/tmp/events.jsonl").await?;
130/// sink.send(&event).await?;
131/// sink.flush().await?;
132/// ```
133#[derive(Debug)]
134pub struct AsyncFileSink {
135    name: String,
136    path: PathBuf,
137    file: Arc<Mutex<tokio::fs::File>>,
138    buffer: Arc<Mutex<Vec<u8>>>,
139    buffer_size: usize,
140}
141
142impl AsyncFileSink {
143    /// Default buffer size (64KB)
144    pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
145
146    /// Get the file path
147    pub const fn path(&self) -> &PathBuf {
148        &self.path
149    }
150
151    /// Create a new async file sink with default buffer size
152    pub async fn new(name: impl Into<String>, path: impl Into<PathBuf>) -> Result<Self, SinkError> {
153        Self::with_buffer_size(name, path, Self::DEFAULT_BUFFER_SIZE).await
154    }
155
156    /// Create a new async file sink with custom buffer size
157    pub async fn with_buffer_size(
158        name: impl Into<String>,
159        path: impl Into<PathBuf>,
160        buffer_size: usize,
161    ) -> Result<Self, SinkError> {
162        use tokio::fs::OpenOptions;
163
164        let path = path.into();
165        let file = OpenOptions::new()
166            .create(true)
167            .append(true)
168            .open(&path)
169            .await?;
170
171        Ok(Self {
172            name: name.into(),
173            path,
174            file: Arc::new(Mutex::new(file)),
175            buffer: Arc::new(Mutex::new(Vec::with_capacity(buffer_size))),
176            buffer_size,
177        })
178    }
179}
180
181#[async_trait]
182impl Sink for AsyncFileSink {
183    fn name(&self) -> &str {
184        &self.name
185    }
186
187    async fn send(&self, event: &Event) -> Result<(), SinkError> {
188        let buf = event.to_sink_payload();
189
190        let should_flush = {
191            let mut buffer = self.buffer.lock().await;
192            buffer.extend_from_slice(&buf);
193            buffer.push(b'\n');
194            buffer.len() >= self.buffer_size
195        };
196
197        if should_flush {
198            self.flush().await?;
199        }
200
201        Ok(())
202    }
203
204    async fn flush(&self) -> Result<(), SinkError> {
205        use tokio::io::AsyncWriteExt;
206
207        let data = {
208            let mut buffer = self.buffer.lock().await;
209            std::mem::take(&mut *buffer)
210        };
211
212        if !data.is_empty() {
213            let mut file = self.file.lock().await;
214            file.write_all(&data).await?;
215            file.flush().await?;
216        }
217
218        Ok(())
219    }
220
221    async fn close(&self) -> Result<(), SinkError> {
222        self.flush().await
223    }
224}
225
226/// HTTP webhook sink
227#[derive(Debug)]
228pub struct HttpSink {
229    name: String,
230    url: String,
231    client: reqwest::Client,
232    headers: IndexMap<String, String>,
233}
234
235impl HttpSink {
236    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
237        Self {
238            name: name.into(),
239            url: url.into(),
240            client: reqwest::Client::new(),
241            headers: IndexMap::new(),
242        }
243    }
244
245    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
246        self.headers.insert(key.into(), value.into());
247        self
248    }
249}
250
251#[async_trait]
252impl Sink for HttpSink {
253    fn name(&self) -> &str {
254        &self.name
255    }
256
257    async fn send(&self, event: &Event) -> Result<(), SinkError> {
258        let mut req = self.client.post(&self.url);
259        for (k, v) in &self.headers {
260            req = req.header(k.as_str(), v.as_str());
261        }
262        req = req.header("Content-Type", "application/json");
263        req = req.body(event.to_sink_payload());
264
265        match req.send().await {
266            Ok(resp) => {
267                if !resp.status().is_success() {
268                    error!("HTTP sink {} got status {}", self.name, resp.status());
269                }
270            }
271            Err(e) => {
272                error!("HTTP sink {} error: {}", self.name, e);
273            }
274        }
275        Ok(())
276    }
277
278    async fn flush(&self) -> Result<(), SinkError> {
279        Ok(())
280    }
281
282    async fn close(&self) -> Result<(), SinkError> {
283        Ok(())
284    }
285}
286
/// Tunables for the retry behavior of the retrying HTTP sink.
#[derive(Debug, Clone)]
pub struct HttpRetryConfig {
    /// Maximum number of retry attempts (0 = no retries)
    pub max_retries: usize,
    /// Initial delay between retries (doubles each attempt)
    pub initial_delay: Duration,
    /// Maximum delay between retries
    pub max_delay: Duration,
    /// Request timeout
    pub timeout: Duration,
}

impl Default for HttpRetryConfig {
    /// Three retries, backoff from 100 ms up to 5 s, 30 s request timeout.
    fn default() -> Self {
        let initial_delay = Duration::from_millis(100);
        let max_delay = Duration::from_secs(5);
        let timeout = Duration::from_secs(30);

        Self {
            max_retries: 3,
            initial_delay,
            max_delay,
            timeout,
        }
    }
}
310
311/// HTTP webhook sink with retry logic
312///
313/// Unlike `HttpSink`, this implementation retries failed requests with exponential
314/// backoff. It distinguishes between retryable errors (5xx, timeouts, network errors)
315/// and non-retryable errors (4xx client errors).
316///
317/// # Example
318/// ```text
319/// let sink = HttpSinkWithRetry::new("webhook", "https://api.example.com/events")
320///     .with_header("Authorization", "Bearer token123")
321///     .with_retry_config(HttpRetryConfig {
322///         max_retries: 5,
323///         ..Default::default()
324///     });
325/// sink.send(&event).await?;
326/// ```
327#[derive(Debug)]
328pub struct HttpSinkWithRetry {
329    name: String,
330    url: String,
331    client: reqwest::Client,
332    headers: IndexMap<String, String>,
333    retry_config: HttpRetryConfig,
334}
335
336impl HttpSinkWithRetry {
337    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
338        let config = HttpRetryConfig::default();
339        let client = reqwest::Client::builder()
340            .timeout(config.timeout)
341            .build()
342            .unwrap_or_else(|_| reqwest::Client::new());
343
344        Self {
345            name: name.into(),
346            url: url.into(),
347            client,
348            headers: IndexMap::new(),
349            retry_config: config,
350        }
351    }
352
353    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
354        self.headers.insert(key.into(), value.into());
355        self
356    }
357
358    pub fn with_retry_config(mut self, config: HttpRetryConfig) -> Self {
359        self.retry_config = config;
360        // Rebuild client with new timeout
361        self.client = reqwest::Client::builder()
362            .timeout(self.retry_config.timeout)
363            .build()
364            .unwrap_or_else(|_| reqwest::Client::new());
365        self
366    }
367
368    /// Send with retry logic
369    async fn send_with_retry(&self, body: Vec<u8>) -> Result<(), SinkError> {
370        let mut attempt = 0;
371        let mut delay = self.retry_config.initial_delay;
372
373        loop {
374            let mut req = self.client.post(&self.url);
375            for (k, v) in &self.headers {
376                req = req.header(k.as_str(), v.as_str());
377            }
378            req = req.header("Content-Type", "application/json");
379            req = req.body(body.clone());
380
381            match req.send().await {
382                Ok(resp) => {
383                    if resp.status().is_success() {
384                        return Ok(());
385                    } else if resp.status().is_server_error() {
386                        // 5xx: retryable
387                        if attempt >= self.retry_config.max_retries {
388                            return Err(SinkError::other(format!(
389                                "HTTP sink {} failed with status {} after {} retries",
390                                self.name,
391                                resp.status(),
392                                attempt
393                            )));
394                        }
395                        warn!(
396                            "HTTP sink {} got {}, retrying ({}/{})",
397                            self.name,
398                            resp.status(),
399                            attempt + 1,
400                            self.retry_config.max_retries
401                        );
402                    } else {
403                        // 4xx: not retryable (client error)
404                        return Err(SinkError::other(format!(
405                            "HTTP sink {} got client error status {}",
406                            self.name,
407                            resp.status()
408                        )));
409                    }
410                }
411                Err(e) => {
412                    // Network errors and timeouts are retryable
413                    if e.is_timeout() || e.is_connect() || e.is_request() {
414                        if attempt >= self.retry_config.max_retries {
415                            return Err(SinkError::other(format!(
416                                "HTTP sink {} failed with error {} after {} retries",
417                                self.name, e, attempt
418                            )));
419                        }
420                        warn!(
421                            "HTTP sink {} error: {}, retrying ({}/{})",
422                            self.name,
423                            e,
424                            attempt + 1,
425                            self.retry_config.max_retries
426                        );
427                    } else {
428                        // Other errors (e.g., serialization) are not retryable
429                        return Err(e.into());
430                    }
431                }
432            }
433
434            // Exponential backoff
435            attempt += 1;
436            tokio::time::sleep(delay).await;
437            delay = (delay * 2).min(self.retry_config.max_delay);
438        }
439    }
440}
441
442#[async_trait]
443impl Sink for HttpSinkWithRetry {
444    fn name(&self) -> &str {
445        &self.name
446    }
447
448    async fn send(&self, event: &Event) -> Result<(), SinkError> {
449        self.send_with_retry(event.to_sink_payload()).await
450    }
451
452    async fn flush(&self) -> Result<(), SinkError> {
453        Ok(())
454    }
455
456    async fn close(&self) -> Result<(), SinkError> {
457        Ok(())
458    }
459}
460
461/// Multi-sink that broadcasts to multiple sinks
462pub struct MultiSink {
463    name: String,
464    sinks: Vec<Box<dyn Sink>>,
465}
466
467impl std::fmt::Debug for MultiSink {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        f.debug_struct("MultiSink")
470            .field("name", &self.name)
471            .field(
472                "sinks",
473                &self.sinks.iter().map(|s| s.name()).collect::<Vec<_>>(),
474            )
475            .finish()
476    }
477}
478
479impl MultiSink {
480    pub fn new(name: impl Into<String>) -> Self {
481        Self {
482            name: name.into(),
483            sinks: Vec::new(),
484        }
485    }
486
487    pub fn with_sink(mut self, sink: Box<dyn Sink>) -> Self {
488        self.sinks.push(sink);
489        self
490    }
491}
492
493#[async_trait]
494impl Sink for MultiSink {
495    fn name(&self) -> &str {
496        &self.name
497    }
498
499    async fn send(&self, event: &Event) -> Result<(), SinkError> {
500        for sink in &self.sinks {
501            if let Err(e) = sink.send(event).await {
502                error!("Sink {} error: {}", sink.name(), e);
503            }
504        }
505        Ok(())
506    }
507
508    async fn flush(&self) -> Result<(), SinkError> {
509        for sink in &self.sinks {
510            sink.flush().await?;
511        }
512        Ok(())
513    }
514
515    async fn close(&self) -> Result<(), SinkError> {
516        for sink in &self.sinks {
517            sink.close().await?;
518        }
519        Ok(())
520    }
521}
522
523/// A resilient sink wrapper that adds circuit breaker and dead letter queue.
524///
525/// Wraps any [`Sink`] implementation with:
526/// - **Circuit breaker**: rejects sends immediately when the downstream is unhealthy
527/// - **Dead letter queue**: routes failed events to a DLQ file instead of dropping them
528///
529/// When the circuit is open or a send fails, events are written to the DLQ.
530/// When the circuit recovers (half-open → closed), normal delivery resumes.
531pub struct ResilientSink {
532    inner: Arc<dyn Sink>,
533    cb: Arc<crate::circuit_breaker::CircuitBreaker>,
534    dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
535    metrics: Option<crate::metrics::Metrics>,
536}
537
538impl std::fmt::Debug for ResilientSink {
539    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
540        f.debug_struct("ResilientSink")
541            .field("inner", &self.inner.name())
542            .field("cb", &self.cb)
543            .field("has_dlq", &self.dlq.is_some())
544            .field("has_metrics", &self.metrics.is_some())
545            .finish_non_exhaustive()
546    }
547}
548
549impl ResilientSink {
550    /// Wrap a sink with circuit breaker protection.
551    pub fn new(
552        inner: Arc<dyn Sink>,
553        cb: Arc<crate::circuit_breaker::CircuitBreaker>,
554        dlq: Option<Arc<crate::dead_letter::DeadLetterQueue>>,
555        metrics: Option<crate::metrics::Metrics>,
556    ) -> Self {
557        Self {
558            inner,
559            cb,
560            dlq,
561            metrics,
562        }
563    }
564
565    fn send_to_dlq(&self, error_msg: &str, events: &[Arc<Event>]) {
566        if let Some(ref dlq) = self.dlq {
567            dlq.write_batch(self.inner.name(), error_msg, events);
568            if let Some(ref metrics) = self.metrics {
569                metrics.dlq_events_total.inc_by(events.len() as f64);
570            }
571        }
572    }
573}
574
575#[async_trait]
576impl Sink for ResilientSink {
577    fn name(&self) -> &str {
578        self.inner.name()
579    }
580
581    async fn connect(&self) -> Result<(), SinkError> {
582        self.inner.connect().await
583    }
584
585    async fn send(&self, event: &Event) -> Result<(), SinkError> {
586        if !self.cb.allow_request() {
587            let arc_event = Arc::new(event.clone());
588            self.send_to_dlq("circuit breaker open", &[arc_event]);
589            return Err(SinkError::other(format!(
590                "circuit breaker open for sink '{}'",
591                self.name()
592            )));
593        }
594
595        match self.inner.send(event).await {
596            Ok(()) => {
597                self.cb.record_success();
598                Ok(())
599            }
600            Err(e) => {
601                self.cb.record_failure();
602                let error_msg = e.to_string();
603                let arc_event = Arc::new(event.clone());
604                self.send_to_dlq(&error_msg, &[arc_event]);
605                Err(e)
606            }
607        }
608    }
609
610    async fn send_batch(&self, events: &[Arc<Event>]) -> Result<(), SinkError> {
611        if !self.cb.allow_request() {
612            self.send_to_dlq("circuit breaker open", events);
613            return Err(SinkError::other(format!(
614                "circuit breaker open for sink '{}'",
615                self.name()
616            )));
617        }
618
619        match self.inner.send_batch(events).await {
620            Ok(()) => {
621                self.cb.record_success();
622                Ok(())
623            }
624            Err(e) => {
625                self.cb.record_failure();
626                self.send_to_dlq(&e.to_string(), events);
627                Err(e)
628            }
629        }
630    }
631
632    async fn flush(&self) -> Result<(), SinkError> {
633        self.inner.flush().await
634    }
635
636    async fn close(&self) -> Result<(), SinkError> {
637        self.inner.close().await
638    }
639}
640
641#[cfg(test)]
642mod tests {
643    use tempfile::NamedTempFile;
644
645    use super::*;
646
647    // ==========================================================================
648    // ConsoleSink Tests
649    // ==========================================================================
650
651    #[tokio::test]
652    async fn test_console_sink() {
653        let sink = ConsoleSink::new("test");
654        let event = Event::new("TestEvent").with_field("value", 42i64);
655        assert!(sink.send(&event).await.is_ok());
656    }
657
658    #[tokio::test]
659    async fn test_console_sink_name() {
660        let sink = ConsoleSink::new("my_console");
661        assert_eq!(sink.name(), "my_console");
662    }
663
664    #[tokio::test]
665    async fn test_console_sink_compact() {
666        let sink = ConsoleSink::new("test").compact();
667        assert!(!sink.pretty);
668        let event = Event::new("TestEvent").with_field("value", 42i64);
669        assert!(sink.send(&event).await.is_ok());
670    }
671
672    #[tokio::test]
673    async fn test_console_sink_flush_close() {
674        let sink = ConsoleSink::new("test");
675        assert!(sink.flush().await.is_ok());
676        assert!(sink.close().await.is_ok());
677    }
678
679    // ==========================================================================
680    // FileSink Tests
681    // ==========================================================================
682
683    #[tokio::test]
684    async fn test_file_sink() {
685        let temp_file = NamedTempFile::new().unwrap();
686        let sink = FileSink::new("test_file", temp_file.path()).unwrap();
687
688        let event = Event::new("TestEvent").with_field("value", 42i64);
689        assert!(sink.send(&event).await.is_ok());
690
691        assert!(sink.flush().await.is_ok());
692        assert!(sink.close().await.is_ok());
693
694        // Verify file contains the event
695        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
696        assert!(contents.contains("\"value\":42"));
697    }
698
699    #[tokio::test]
700    async fn test_file_sink_name() {
701        let temp_file = NamedTempFile::new().unwrap();
702        let sink = FileSink::new("my_file", temp_file.path()).unwrap();
703        assert_eq!(sink.name(), "my_file");
704    }
705
706    // ==========================================================================
707    // HttpSink Tests (no actual network calls)
708    // ==========================================================================
709
710    #[test]
711    fn test_http_sink_new() {
712        let sink = HttpSink::new("http_test", "http://localhost:8080/webhook");
713        assert_eq!(sink.name(), "http_test");
714        assert_eq!(sink.url, "http://localhost:8080/webhook");
715    }
716
717    #[test]
718    fn test_http_sink_with_header() {
719        let sink = HttpSink::new("http_test", "http://localhost:8080")
720            .with_header("Authorization", "Bearer token123")
721            .with_header("X-Custom", "value");
722
723        assert_eq!(sink.headers.len(), 2);
724        assert_eq!(
725            sink.headers.get("Authorization"),
726            Some(&"Bearer token123".to_string())
727        );
728    }
729
730    #[tokio::test]
731    async fn test_http_sink_flush_close() {
732        let sink = HttpSink::new("http_test", "http://localhost:8080");
733        assert!(sink.flush().await.is_ok());
734        assert!(sink.close().await.is_ok());
735    }
736
737    // ==========================================================================
738    // MultiSink Tests
739    // ==========================================================================
740
741    #[tokio::test]
742    async fn test_multi_sink_empty() {
743        let sink = MultiSink::new("multi");
744        assert_eq!(sink.name(), "multi");
745
746        let event = Event::new("Test");
747        assert!(sink.send(&event).await.is_ok());
748        assert!(sink.flush().await.is_ok());
749        assert!(sink.close().await.is_ok());
750    }
751
752    #[tokio::test]
753    async fn test_multi_sink_with_console() {
754        let multi = MultiSink::new("multi")
755            .with_sink(Box::new(ConsoleSink::new("console1")))
756            .with_sink(Box::new(ConsoleSink::new("console2")));
757
758        let event = Event::new("Test").with_field("x", 1i64);
759        assert!(multi.send(&event).await.is_ok());
760
761        assert!(multi.flush().await.is_ok());
762        assert!(multi.close().await.is_ok());
763    }
764
765    #[tokio::test]
766    async fn test_multi_sink_with_file() {
767        let temp_file = NamedTempFile::new().unwrap();
768        let file_sink = FileSink::new("file", temp_file.path()).unwrap();
769
770        let multi = MultiSink::new("multi").with_sink(Box::new(file_sink));
771
772        let event = Event::new("MultiEvent").with_field("val", 100i64);
773        assert!(multi.send(&event).await.is_ok());
774        assert!(multi.flush().await.is_ok());
775
776        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
777        assert!(contents.contains("\"val\":100"));
778    }
779
780    // ==========================================================================
781    // Additional FileSink Tests
782    // ==========================================================================
783
784    #[tokio::test]
785    async fn test_file_sink_path() {
786        let temp_file = NamedTempFile::new().unwrap();
787        let expected_path = temp_file.path().to_path_buf();
788        let sink = FileSink::new("test", temp_file.path()).unwrap();
789
790        assert_eq!(sink.path(), &expected_path);
791    }
792
793    #[tokio::test]
794    async fn test_file_sink_multiple_events() {
795        let temp_file = NamedTempFile::new().unwrap();
796        let sink = FileSink::new("test", temp_file.path()).unwrap();
797
798        // Write multiple events
799        for i in 0..5 {
800            let event = Event::new("Event").with_field("id", i as i64);
801            sink.send(&event).await.unwrap();
802        }
803        sink.flush().await.unwrap();
804
805        // Read and verify all events are written
806        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
807        let lines: Vec<&str> = contents.lines().collect();
808        assert_eq!(lines.len(), 5);
809    }
810
811    // ==========================================================================
812    // Additional MultiSink Tests
813    // ==========================================================================
814
815    #[tokio::test]
816    async fn test_multi_sink_three_sinks() {
817        let temp1 = NamedTempFile::new().unwrap();
818        let temp2 = NamedTempFile::new().unwrap();
819
820        let multi = MultiSink::new("triple")
821            .with_sink(Box::new(ConsoleSink::new("console")))
822            .with_sink(Box::new(FileSink::new("file1", temp1.path()).unwrap()))
823            .with_sink(Box::new(FileSink::new("file2", temp2.path()).unwrap()));
824
825        let event = Event::new("TripleEvent");
826        multi.send(&event).await.unwrap();
827        multi.flush().await.unwrap();
828        multi.close().await.unwrap();
829
830        // Verify both files got the event
831        let contents1 = std::fs::read_to_string(temp1.path()).unwrap();
832        let contents2 = std::fs::read_to_string(temp2.path()).unwrap();
833        assert!(contents1.contains("timestamp"));
834        assert!(contents2.contains("timestamp"));
835    }
836
837    // ==========================================================================
838    // Error Handling Tests
839    // ==========================================================================
840
841    #[test]
842    fn test_file_sink_invalid_path() {
843        // Trying to create a file in a non-existent directory should fail
844        let result = FileSink::new("test", "/nonexistent/path/file.json");
845        assert!(result.is_err());
846    }
847
848    // ==========================================================================
849    // AsyncFileSink Tests
850    // ==========================================================================
851
852    #[tokio::test]
853    async fn test_async_file_sink_basic() {
854        let temp_file = NamedTempFile::new().unwrap();
855        let sink = AsyncFileSink::new("test_async", temp_file.path())
856            .await
857            .unwrap();
858
859        let event = Event::new("AsyncTestEvent").with_field("value", 123i64);
860        assert!(sink.send(&event).await.is_ok());
861        assert!(sink.flush().await.is_ok());
862        assert!(sink.close().await.is_ok());
863
864        // Verify file contains the event
865        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
866        assert!(contents.contains("\"value\":123"));
867        assert!(contents.contains("123"));
868    }
869
870    #[tokio::test]
871    async fn test_async_file_sink_name() {
872        let temp_file = NamedTempFile::new().unwrap();
873        let sink = AsyncFileSink::new("my_async_file", temp_file.path())
874            .await
875            .unwrap();
876        assert_eq!(sink.name(), "my_async_file");
877    }
878
879    #[tokio::test]
880    async fn test_async_file_sink_path() {
881        let temp_file = NamedTempFile::new().unwrap();
882        let expected_path = temp_file.path().to_path_buf();
883        let sink = AsyncFileSink::new("test", temp_file.path()).await.unwrap();
884        assert_eq!(sink.path(), &expected_path);
885    }
886
887    #[tokio::test]
888    async fn test_async_file_sink_multiple_events() {
889        let temp_file = NamedTempFile::new().unwrap();
890        let sink = AsyncFileSink::new("test_async", temp_file.path())
891            .await
892            .unwrap();
893
894        // Write multiple events
895        for i in 0..5 {
896            let event = Event::new("AsyncEvent").with_field("id", i as i64);
897            sink.send(&event).await.unwrap();
898        }
899        sink.flush().await.unwrap();
900
901        // Read and verify all events are written
902        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
903        let lines: Vec<&str> = contents.lines().collect();
904        assert_eq!(lines.len(), 5);
905    }
906
907    #[tokio::test]
908    async fn test_async_file_sink_custom_buffer_size() {
909        let temp_file = NamedTempFile::new().unwrap();
910        // Use a small buffer to trigger auto-flush
911        let sink = AsyncFileSink::with_buffer_size("test", temp_file.path(), 50)
912            .await
913            .unwrap();
914
915        // Write events that should trigger buffer flush
916        for i in 0..10 {
917            let event = Event::new("BufferTest").with_field("id", i as i64);
918            sink.send(&event).await.unwrap();
919        }
920        // Final flush to ensure all data is written
921        sink.flush().await.unwrap();
922
923        let contents = std::fs::read_to_string(temp_file.path()).unwrap();
924        let lines: Vec<&str> = contents.lines().collect();
925        assert_eq!(lines.len(), 10);
926    }
927
928    #[tokio::test]
929    async fn test_async_file_sink_invalid_path() {
930        let result = AsyncFileSink::new("test", "/nonexistent/path/file.json").await;
931        assert!(result.is_err());
932    }
933
934    #[tokio::test]
935    async fn test_async_file_sink_in_multi_sink() {
936        let temp = NamedTempFile::new().unwrap();
937        let async_sink = AsyncFileSink::new("async", temp.path()).await.unwrap();
938
939        let multi = MultiSink::new("multi_with_async").with_sink(Box::new(async_sink));
940
941        let event = Event::new("MultiAsyncEvent");
942        multi.send(&event).await.unwrap();
943        multi.flush().await.unwrap();
944
945        let contents = std::fs::read_to_string(temp.path()).unwrap();
946        assert!(contents.contains("timestamp"));
947    }
948
949    // ==========================================================================
950    // HttpSinkWithRetry Tests
951    // ==========================================================================
952
953    #[test]
954    fn test_http_retry_config_default() {
955        let config = HttpRetryConfig::default();
956        assert_eq!(config.max_retries, 3);
957        assert_eq!(config.initial_delay, std::time::Duration::from_millis(100));
958        assert_eq!(config.max_delay, std::time::Duration::from_secs(5));
959        assert_eq!(config.timeout, std::time::Duration::from_secs(30));
960    }
961
962    #[test]
963    fn test_http_sink_with_retry_creation() {
964        let sink = HttpSinkWithRetry::new("retry_test", "http://localhost:8080/webhook");
965        assert_eq!(sink.name(), "retry_test");
966        assert_eq!(sink.url, "http://localhost:8080/webhook");
967    }
968
969    #[test]
970    fn test_http_sink_with_retry_headers() {
971        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080")
972            .with_header("Authorization", "Bearer token123")
973            .with_header("X-Custom", "value");
974
975        assert_eq!(sink.headers.len(), 2);
976        assert_eq!(
977            sink.headers.get("Authorization"),
978            Some(&"Bearer token123".to_string())
979        );
980    }
981
982    #[test]
983    fn test_http_sink_with_retry_custom_config() {
984        let config = HttpRetryConfig {
985            max_retries: 5,
986            initial_delay: std::time::Duration::from_millis(200),
987            max_delay: std::time::Duration::from_secs(10),
988            timeout: std::time::Duration::from_secs(60),
989        };
990        let sink =
991            HttpSinkWithRetry::new("test", "http://localhost:8080").with_retry_config(config);
992
993        assert_eq!(sink.retry_config.max_retries, 5);
994        assert_eq!(
995            sink.retry_config.initial_delay,
996            std::time::Duration::from_millis(200)
997        );
998    }
999
1000    #[tokio::test]
1001    async fn test_http_sink_with_retry_flush_close() {
1002        let sink = HttpSinkWithRetry::new("test", "http://localhost:8080");
1003        assert!(sink.flush().await.is_ok());
1004        assert!(sink.close().await.is_ok());
1005    }
1006
1007    // ==========================================================================
1008    // ResilientSink Tests
1009    // ==========================================================================
1010
1011    /// A mock sink that fails on demand.
1012    struct MockSink {
1013        name: String,
1014        fail: std::sync::atomic::AtomicBool,
1015        send_count: std::sync::atomic::AtomicU64,
1016    }
1017
1018    impl MockSink {
1019        fn new(name: &str) -> Self {
1020            Self {
1021                name: name.to_string(),
1022                fail: std::sync::atomic::AtomicBool::new(false),
1023                send_count: std::sync::atomic::AtomicU64::new(0),
1024            }
1025        }
1026
1027        fn set_fail(&self, fail: bool) {
1028            self.fail.store(fail, std::sync::atomic::Ordering::Relaxed);
1029        }
1030
1031        fn send_count(&self) -> u64 {
1032            self.send_count.load(std::sync::atomic::Ordering::Relaxed)
1033        }
1034    }
1035
1036    #[async_trait]
1037    impl Sink for MockSink {
1038        fn name(&self) -> &str {
1039            &self.name
1040        }
1041
1042        async fn send(&self, _event: &Event) -> Result<(), SinkError> {
1043            if self.fail.load(std::sync::atomic::Ordering::Relaxed) {
1044                Err(SinkError::other("mock send failure"))
1045            } else {
1046                self.send_count
1047                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1048                Ok(())
1049            }
1050        }
1051
1052        async fn flush(&self) -> Result<(), SinkError> {
1053            Ok(())
1054        }
1055        async fn close(&self) -> Result<(), SinkError> {
1056            Ok(())
1057        }
1058    }
1059
1060    #[tokio::test]
1061    async fn test_resilient_sink_success_passthrough() {
1062        let mock = Arc::new(MockSink::new("test-sink"));
1063        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1064            crate::circuit_breaker::CircuitBreakerConfig {
1065                failure_threshold: 3,
1066                reset_timeout: std::time::Duration::from_secs(60),
1067            },
1068        ));
1069        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);
1070
1071        let event = Event::new("TestEvent");
1072        assert!(resilient.send(&event).await.is_ok());
1073        assert_eq!(mock.send_count(), 1);
1074        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
1075    }
1076
1077    #[tokio::test]
1078    async fn test_resilient_sink_failure_opens_circuit() {
1079        let mock = Arc::new(MockSink::new("test-sink"));
1080        mock.set_fail(true);
1081
1082        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1083            crate::circuit_breaker::CircuitBreakerConfig {
1084                failure_threshold: 2,
1085                reset_timeout: std::time::Duration::from_secs(60),
1086            },
1087        ));
1088
1089        let dlq_path = std::env::temp_dir().join("varpulis_resilient_test_dlq.jsonl");
1090        let _ = std::fs::remove_file(&dlq_path);
1091        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());
1092
1093        let resilient = ResilientSink::new(mock.clone(), cb.clone(), Some(dlq.clone()), None);
1094
1095        let event = Event::new("TestEvent");
1096
1097        // First failure
1098        assert!(resilient.send(&event).await.is_err());
1099        assert_eq!(dlq.count(), 1);
1100
1101        // Second failure → opens circuit
1102        assert!(resilient.send(&event).await.is_err());
1103        assert_eq!(dlq.count(), 2);
1104        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);
1105
1106        // Third attempt: circuit open, rejected immediately (no send attempt)
1107        assert!(resilient.send(&event).await.is_err());
1108        assert_eq!(dlq.count(), 3);
1109
1110        let _ = std::fs::remove_file(&dlq_path);
1111    }
1112
1113    #[tokio::test]
1114    async fn test_resilient_sink_batch_with_dlq() {
1115        let mock = Arc::new(MockSink::new("batch-sink"));
1116        mock.set_fail(true);
1117
1118        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1119            crate::circuit_breaker::CircuitBreakerConfig::default(),
1120        ));
1121
1122        let dlq_path = std::env::temp_dir().join("varpulis_resilient_batch_dlq.jsonl");
1123        let _ = std::fs::remove_file(&dlq_path);
1124        let dlq = Arc::new(crate::dead_letter::DeadLetterQueue::open(&dlq_path).unwrap());
1125
1126        let resilient = ResilientSink::new(mock, cb, Some(dlq.clone()), None);
1127
1128        let events: Vec<Arc<Event>> = (0..3)
1129            .map(|i| Arc::new(Event::new(format!("Event{i}"))))
1130            .collect();
1131
1132        assert!(resilient.send_batch(&events).await.is_err());
1133        assert_eq!(dlq.count(), 3); // All 3 events written to DLQ
1134
1135        let _ = std::fs::remove_file(&dlq_path);
1136    }
1137
1138    #[tokio::test]
1139    async fn test_resilient_sink_recovery() {
1140        let mock = Arc::new(MockSink::new("recover-sink"));
1141        mock.set_fail(true);
1142
1143        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1144            crate::circuit_breaker::CircuitBreakerConfig {
1145                failure_threshold: 1,
1146                reset_timeout: std::time::Duration::from_millis(10),
1147            },
1148        ));
1149
1150        let resilient = ResilientSink::new(mock.clone(), cb.clone(), None, None);
1151
1152        let event = Event::new("TestEvent");
1153
1154        // Fail → opens circuit
1155        assert!(resilient.send(&event).await.is_err());
1156        assert_eq!(cb.state(), crate::circuit_breaker::State::Open);
1157
1158        // Wait for reset timeout
1159        tokio::time::sleep(std::time::Duration::from_millis(15)).await;
1160
1161        // Fix the sink
1162        mock.set_fail(false);
1163
1164        // Next request: half-open probe succeeds → closes
1165        assert!(resilient.send(&event).await.is_ok());
1166        assert_eq!(cb.state(), crate::circuit_breaker::State::Closed);
1167        assert_eq!(mock.send_count(), 1);
1168    }
1169
1170    #[tokio::test]
1171    async fn test_resilient_sink_name_passthrough() {
1172        let mock = Arc::new(MockSink::new("my-kafka-sink"));
1173        let cb = Arc::new(crate::circuit_breaker::CircuitBreaker::new(
1174            crate::circuit_breaker::CircuitBreakerConfig::default(),
1175        ));
1176        let resilient = ResilientSink::new(mock, cb, None, None);
1177        assert_eq!(resilient.name(), "my-kafka-sink");
1178    }
1179}