// feedme/lib.rs

//! # FeedMe
//!
//! **FeedMe is a deterministic, linear, streaming ingest pipeline with mechanical guarantees around memory, ordering, and failure.**
//!
//! FeedMe provides a linear, deterministic processing model for Rust applications that need
//! reliable data ingestion. It emphasizes bounded resource usage, explicit error handling,
//! and comprehensive observability without affecting execution.
//!
//! ## Key Features
//!
//! - **Streaming, bounded memory**: Processes one event at a time; memory usage stays flat
//! - **Deterministic processing**: Same input + same config → same output
//! - **Structured errors**: Stage, code, and message for every failure
//! - **Observability**: Metrics exportable (Prometheus or JSON) without affecting execution
//! - **Extensible**: Add custom stages via a defined plugin contract
//!
//! ## Guarantees
//!
//! FeedMe provides these mechanical guarantees:
//!
//! - Events are processed strictly in input order
//! - Memory usage is bounded and input-size independent
//! - Stages cannot observe shared or mutated state
//! - Validation failures cannot be silently ignored
//! - Metrics collection cannot influence execution
//!
//! ## Example
//!
//! ```rust
//! use feedme::{
//!     Pipeline, FieldSelect, RequiredFields, StdoutOutput, Deadletter,
//!     PIIRedaction, Filter, InputSource, Stage
//! };
//! use std::path::PathBuf;
//!
//! fn main() -> anyhow::Result<()> {
//!     // Create pipeline: select fields → redact PII → require fields → filter → output
//!     let mut pipeline = Pipeline::new();
//!     pipeline.add_stage(Box::new(FieldSelect::new(vec![
//!         "timestamp".into(), "level".into(), "message".into(), "email".into()
//!     ])));
//!     let email_pattern = regex::Regex::new(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b")?;
//!     pipeline.add_stage(Box::new(PIIRedaction::new(vec![email_pattern])));
//!     pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".into()])));
//!     pipeline.add_stage(Box::new(Filter::new(Box::new(|event| {
//!         event.data.get("level").and_then(|v| v.as_str()) != Some("debug")
//!     }))));
//!     pipeline.add_stage(Box::new(StdoutOutput::new()));
//!
//!     // Deadletter for errors
//!     let mut deadletter = Deadletter::new(PathBuf::from("samples/errors.ndjson"));
//!
//!     // Process input file
//!     let mut input = InputSource::File(PathBuf::from("samples/input.ndjson"));
//!     input.process_input(&mut pipeline, &mut Some(&mut deadletter))?;
//!
//!     // Export final metrics. The entries are already JSON-encoded strings,
//!     // so print them directly instead of serializing a second time.
//!     println!("Pipeline complete. Metrics:");
//!     for metric in pipeline.export_json_logs() {
//!         println!("{}", metric);
//!     }
//!
//!     Ok(())
//! }
//! ```

use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::fs;
use std::io::{self, BufRead};
use std::path::PathBuf;
use std::time::Instant;
75
pub mod invariant_ppt;

#[cfg(test)]
mod ppt_invariant_contracts;

pub mod replay;

// Human-readable invariant messages, checked via `invariant_ppt::assert_invariant`
// inside `Pipeline::process_event`.
pub(crate) const INVARIANT_PROCESSED_INCREMENTS_ONCE: &str =
    "processed increments exactly once per process_event";
pub(crate) const INVARIANT_ERRORS_INCREMENT_ON_ERROR: &str = "errors increment exactly once per error";
pub(crate) const INVARIANT_DROPPED_ONLY_FOR_NON_OUTPUT_NONE: &str =
    "dropped increments only when non-output stage returns None";
pub(crate) const INVARIANT_OUTPUT_NONE_NOT_DROPPED: &str =
    "output stage returning None does not count as dropped";
pub(crate) const INVARIANT_LATENCY_RECORDED_ON_SUCCESS: &str =
    "latency is recorded for each successful stage execution";

/// Type aliases for complex boxed function types, to reduce clippy warnings.
pub type EventDerivationFn = Box<dyn Fn(&Event) -> serde_json::Value>;
pub type ValueConstraintFn = Box<dyn Fn(&serde_json::Value) -> bool>;
pub type StageFactoryFn = Box<dyn Fn() -> Box<dyn Stage>>;
97
/// Represents a structured event in the pipeline.
/// Owned, mutable data, supports JSON-like types, typed field access, optional metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// The main data payload, JSON-like.
    pub data: serde_json::Value,
    /// Optional metadata associated with the event. `BTreeMap` keeps keys
    /// ordered, so serialized metadata is deterministic.
    pub metadata: Option<BTreeMap<String, serde_json::Value>>,
}
107
108impl Event {
109    /// Create a new event from raw input (assuming JSON for now).
110    pub fn from_raw_input(input: &str) -> anyhow::Result<Self> {
111        let data: serde_json::Value = serde_json::from_str(input)?;
112        Ok(Event {
113            data,
114            metadata: None,
115        })
116    }
117
118    /// Typed field access for strings.
119    pub fn get_string(&self, key: &str) -> Option<&str> {
120        self.data.get(key)?.as_str()
121    }
122
123    /// Typed field access for numbers.
124    pub fn get_number(&self, key: &str) -> Option<f64> {
125        self.data.get(key)?.as_f64()
126    }
127
128    // Add more typed access as needed.
129}
130
/// Error taxonomy for pipeline failures.
/// Explicit category, stage attribution, machine-readable code.
#[derive(Debug, Clone)]
pub enum PipelineError {
    /// Raw input could not be decoded (UTF-8 or JSON failure).
    Parse(ParseError),
    /// Failure reported by a transform stage.
    Transform(TransformError),
    /// Failure reported by a validation stage.
    Validation(ValidationError),
    /// Failure serializing or writing output.
    Output(OutputError),
    /// Environment-level failure (e.g. I/O on an input source).
    System(SystemError),
}
141
142impl fmt::Display for PipelineError {
143    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144        match self {
145            PipelineError::Parse(e) => write!(f, "Parse error: {}", e.message),
146            PipelineError::Transform(e) => write!(f, "Transform error: {}", e.message),
147            PipelineError::Validation(e) => write!(f, "Validation error: {}", e.message),
148            PipelineError::Output(e) => write!(f, "Output error: {}", e.message),
149            PipelineError::System(e) => write!(f, "System error: {}", e.message),
150        }
151    }
152}
153
154impl PipelineError {
155    pub fn category(&self) -> &str {
156        match self {
157            PipelineError::Parse(_) => "Parse",
158            PipelineError::Transform(_) => "Transform",
159            PipelineError::Validation(_) => "Validation",
160            PipelineError::Output(_) => "Output",
161            PipelineError::System(_) => "System",
162        }
163    }
164
165    pub fn stage(&self) -> &str {
166        match self {
167            PipelineError::Parse(e) => &e.stage,
168            PipelineError::Transform(e) => &e.stage,
169            PipelineError::Validation(e) => &e.stage,
170            PipelineError::Output(e) => &e.stage,
171            PipelineError::System(e) => &e.stage,
172        }
173    }
174
175    pub fn code(&self) -> String {
176        match self {
177            PipelineError::Parse(e) => e.code.to_string(),
178            PipelineError::Transform(e) => e.code.to_string(),
179            PipelineError::Validation(e) => e.code.to_string(),
180            PipelineError::Output(e) => e.code.to_string(),
181            PipelineError::System(e) => e.code.to_string(),
182        }
183    }
184
185    pub fn message(&self) -> &str {
186        match self {
187            PipelineError::Parse(e) => &e.message,
188            PipelineError::Transform(e) => &e.message,
189            PipelineError::Validation(e) => &e.message,
190            PipelineError::Output(e) => &e.message,
191            PipelineError::System(e) => &e.message,
192        }
193    }
194}
195
196impl std::error::Error for PipelineError {}
197
/// Machine-readable codes for parse failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParseErrorCode {
    ParseError,
    Utf8Error,
    JsonError,
    Test,
}

impl fmt::Display for ParseErrorCode {
    /// Emit the stable SCREAMING_SNAKE_CASE wire code for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            ParseErrorCode::ParseError => "PARSE_ERROR",
            ParseErrorCode::Utf8Error => "UTF8_ERROR",
            ParseErrorCode::JsonError => "JSON_ERROR",
            ParseErrorCode::Test => "TEST",
        })
    }
}

/// Structured parse failure: stage attribution, machine code, human message.
#[derive(Debug, Clone)]
pub struct ParseError {
    pub stage: String,
    pub code: ParseErrorCode,
    pub message: String,
}
223
/// Machine-readable codes for transform failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransformErrorCode {
    MissingField,
    TypeMismatch,
    ConstraintViolation,
    Test,
}

impl fmt::Display for TransformErrorCode {
    /// Emit the stable SCREAMING_SNAKE_CASE wire code for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            TransformErrorCode::MissingField => "MISSING_FIELD",
            TransformErrorCode::TypeMismatch => "TYPE_MISMATCH",
            TransformErrorCode::ConstraintViolation => "CONSTRAINT_VIOLATION",
            TransformErrorCode::Test => "TEST",
        })
    }
}

/// Structured transform failure: stage attribution, machine code, human message.
#[derive(Debug, Clone)]
pub struct TransformError {
    pub stage: String,
    pub code: TransformErrorCode,
    pub message: String,
}
249
/// Machine-readable codes for validation failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationErrorCode {
    MissingField,
    TypeMismatch,
    ConstraintViolation,
    Test,
}

impl fmt::Display for ValidationErrorCode {
    /// Emit the stable SCREAMING_SNAKE_CASE wire code for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            ValidationErrorCode::MissingField => "MISSING_FIELD",
            ValidationErrorCode::TypeMismatch => "TYPE_MISMATCH",
            ValidationErrorCode::ConstraintViolation => "CONSTRAINT_VIOLATION",
            ValidationErrorCode::Test => "TEST",
        })
    }
}

/// Structured validation failure: stage attribution, machine code, human message.
#[derive(Debug, Clone)]
pub struct ValidationError {
    pub stage: String,
    pub code: ValidationErrorCode,
    pub message: String,
}
275
/// Machine-readable codes for output failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputErrorCode {
    SerializeError,
    IoError,
    Test,
}

impl fmt::Display for OutputErrorCode {
    /// Emit the stable SCREAMING_SNAKE_CASE wire code for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            OutputErrorCode::SerializeError => "SERIALIZE_ERROR",
            OutputErrorCode::IoError => "IO_ERROR",
            OutputErrorCode::Test => "TEST",
        })
    }
}

/// Structured output failure: stage attribution, machine code, human message.
#[derive(Debug, Clone)]
pub struct OutputError {
    pub stage: String,
    pub code: OutputErrorCode,
    pub message: String,
}
299
/// Machine-readable codes for system/environment failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SystemErrorCode {
    IoError,
    Test,
}

impl fmt::Display for SystemErrorCode {
    /// Emit the stable SCREAMING_SNAKE_CASE wire code for this variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            SystemErrorCode::IoError => "IO_ERROR",
            SystemErrorCode::Test => "TEST",
        })
    }
}

/// Structured system failure: stage attribution, machine code, human message.
#[derive(Debug, Clone)]
pub struct SystemError {
    pub stage: String,
    pub code: SystemErrorCode,
    pub message: String,
}
321
322#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
323pub enum DropReason {
324    Filtered,
325}
326
327impl fmt::Display for DropReason {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329        match self {
330            DropReason::Filtered => write!(f, "filtered"),
331        }
332    }
333}
334
/// Metrics for observability: counters, latency summaries, drop reason codes.
/// No execution feedback loops. Bounded storage.
#[derive(Debug)]
pub struct Metrics {
    // Total events that entered `Pipeline::process_event`.
    events_processed: u64,
    // Events consumed by a non-output stage returning `None`.
    events_dropped: u64,
    // Events whose processing ended in a stage error.
    errors: u64,
    stage_latencies: HashMap<String, LatencyStats>, // bounded stats (one entry per stage name)
    drop_reasons: HashMap<DropReason, u64>,         // bounded reasons (one entry per reason)
}
345
/// Constant-size latency summary: count, sum, min, max (all in milliseconds).
#[derive(Debug, Clone)]
pub struct LatencyStats {
    pub count: u64,
    pub sum: f64,
    pub min: f64,
    pub max: f64,
}

impl LatencyStats {
    /// Empty summary. `min`/`max` start at +/- infinity so the first recorded
    /// sample always wins both comparisons.
    pub fn new() -> Self {
        LatencyStats {
            count: 0,
            sum: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Fold one duration sample into the summary.
    pub fn record(&mut self, duration: f64) {
        self.count += 1;
        self.sum += duration;
        self.min = self.min.min(duration);
        self.max = self.max.max(duration);
    }
}

impl Default for LatencyStats {
    /// Same as [`LatencyStats::new`].
    fn default() -> Self {
        Self::new()
    }
}
381
382impl Metrics {
383    pub fn new() -> Self {
384        Metrics {
385            events_processed: 0,
386            events_dropped: 0,
387            errors: 0,
388            stage_latencies: HashMap::new(),
389            drop_reasons: HashMap::new(),
390        }
391    }
392
393    pub fn increment_processed(&mut self) {
394        self.events_processed += 1;
395    }
396
397    pub fn increment_dropped(&mut self, reason: DropReason) {
398        self.events_dropped += 1;
399        *self.drop_reasons.entry(reason).or_insert(0) += 1;
400    }
401
402    pub fn increment_errors(&mut self) {
403        self.errors += 1;
404    }
405
406    pub fn record_latency(&mut self, stage: &str, duration: f64) {
407        self.stage_latencies
408            .entry(stage.to_string())
409            .or_default()
410            .record(duration);
411    }
412
413    pub fn to_prometheus(&self) -> String {
414        let mut output = String::new();
415        output.push_str("# HELP feedme_events_processed_total Total events processed\n");
416        output.push_str(&format!(
417            "feedme_events_processed_total {}\n",
418            self.events_processed
419        ));
420        output.push_str("# HELP feedme_events_dropped_total Total events dropped\n");
421        output.push_str(&format!(
422            "feedme_events_dropped_total {}\n",
423            self.events_dropped
424        ));
425        output.push_str("# HELP feedme_errors_total Total errors\n");
426        output.push_str(&format!("feedme_errors_total {}\n", self.errors));
427        output.push_str("# HELP feedme_stage_latency_ms Stage latency in milliseconds\n");
428        output.push_str("# TYPE feedme_stage_latency_ms gauge\n");
429        for (stage, stats) in &self.stage_latencies {
430            if stats.count > 0 {
431                output.push_str(&format!(
432                    "feedme_stage_latency_ms_sum{{stage=\"{}\"}} {}\n",
433                    stage, stats.sum
434                ));
435                output.push_str(&format!(
436                    "feedme_stage_latency_ms_count{{stage=\"{}\"}} {}\n",
437                    stage, stats.count
438                ));
439                output.push_str(&format!(
440                    "feedme_stage_latency_ms_min{{stage=\"{}\"}} {}\n",
441                    stage, stats.min
442                ));
443                output.push_str(&format!(
444                    "feedme_stage_latency_ms_max{{stage=\"{}\"}} {}\n",
445                    stage, stats.max
446                ));
447            }
448        }
449        output.push_str("# HELP feedme_drop_reasons_total Drop reasons\n");
450        output.push_str("# TYPE feedme_drop_reasons_total counter\n");
451        for (reason, count) in &self.drop_reasons {
452            output.push_str(&format!(
453                "feedme_drop_reasons_total{{reason=\"{}\"}} {}\n",
454                reason, count
455            ));
456        }
457        output
458    }
459
460    pub fn to_json_logs(&self) -> Vec<String> {
461        let mut logs = Vec::new();
462        logs.push(
463            serde_json::json!({
464                "metric": "events_processed",
465                "value": self.events_processed
466            })
467            .to_string(),
468        );
469        logs.push(
470            serde_json::json!({
471                "metric": "events_dropped",
472                "value": self.events_dropped
473            })
474            .to_string(),
475        );
476        logs.push(
477            serde_json::json!({
478                "metric": "errors",
479                "value": self.errors
480            })
481            .to_string(),
482        );
483        for (stage, stats) in &self.stage_latencies {
484            logs.push(
485                serde_json::json!({
486                    "metric": "stage_latencies",
487                    "stage": stage,
488                    "count": stats.count,
489                    "sum": stats.sum,
490                    "min": stats.min,
491                    "max": stats.max
492                })
493                .to_string(),
494            );
495        }
496        for (reason, count) in &self.drop_reasons {
497            logs.push(
498                serde_json::json!({
499                    "metric": "drop_reasons",
500                    "reason": reason,
501                    "count": count
502                })
503                .to_string(),
504            );
505        }
506        logs
507    }
508}
509
impl Default for Metrics {
    /// Same as [`Metrics::new`]: all counters zeroed, no entries.
    fn default() -> Self {
        Self::new()
    }
}
515
/// Stage contract: ownership-based execution.
/// Takes `Event`, returns `Option<Event>`, with explicit drop semantics.
pub trait Stage {
    /// Process one owned event. `Ok(Some(event))` passes an event on to the
    /// next stage; `Ok(None)` ends its journey (counted as a drop unless
    /// `is_output()` is true); `Err` aborts the event with a structured error.
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    /// Stable stage name, used for latency metrics and error attribution.
    fn name(&self) -> &str;
    /// Whether this stage is a terminal output. Output stages returning `None`
    /// are not counted as drops. Defaults to `false`.
    fn is_output(&self) -> bool {
        false
    }
}
525
/// Pipeline: linear, deterministic execution of stages.
/// No distributed coordination, constant memory streaming.
pub struct Pipeline {
    // Stages run in insertion order for every event.
    stages: Vec<Box<dyn Stage>>,
    // Observation only: written during processing, read only by the exporters.
    metrics: Metrics,
}
532
impl Pipeline {
    /// Create an empty pipeline with zeroed metrics and no stages.
    pub fn new() -> Self {
        Pipeline {
            stages: Vec::new(),
            metrics: Metrics::new(),
        }
    }

    /// Append a stage; stages execute in the order they were added.
    pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
        self.stages.push(stage);
    }

    /// Run one event through every stage in order.
    ///
    /// Returns `Ok(Some(event))` if the event survived all stages, `Ok(None)`
    /// if a stage consumed it, and `Err` with the failing stage's error on the
    /// first failure (remaining stages are skipped).
    ///
    /// Metrics side effects, each checked via `invariant_ppt::assert_invariant`:
    /// `events_processed` +1 exactly once per call; one latency sample per
    /// successful stage execution; `events_dropped` +1 only when a non-output
    /// stage returns `None`; `errors` +1 on failure.
    pub fn process_event(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
        // Snapshot counters so the invariant checks below can compare deltas.
        let prev_processed = self.metrics.events_processed;
        let prev_errors = self.metrics.errors;
        let prev_dropped = self.metrics.events_dropped;

        self.metrics.increment_processed();
        invariant_ppt::assert_invariant(
            self.metrics.events_processed == prev_processed + 1,
            INVARIANT_PROCESSED_INCREMENTS_ONCE,
            Some("Pipeline::process_event"),
        );

        let mut current = Some(event);
        for stage in &mut self.stages {
            if let Some(evt) = current {
                let start = Instant::now();
                match stage.execute(evt) {
                    Ok(opt) => {
                        // Stage latency in milliseconds.
                        let duration = start.elapsed().as_secs_f64() * 1000.0;

                        let prev_stage_count = self
                            .metrics
                            .stage_latencies
                            .get(stage.name())
                            .map(|s| s.count)
                            .unwrap_or(0);
                        self.metrics.record_latency(stage.name(), duration);
                        let new_stage_count = self
                            .metrics
                            .stage_latencies
                            .get(stage.name())
                            .map(|s| s.count)
                            .unwrap_or(0);
                        invariant_ppt::assert_invariant(
                            new_stage_count == prev_stage_count + 1,
                            INVARIANT_LATENCY_RECORDED_ON_SUCCESS,
                            Some("Pipeline::process_event"),
                        );

                        current = opt;
                        if current.is_none() {
                            if !stage.is_output() {
                                // A non-output stage consumed the event: count
                                // it as a drop, attributed to filtering.
                                let dropped_before = self.metrics.events_dropped;
                                let reason_before = *self
                                    .metrics
                                    .drop_reasons
                                    .get(&DropReason::Filtered)
                                    .unwrap_or(&0);

                                self.metrics.increment_dropped(DropReason::Filtered);

                                let reason_after = *self
                                    .metrics
                                    .drop_reasons
                                    .get(&DropReason::Filtered)
                                    .unwrap_or(&0);

                                invariant_ppt::assert_invariant(
                                    self.metrics.events_dropped == dropped_before + 1
                                        && reason_after == reason_before + 1,
                                    INVARIANT_DROPPED_ONLY_FOR_NON_OUTPUT_NONE,
                                    Some("Pipeline::process_event"),
                                );
                            } else {
                                // Output stages legitimately end the event's
                                // journey; that must not register as a drop.
                                invariant_ppt::assert_invariant(
                                    self.metrics.events_dropped == prev_dropped,
                                    INVARIANT_OUTPUT_NONE_NOT_DROPPED,
                                    Some("Pipeline::process_event"),
                                );
                            }
                        }
                    }
                    Err(e) => {
                        // First failure wins: count it and abort this event.
                        self.metrics.increment_errors();
                        invariant_ppt::assert_invariant(
                            self.metrics.errors == prev_errors + 1,
                            INVARIANT_ERRORS_INCREMENT_ON_ERROR,
                            Some("Pipeline::process_event"),
                        );
                        return Err(e);
                    }
                }
            } else {
                // Event already consumed; nothing left for later stages.
                break;
            }
        }

        // Sanity: these counters should never run backward.
        invariant_ppt::assert_invariant(
            self.metrics.events_processed >= prev_processed
                && self.metrics.errors >= prev_errors
                && self.metrics.events_dropped >= prev_dropped,
            "metrics counters are monotonic",
            Some("Pipeline::process_event"),
        );
        Ok(current)
    }

    /// Export metrics in Prometheus text exposition format.
    pub fn export_prometheus(&self) -> String {
        self.metrics.to_prometheus()
    }

    /// Export metrics as one JSON object string per metric.
    pub fn export_json_logs(&self) -> Vec<String> {
        self.metrics.to_json_logs()
    }
}
651
impl Default for Pipeline {
    /// Same as [`Pipeline::new`]: no stages, zeroed metrics.
    fn default() -> Self {
        Self::new()
    }
}
657
/// Input sources: local, synchronous, stream-oriented, ordered read.
/// No distributed offsets, no remote coordination.
pub enum InputSource {
    /// Read line-delimited JSON records from standard input.
    Stdin,
    /// Read line-delimited JSON records from a single file.
    File(PathBuf),
    /// Read every regular file in a directory, in sorted path order
    /// (non-recursive: subdirectories are skipped).
    Directory(PathBuf),
}
665
666impl InputSource {
667    pub fn process_input(
668        &mut self,
669        pipeline: &mut Pipeline,
670        deadletter: &mut Option<&mut dyn Stage>,
671    ) -> Result<(), PipelineError> {
672        match self {
673            InputSource::Stdin => {
674                let stdin = io::stdin();
675                let lines = stdin.lines();
676                for line in lines {
677                    let line = line.map_err(|e| {
678                        PipelineError::System(SystemError {
679                            stage: "Input_Stdin".to_string(),
680                            code: SystemErrorCode::IoError,
681                            message: e.to_string(),
682                        })
683                    })?;
684                    let event = match Event::from_raw_input(&line) {
685                        Ok(e) => e,
686                        Err(e) => {
687                            if let Some(ref mut dl) = *deadletter {
688                                let error_event = Event {
689                                    data: serde_json::json!({
690                                        "error": "parse",
691                                        "stage": "Input_Stdin",
692                                        "code": "PARSE_ERROR",
693                                        "message": e.to_string(),
694                                        "raw": line
695                                    }),
696                                    metadata: None,
697                                };
698                                let _ = dl.execute(error_event); // ignore error in deadletter
699                            } else {
700                                return Err(PipelineError::Parse(ParseError {
701                                    stage: "Input_Stdin".to_string(),
702                                    code: ParseErrorCode::ParseError,
703                                    message: e.to_string(),
704                                }));
705                            }
706                            continue;
707                        }
708                    };
709                    match pipeline.process_event(event) {
710                        Ok(_) => {}
711                        Err(e) => {
712                            if let Some(ref mut dl) = *deadletter {
713                                let error_event = Event {
714                                    data: serde_json::json!({
715                                        "error": "pipeline",
716                                        "category": e.category(),
717                                        "stage": e.stage(),
718                                        "code": e.code(),
719                                        "message": e.message(),
720                                        "raw": line
721                                    }),
722                                    metadata: None,
723                                };
724                                let _ = dl.execute(error_event);
725                            } else {
726                                return Err(e);
727                            }
728                        }
729                    }
730                }
731                Ok(())
732            }
733            InputSource::File(path) => {
734                let file = fs::File::open(path).map_err(|e| {
735                    PipelineError::System(SystemError {
736                        stage: "Input_File".to_string(),
737                        code: SystemErrorCode::IoError,
738                        message: e.to_string(),
739                    })
740                })?;
741                let lines = io::BufReader::new(file).lines();
742                for line in lines {
743                    let line = line.map_err(|e| {
744                        PipelineError::System(SystemError {
745                            stage: "Input_File".to_string(),
746                            code: SystemErrorCode::IoError,
747                            message: e.to_string(),
748                        })
749                    })?;
750                    let event = match Event::from_raw_input(&line) {
751                        Ok(e) => e,
752                        Err(e) => {
753                            if let Some(ref mut dl) = *deadletter {
754                                let error_event = Event {
755                                    data: serde_json::json!({
756                                        "error": "parse",
757                                        "stage": "Input_File",
758                                        "code": "PARSE_ERROR",
759                                        "message": e.to_string(),
760                                        "raw": line
761                                    }),
762                                    metadata: None,
763                                };
764                                let _ = dl.execute(error_event);
765                            } else {
766                                return Err(PipelineError::Parse(ParseError {
767                                    stage: "Input_File".to_string(),
768                                    code: ParseErrorCode::ParseError,
769                                    message: e.to_string(),
770                                }));
771                            }
772                            continue;
773                        }
774                    };
775                    match pipeline.process_event(event) {
776                        Ok(_) => {}
777                        Err(e) => {
778                            if let Some(ref mut dl) = *deadletter {
779                                let error_event = Event {
780                                    data: serde_json::json!({
781                                        "error": "pipeline",
782                                        "category": e.category(),
783                                        "stage": e.stage(),
784                                        "code": e.code(),
785                                        "message": e.message(),
786                                        "raw": line
787                                    }),
788                                    metadata: None,
789                                };
790                                let _ = dl.execute(error_event);
791                            } else {
792                                return Err(e);
793                            }
794                        }
795                    }
796                }
797                Ok(())
798            }
799            InputSource::Directory(dir) => {
800                let entries = fs::read_dir(dir).map_err(|e| {
801                    PipelineError::System(SystemError {
802                        stage: "Input_Directory".to_string(),
803                        code: SystemErrorCode::IoError,
804                        message: e.to_string(),
805                    })
806                })?;
807                let mut paths: Vec<PathBuf> = Vec::new();
808                for entry in entries {
809                    let entry = entry.map_err(|e| {
810                        PipelineError::System(SystemError {
811                            stage: "Input_Directory".to_string(),
812                            code: SystemErrorCode::IoError,
813                            message: e.to_string(),
814                        })
815                    })?;
816                    let path = entry.path();
817                    if path.is_file() {
818                        paths.push(path);
819                    }
820                    // Non-recursive, so no subdirs
821                }
822                paths.sort();
823                for path in paths {
824                    let mut file_input = InputSource::File(path);
825                    file_input.process_input(pipeline, deadletter)?;
826                }
827                Ok(())
828            }
829        }
830    }
831}
832
/// Parsers: convert raw bytes to Event with explicit error handling.
/// Best effort syslog, zero copy where possible, no implicit recovery.
pub trait Parser {
    /// Decode one raw record into an `Event`. The implementations in this file
    /// report all failures as `PipelineError::Parse` with a stage name and code.
    fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError>;
}
838
839pub struct NDJSONParser;
840
841impl Parser for NDJSONParser {
842    fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
843        let s = std::str::from_utf8(raw).map_err(|e| {
844            PipelineError::Parse(ParseError {
845                stage: "NDJSON".to_string(),
846                code: ParseErrorCode::Utf8Error,
847                message: e.to_string(),
848            })
849        })?;
850        Event::from_raw_input(s).map_err(|e| {
851            PipelineError::Parse(ParseError {
852                stage: "NDJSON".to_string(),
853                code: ParseErrorCode::JsonError,
854                message: e.to_string(),
855            })
856        })
857    }
858}
859
860pub struct JSONArrayParser;
861
862impl Parser for JSONArrayParser {
863    fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
864        let s = std::str::from_utf8(raw).map_err(|e| {
865            PipelineError::Parse(ParseError {
866                stage: "JSONArray".to_string(),
867                code: ParseErrorCode::Utf8Error,
868                message: e.to_string(),
869            })
870        })?;
871        let value: serde_json::Value = serde_json::from_str(s).map_err(|e| {
872            PipelineError::Parse(ParseError {
873                stage: "JSONArray".to_string(),
874                code: ParseErrorCode::JsonError,
875                message: e.to_string(),
876            })
877        })?;
878        // For array, perhaps wrap in an event with the array as data
879        Ok(Event {
880            data: value,
881            metadata: None,
882        })
883    }
884}
885
886pub struct SyslogParser;
887
888impl Parser for SyslogParser {
889    fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
890        // Best effort syslog parsing: simple regex or basic parsing
891        let s = std::str::from_utf8(raw).map_err(|e| {
892            PipelineError::Parse(ParseError {
893                stage: "Syslog".to_string(),
894                code: ParseErrorCode::Utf8Error,
895                message: e.to_string(),
896            })
897        })?;
898        // Simple syslog: <pri>timestamp host message
899        // For now, create an event with the raw string
900        Ok(Event {
901            data: serde_json::json!({ "message": s }),
902            metadata: None,
903        })
904    }
905}
906
/// Transforms: bounded, explicit modification or filtering of events.
/// Deterministic, side-effect free, no network, no persistence.
///
/// Marker trait: a Transform is any `Stage` that only reshapes the event it
/// is given, or drops it by returning `Ok(None)` per the `Stage` contract.
pub trait Transform: Stage {}
910
/// Keeps only an allow-list of top-level fields on each event.
pub struct FieldSelect {
    // Names of the top-level fields to retain; everything else is dropped.
    fields: Vec<String>,
}

impl FieldSelect {
    /// Build a selector that retains exactly the given `fields`.
    pub fn new(fields: Vec<String>) -> Self {
        Self { fields }
    }
}
920
921impl Stage for FieldSelect {
922    fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
923        if let serde_json::Value::Object(ref mut map) = event.data {
924            let mut new_map = serde_json::Map::new();
925            for field in &self.fields {
926                if let Some(value) = map.remove(field) {
927                    new_map.insert(field.clone(), value);
928                }
929            }
930            event.data = serde_json::Value::Object(new_map);
931        }
932        Ok(Some(event))
933    }
934
935    fn name(&self) -> &str {
936        "FieldSelect"
937    }
938}
939
940impl Transform for FieldSelect {}
941
/// Renames top-level fields according to an old-name → new-name mapping.
pub struct FieldRemap {
    // old field name → new field name.
    mappings: HashMap<String, String>,
}

impl FieldRemap {
    /// Build a remapper from the given old→new name table.
    pub fn new(mappings: HashMap<String, String>) -> Self {
        Self { mappings }
    }
}
951
952impl Stage for FieldRemap {
953    fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
954        if let serde_json::Value::Object(ref mut map) = event.data {
955            for (old_key, new_key) in &self.mappings {
956                if let Some(value) = map.remove(old_key) {
957                    map.insert(new_key.clone(), value);
958                }
959            }
960        }
961        Ok(Some(event))
962    }
963
964    fn name(&self) -> &str {
965        "FieldRemap"
966    }
967}
968
969impl Transform for FieldRemap {}
970
971pub struct PIIRedaction {
972    patterns: Vec<Regex>,
973}
974
975impl PIIRedaction {
976    pub fn new(patterns: Vec<Regex>) -> Self {
977        PIIRedaction { patterns }
978    }
979}
980
981impl Stage for PIIRedaction {
982    fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
983        if let serde_json::Value::Object(ref mut map) = event.data {
984            for (_, value) in map.iter_mut() {
985                if let serde_json::Value::String(ref mut s) = value {
986                    for pattern in &self.patterns {
987                        *s = pattern.replace_all(s, "[REDACTED]").to_string();
988                    }
989                }
990            }
991        }
992        Ok(Some(event))
993    }
994
995    fn name(&self) -> &str {
996        "PIIRedaction"
997    }
998}
999
1000impl Transform for PIIRedaction {}
1001
1002pub struct DerivedFields {
1003    derivations: HashMap<String, EventDerivationFn>,
1004}
1005
1006impl DerivedFields {
1007    pub fn new(derivations: HashMap<String, EventDerivationFn>) -> Self {
1008        DerivedFields { derivations }
1009    }
1010}
1011
1012impl Stage for DerivedFields {
1013    fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
1014        let mut new_values = Vec::new();
1015        for (key, func) in &self.derivations {
1016            new_values.push((key.clone(), func(&event)));
1017        }
1018        if let serde_json::Value::Object(ref mut map) = event.data {
1019            for (key, value) in new_values {
1020                map.insert(key, value);
1021            }
1022        }
1023        Ok(Some(event))
1024    }
1025
1026    fn name(&self) -> &str {
1027        "DerivedFields"
1028    }
1029}
1030
1031impl Transform for DerivedFields {}
1032
1033pub struct Filter {
1034    condition: Box<dyn Fn(&Event) -> bool>,
1035}
1036
1037impl Filter {
1038    pub fn new(condition: Box<dyn Fn(&Event) -> bool>) -> Self {
1039        Filter { condition }
1040    }
1041}
1042
1043impl Stage for Filter {
1044    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1045        if (self.condition)(&event) {
1046            Ok(Some(event))
1047        } else {
1048            Ok(None)
1049        }
1050    }
1051
1052    fn name(&self) -> &str {
1053        "Filter"
1054    }
1055}
1056
1057impl Transform for Filter {}
1058
/// Validators: enforce structural and semantic correctness of events before output.
/// Schema enforced, fail closed, no silent acceptance.
///
/// Marker trait: a Validator is a `Stage` that either passes the event
/// through unchanged or rejects it with a structured `ValidationError`.
pub trait Validator: Stage {}
1062
/// Rejects events missing any of the configured top-level fields.
pub struct RequiredFields {
    // Field names that must be present on every event.
    fields: Vec<String>,
}

impl RequiredFields {
    /// Build a validator requiring every name in `fields` to be present.
    pub fn new(fields: Vec<String>) -> Self {
        Self { fields }
    }
}
1072
1073impl Stage for RequiredFields {
1074    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1075        if let serde_json::Value::Object(map) = &event.data {
1076            for field in &self.fields {
1077                if !map.contains_key(field) {
1078                    return Err(PipelineError::Validation(ValidationError {
1079                        stage: "RequiredFields".to_string(),
1080                        code: ValidationErrorCode::MissingField,
1081                        message: format!("Missing required field: {}", field),
1082                    }));
1083                }
1084            }
1085        }
1086        Ok(Some(event))
1087    }
1088
1089    fn name(&self) -> &str {
1090        "RequiredFields"
1091    }
1092}
1093
1094impl Validator for RequiredFields {}
1095
/// Rejects events whose fields have the wrong JSON type.
pub struct TypeChecking {
    // field name → expected JSON type name
    // ("string", "number", "boolean", "object", "array", or "null").
    type_checks: HashMap<String, String>,
}

impl TypeChecking {
    /// Build a validator from the given field→expected-type table.
    pub fn new(type_checks: HashMap<String, String>) -> Self {
        Self { type_checks }
    }
}
1105
1106impl Stage for TypeChecking {
1107    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1108        if let serde_json::Value::Object(map) = &event.data {
1109            for (field, expected_type) in &self.type_checks {
1110                if let Some(value) = map.get(field) {
1111                    let actual_type = match value {
1112                        serde_json::Value::String(_) => "string",
1113                        serde_json::Value::Number(_) => "number",
1114                        serde_json::Value::Bool(_) => "boolean",
1115                        serde_json::Value::Object(_) => "object",
1116                        serde_json::Value::Array(_) => "array",
1117                        serde_json::Value::Null => "null",
1118                    };
1119                    if actual_type != expected_type {
1120                        return Err(PipelineError::Validation(ValidationError {
1121                            stage: "TypeChecking".to_string(),
1122                            code: ValidationErrorCode::TypeMismatch,
1123                            message: format!(
1124                                "Field {} expected {} but got {}",
1125                                field, expected_type, actual_type
1126                            ),
1127                        }));
1128                    }
1129                }
1130            }
1131        }
1132        Ok(Some(event))
1133    }
1134
1135    fn name(&self) -> &str {
1136        "TypeChecking"
1137    }
1138}
1139
1140impl Validator for TypeChecking {}
1141
1142pub struct ValueConstraints {
1143    constraints: HashMap<String, ValueConstraintFn>,
1144}
1145
1146impl ValueConstraints {
1147    pub fn new(constraints: HashMap<String, ValueConstraintFn>) -> Self {
1148        ValueConstraints { constraints }
1149    }
1150}
1151
1152impl Stage for ValueConstraints {
1153    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1154        if let serde_json::Value::Object(map) = &event.data {
1155            for (field, check) in &self.constraints {
1156                if let Some(value) = map.get(field) {
1157                    if !check(value) {
1158                        return Err(PipelineError::Validation(ValidationError {
1159                            stage: "ValueConstraints".to_string(),
1160                            code: ValidationErrorCode::ConstraintViolation,
1161                            message: format!("Field {} violates constraint", field),
1162                        }));
1163                    }
1164                }
1165            }
1166        }
1167        Ok(Some(event))
1168    }
1169
1170    fn name(&self) -> &str {
1171        "ValueConstraints"
1172    }
1173}
1174
1175impl Validator for ValueConstraints {}
1176
/// Outputs: emit processed events to local or synchronous destinations with explicit failure semantics.
/// Ordered write, bounded retry, no unbounded retry, no background flush.
///
/// Marker trait: an Output is a `Stage` that consumes the event (returns
/// `Ok(None)`) after writing it, and reports `is_output() == true`.
pub trait Output: Stage {}
1180
/// Writes each event's payload as one JSON line to standard output.
pub struct StdoutOutput;

impl StdoutOutput {
    /// Create a stdout output stage (stateless).
    pub fn new() -> Self {
        Self
    }
}
1188
1189impl Stage for StdoutOutput {
1190    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1191        println!(
1192            "{}",
1193            serde_json::to_string(&event.data).map_err(|e| PipelineError::Output(OutputError {
1194                stage: "Stdout".to_string(),
1195                code: OutputErrorCode::SerializeError,
1196                message: e.to_string(),
1197            }))?
1198        );
1199        Ok(None) // Consumed
1200    }
1201
1202    fn name(&self) -> &str {
1203        "StdoutOutput"
1204    }
1205
1206    fn is_output(&self) -> bool {
1207        true
1208    }
1209}
1210
1211impl Output for StdoutOutput {}
1212
/// `Default` delegates to `new()`, since the stage is stateless.
impl Default for StdoutOutput {
    fn default() -> Self {
        Self::new()
    }
}
1218
/// Appends each event's payload as one NDJSON line to a file.
pub struct FileOutput {
    // Destination file; created on first write if it does not exist.
    path: PathBuf,
}

impl FileOutput {
    /// Build a file output stage targeting `path`.
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }
}
1228
1229impl Stage for FileOutput {
1230    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1231        use std::io::Write;
1232        let mut file = fs::OpenOptions::new()
1233            .append(true)
1234            .create(true)
1235            .open(&self.path)
1236            .map_err(|e| {
1237                PipelineError::Output(OutputError {
1238                    stage: "File".to_string(),
1239                    code: OutputErrorCode::IoError,
1240                    message: e.to_string(),
1241                })
1242            })?;
1243        writeln!(
1244            file,
1245            "{}",
1246            serde_json::to_string(&event.data).map_err(|e| PipelineError::Output(OutputError {
1247                stage: "File".to_string(),
1248                code: OutputErrorCode::SerializeError,
1249                message: e.to_string(),
1250            }))?
1251        )
1252        .map_err(|e| {
1253            PipelineError::Output(OutputError {
1254                stage: "File".to_string(),
1255                code: OutputErrorCode::IoError,
1256                message: e.to_string(),
1257            })
1258        })?;
1259        Ok(None) // Consumed
1260    }
1261
1262    fn name(&self) -> &str {
1263        "FileOutput"
1264    }
1265
1266    fn is_output(&self) -> bool {
1267        true
1268    }
1269}
1270
1271impl Output for FileOutput {}
1272
/// Records failed events, with their metadata, to an NDJSON file.
pub struct Deadletter {
    // Destination deadletter file; created on first write if absent.
    path: PathBuf,
}

impl Deadletter {
    /// Build a deadletter sink targeting `path`.
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }
}
1282
1283impl Stage for Deadletter {
1284    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1285        use std::io::Write;
1286        let mut file = fs::OpenOptions::new()
1287            .append(true)
1288            .create(true)
1289            .open(&self.path)
1290            .map_err(|e| {
1291                PipelineError::Output(OutputError {
1292                    stage: "Deadletter".to_string(),
1293                    code: OutputErrorCode::IoError,
1294                    message: e.to_string(),
1295                })
1296            })?;
1297        writeln!(
1298            file,
1299            "{}",
1300            serde_json::to_string(&event).map_err(|e| PipelineError::Output(OutputError {
1301                stage: "Deadletter".to_string(),
1302                code: OutputErrorCode::SerializeError,
1303                message: e.to_string(),
1304            }))?
1305        )
1306        .map_err(|e| {
1307            PipelineError::Output(OutputError {
1308                stage: "Deadletter".to_string(),
1309                code: OutputErrorCode::IoError,
1310                message: e.to_string(),
1311            })
1312        })?;
1313        Ok(None) // Consumed
1314    }
1315
1316    fn name(&self) -> &str {
1317        "Deadletter"
1318    }
1319
1320    fn is_output(&self) -> bool {
1321        true
1322    }
1323}
1324
1325impl Output for Deadletter {}
1326
1327/// Configuration: ensure pipeline behavior is fully declared and validated before execution.
1328/// YAML input, version required, schema validated, unknown field rejection, no runtime mutation.
1329#[derive(Debug, serde::Deserialize)]
1330#[serde(deny_unknown_fields)]
1331pub struct Config {
1332    version: u32,
1333    // For now, minimal; can extend to full pipeline definition
1334}
1335
1336impl Config {
1337    pub fn from_yaml(yaml: &str) -> anyhow::Result<Self> {
1338        let config: Config = serde_yaml::from_str(yaml)?;
1339        if config.version != 1 {
1340            return Err(anyhow::anyhow!("Unsupported version: {}", config.version));
1341        }
1342        Ok(config)
1343    }
1344}
1345
1346/// Plugins: enable user-defined stages with explicit registration and isolation.
1347/// No implicit discovery.
1348pub struct PluginRegistry {
1349    plugins: HashMap<String, StageFactoryFn>,
1350}
1351
1352impl PluginRegistry {
1353    pub fn new() -> Self {
1354        PluginRegistry {
1355            plugins: HashMap::new(),
1356        }
1357    }
1358
1359    pub fn register(&mut self, name: String, factory: StageFactoryFn) {
1360        self.plugins.insert(name, factory);
1361    }
1362
1363    pub fn get_stage(&self, name: &str) -> Option<Box<dyn Stage>> {
1364        self.plugins.get(name).map(|f| f())
1365    }
1366}
1367
/// `Default` delegates to `new()`: an empty registry.
impl Default for PluginRegistry {
    fn default() -> Self {
        Self::new()
    }
}
1373
1374#[cfg(test)]
1375mod tests {
1376    use super::*;
1377
    // Event: get_string returns Some for present string fields, None otherwise.
    #[test]
    fn test_event_creation() {
        let data = serde_json::json!({"key": "value"});
        let event = Event {
            data,
            metadata: None,
        };
        assert_eq!(event.get_string("key"), Some("value"));
        assert_eq!(event.get_string("missing"), None);
    }

    // Event::from_raw_input parses a single NDJSON line into an Event.
    #[test]
    fn test_event_from_raw_input() {
        let input = r#"{"level": "info", "message": "test"}"#;
        let event = Event::from_raw_input(input).unwrap();
        assert_eq!(event.get_string("level"), Some("info"));
        assert_eq!(event.get_string("message"), Some("test"));
    }

    // A fresh pipeline starts with no stages.
    #[test]
    fn test_pipeline_creation() {
        let pipeline = Pipeline::new();
        assert_eq!(pipeline.stages.len(), 0);
    }

    // add_stage appends to the stage list.
    #[test]
    fn test_pipeline_add_stage() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        assert_eq!(pipeline.stages.len(), 1);
    }
1409
    // FieldSelect keeps only the listed fields and drops the rest.
    #[test]
    fn test_field_select_stage() {
        let mut stage = FieldSelect::new(vec!["level".to_string(), "message".to_string()]);
        let event = Event {
            data: serde_json::json!({"level": "info", "message": "test", "extra": "value"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.data.get("level"), Some(&serde_json::json!("info")));
        assert_eq!(
            filtered.data.get("message"),
            Some(&serde_json::json!("test"))
        );
        assert_eq!(filtered.data.get("extra"), None);
    }

    // Filter passes matching events and drops non-matching ones as Ok(None).
    #[test]
    fn test_filter_stage() {
        let mut filter = Filter::new(Box::new(|e| e.get_string("level") == Some("info")));
        let info_event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        let warn_event = Event {
            data: serde_json::json!({"level": "warn"}),
            metadata: None,
        };
        assert!(filter.execute(info_event).unwrap().is_some());
        assert!(filter.execute(warn_event).unwrap().is_none());
    }

    // RequiredFields passes complete events and errors on a missing field.
    #[test]
    fn test_required_fields_stage() {
        let mut stage = RequiredFields::new(vec!["level".to_string(), "message".to_string()]);
        let valid_event = Event {
            data: serde_json::json!({"level": "info", "message": "test"}),
            metadata: None,
        };
        let invalid_event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        assert!(stage.execute(valid_event).unwrap().is_some());
        assert!(stage.execute(invalid_event).is_err());
    }
1457
    // PIIRedaction replaces matching values and leaves non-matching ones alone.
    #[test]
    fn test_pii_redaction_stage() {
        let patterns = vec![regex::Regex::new(r"\b\d{3}-\d{2}-\d{4}\b").unwrap()]; // SSN
        let mut stage = PIIRedaction::new(patterns);
        let event = Event {
            data: serde_json::json!({"ssn": "123-45-6789", "name": "John"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap().unwrap();
        assert_eq!(result.get_string("ssn"), Some("[REDACTED]"));
        assert_eq!(result.get_string("name"), Some("John"));
    }

    // End to end: a one-stage pipeline applies its transform to the event.
    #[test]
    fn test_pipeline_execution_success() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info", "extra": "value"}),
            metadata: None,
        };
        let result = pipeline.process_event(event).unwrap();
        assert!(result.is_some());
        let processed = result.unwrap();
        assert_eq!(processed.get_string("level"), Some("info"));
        assert_eq!(processed.get_string("extra"), None);
    }

    // A failing validator surfaces as Err(PipelineError::Validation).
    #[test]
    fn test_pipeline_execution_error() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(RequiredFields::new(vec!["missing".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        let result = pipeline.process_event(event);
        assert!(result.is_err());
        if let Err(PipelineError::Validation(_)) = result {
            // correct
        } else {
            panic!("Expected Validation error");
        }
    }
1502
    // Metrics: the processed counter starts at zero and increments by one.
    #[test]
    fn test_metrics_increment() {
        let mut metrics = Metrics::new();
        assert_eq!(metrics.events_processed, 0);
        metrics.increment_processed();
        assert_eq!(metrics.events_processed, 1);
    }

    // Metrics: drops are counted both in total and per drop reason.
    #[test]
    fn test_metrics_dropped() {
        let mut metrics = Metrics::new();
        metrics.increment_dropped(DropReason::Filtered);
        assert_eq!(metrics.events_dropped, 1);
        assert_eq!(metrics.drop_reasons.get(&DropReason::Filtered), Some(&1));
    }

    // LatencyStats tracks count, sum, min, and max across recorded samples.
    #[test]
    fn test_latency_stats() {
        let mut stats = LatencyStats::new();
        stats.record(1.0);
        stats.record(3.0);
        stats.record(2.0);
        assert_eq!(stats.count, 3);
        assert_eq!(stats.sum, 6.0);
        assert_eq!(stats.min, 1.0);
        assert_eq!(stats.max, 3.0);
    }
1530
    // StdoutOutput consumes the event (returns Ok(None)) and reports is_output.
    #[test]
    fn test_std_output_stage() {
        let mut stage = StdoutOutput::new();
        let event = Event {
            data: serde_json::json!({"test": "data"}),
            metadata: None,
        };
        // Should output and consume
        let result = stage.execute(event).unwrap();
        assert!(result.is_none()); // consumed
        assert!(stage.is_output());
    }
1543
1544    #[test]
1545    fn test_file_output_stage() {
1546        use std::fs;
1547        use std::path::PathBuf;
1548        let temp_file = PathBuf::from("test_output.ndjson");
1549        let mut stage = FileOutput::new(temp_file.clone());
1550        let event = Event {
1551            data: serde_json::json!({"test": "data"}),
1552            metadata: None,
1553        };
1554        let result = stage.execute(event).unwrap();
1555        assert!(result.is_none());
1556        assert!(stage.is_output());
1557        // Check file exists and has content
1558        assert!(temp_file.exists());
1559        let content = fs::read_to_string(&temp_file).unwrap();
1560        assert!(content.contains("test"));
1561        // Cleanup
1562        fs::remove_file(temp_file).unwrap();
1563    }
1564
1565    #[test]
1566    fn test_deadletter_stage() {
1567        use std::fs;
1568        use std::path::PathBuf;
1569        let temp_file = PathBuf::from("test_deadletter.ndjson");
1570        let mut stage = Deadletter::new(temp_file.clone());
1571        let event = Event {
1572            data: serde_json::json!({"error": "test", "message": "failed"}),
1573            metadata: None,
1574        };
1575        let result = stage.execute(event).unwrap();
1576        assert!(result.is_none());
1577        assert!(stage.is_output());
1578        // Check file exists and has content
1579        assert!(temp_file.exists());
1580        let content = fs::read_to_string(&temp_file).unwrap();
1581        assert!(content.contains("test"));
1582        // Cleanup
1583        fs::remove_file(temp_file).unwrap();
1584    }
1585
    // Directory ingest: files created out of order are processed successfully
    // and exactly once each; the sorted-path logic must not skip or fail.
    #[test]
    fn test_directory_ingest_determinism() {
        use std::fs;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();

        // Create files in non-alphabetical order to test sorting
        let file_z = temp_dir.path().join("z.ndjson");
        let file_a = temp_dir.path().join("a.ndjson");
        let file_m = temp_dir.path().join("m.ndjson");

        fs::write(&file_z, r#"{"file": "z"}"#).unwrap();
        fs::write(&file_a, r#"{"file": "a"}"#).unwrap();
        fs::write(&file_m, r#"{"file": "m"}"#).unwrap();

        // Create a pipeline that will process all files
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));

        // Process directory - should work without errors
        // The determinism guarantee is that files are sorted before processing
        let mut input_source = InputSource::Directory(temp_dir.path().to_path_buf());
        let result = input_source.process_input(&mut pipeline, &mut None);

        // Should succeed - if it does, the sorting logic worked
        assert!(result.is_ok());

        // Check that we processed 3 events (one from each file)
        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("feedme_events_processed_total 3"));

        // Verify files still exist (weren't corrupted)
        assert!(file_a.exists());
        assert!(file_m.exists());
        assert!(file_z.exists());
    }
1622
1623    #[test]
1624    fn test_deadletter_attribution() {
1625        use std::fs;
1626        use std::path::PathBuf;
1627        let temp_file = PathBuf::from("test_deadletter_attr.ndjson");
1628
1629        let mut pipeline = Pipeline::new();
1630        pipeline.add_stage(Box::new(RequiredFields::new(vec![
1631            "missing_field".to_string()
1632        ])));
1633
1634        let mut deadletter = Deadletter::new(temp_file.clone());
1635
1636        let event = Event {
1637            data: serde_json::json!({"existing_field": "value"}),
1638            metadata: None,
1639        };
1640
1641        // Process should fail and go to deadletter
1642        let result = pipeline.process_event(event);
1643        assert!(result.is_err());
1644
1645        // Simulate deadletter execution (normally done by InputSource)
1646        if let Err(e) = result {
1647            let error_event = Event {
1648                data: serde_json::json!({
1649                    "error": "pipeline",
1650                    "category": e.category(),
1651                    "stage": e.stage(),
1652                    "code": e.code(),
1653                    "message": e.message()
1654                }),
1655                metadata: None,
1656            };
1657            deadletter.execute(error_event).unwrap();
1658        }
1659
1660        // Check deadletter file contains structured error info
1661        assert!(temp_file.exists());
1662        let content = fs::read_to_string(&temp_file).unwrap();
1663        let first_line = content.lines().next().unwrap();
1664        let deadletter_json: serde_json::Value = serde_json::from_str(first_line).unwrap();
1665
1666        assert_eq!(deadletter_json["data"]["error"], "pipeline");
1667        assert_eq!(deadletter_json["data"]["category"], "Validation");
1668        assert_eq!(deadletter_json["data"]["stage"], "RequiredFields");
1669        assert_eq!(deadletter_json["data"]["code"], "MISSING_FIELD");
1670        assert!(deadletter_json["data"]["message"]
1671            .as_str()
1672            .unwrap()
1673            .contains("Missing required field"));
1674
1675        // Cleanup
1676        fs::remove_file(temp_file).unwrap();
1677    }
1678
    // Metrics export: after one processed event, both the JSON-log and
    // Prometheus exporters report the processed counter.
    #[test]
    fn test_pipeline_metrics_export() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        pipeline.process_event(event).unwrap();
        let json_logs = pipeline.export_json_logs();
        assert!(!json_logs.is_empty());
        assert!(json_logs.iter().any(|s| s.contains("events_processed")));
        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("# HELP feedme_events_processed_total"));
        assert!(prometheus.contains("feedme_events_processed_total 1"));
    }
1695
    // Every PipelineError variant exposes category, stage, code, and message
    // accessors, and codes render as SCREAMING_SNAKE_CASE strings.
    #[test]
    fn test_error_taxonomy() {
        // Parse
        let parse_err = PipelineError::Parse(ParseError {
            stage: "test".to_string(),
            code: ParseErrorCode::Test,
            message: "test".to_string(),
        });
        assert_eq!(parse_err.category(), "Parse");
        assert_eq!(parse_err.stage(), "test");
        assert_eq!(parse_err.code(), "TEST");
        assert_eq!(parse_err.message(), "test");

        // Transform
        let transform_err = PipelineError::Transform(TransformError {
            stage: "transform_test".to_string(),
            code: TransformErrorCode::Test,
            message: "transform test".to_string(),
        });
        assert_eq!(transform_err.category(), "Transform");
        assert_eq!(transform_err.stage(), "transform_test");
        assert_eq!(transform_err.code(), "TEST");
        assert_eq!(transform_err.message(), "transform test");

        // Validation
        let validation_err = PipelineError::Validation(ValidationError {
            stage: "validation_test".to_string(),
            code: ValidationErrorCode::Test,
            message: "validation test".to_string(),
        });
        assert_eq!(validation_err.category(), "Validation");
        assert_eq!(validation_err.stage(), "validation_test");
        assert_eq!(validation_err.code(), "TEST");
        assert_eq!(validation_err.message(), "validation test");

        // Output
        let output_err = PipelineError::Output(OutputError {
            stage: "output_test".to_string(),
            code: OutputErrorCode::Test,
            message: "output test".to_string(),
        });
        assert_eq!(output_err.category(), "Output");
        assert_eq!(output_err.stage(), "output_test");
        assert_eq!(output_err.code(), "TEST");
        assert_eq!(output_err.message(), "output test");

        // System
        let system_err = PipelineError::System(SystemError {
            stage: "system_test".to_string(),
            code: SystemErrorCode::Test,
            message: "system test".to_string(),
        });
        assert_eq!(system_err.category(), "System");
        assert_eq!(system_err.stage(), "system_test");
        assert_eq!(system_err.code(), "TEST");
        assert_eq!(system_err.message(), "system test");
    }
1748
1749    #[test]
1750    fn test_input_source_file() {
1751        use std::fs;
1752        use std::io::Write;
1753        let temp_file = "test_input.ndjson";
1754        let mut file = fs::File::create(temp_file).unwrap();
1755        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
1756        drop(file);
1757
1758        let mut pipeline = Pipeline::new();
1759        pipeline.add_stage(Box::new(StdoutOutput::new()));
1760        let mut input = InputSource::File(temp_file.into());
1761        let mut deadletter: Option<&mut dyn Stage> = None;
1762        let result = input.process_input(&mut pipeline, &mut deadletter);
1763        assert!(result.is_ok());
1764
1765        // Cleanup
1766        fs::remove_file(temp_file).unwrap();
1767    }
1768
1769    #[test]
1770    fn test_input_source_file_parse_error() {
1771        use std::fs;
1772        use std::io::Write;
1773        let temp_file = "test_invalid.ndjson";
1774        let mut file = fs::File::create(temp_file).unwrap();
1775        writeln!(file, "invalid json").unwrap();
1776        drop(file);
1777
1778        let mut pipeline = Pipeline::new();
1779        pipeline.add_stage(Box::new(StdoutOutput::new()));
1780        let mut input = InputSource::File(temp_file.into());
1781        let mut deadletter: Option<&mut dyn Stage> = None;
1782        let result = input.process_input(&mut pipeline, &mut deadletter);
1783        assert!(result.is_err()); // fails on parse error when no deadletter
1784
1785        // Cleanup
1786        fs::remove_file(temp_file).unwrap();
1787    }
1788
1789    #[test]
1790    fn test_input_source_stdin() {
1791        // Hard to test stdin directly, but can test the enum
1792        let _input = InputSource::Stdin;
1793        // Would need integration test
1794    }
1795
1796    #[test]
1797    fn test_type_checking_stage() {
1798        use std::collections::HashMap;
1799        let mut type_checks = HashMap::new();
1800        type_checks.insert("level".to_string(), "string".to_string());
1801        type_checks.insert("count".to_string(), "number".to_string());
1802
1803        let mut stage = TypeChecking::new(type_checks);
1804
1805        // Valid event
1806        let valid_event = Event {
1807            data: serde_json::json!({"level": "info", "count": 42}),
1808            metadata: None,
1809        };
1810        let result = stage.execute(valid_event).unwrap();
1811        assert!(result.is_some());
1812
1813        // Invalid event - wrong type
1814        let invalid_event = Event {
1815            data: serde_json::json!({"level": 123, "count": "not_a_number"}),
1816            metadata: None,
1817        };
1818        let result = stage.execute(invalid_event);
1819        assert!(result.is_err());
1820        assert_eq!(result.unwrap_err().category(), "Validation");
1821    }
1822
1823    #[test]
1824    fn test_value_constraints_stage() {
1825        use std::collections::HashMap;
1826        let mut constraints = HashMap::new();
1827        constraints.insert(
1828            "count".to_string(),
1829            Box::new(|v: &serde_json::Value| v.as_i64().map(|n| n >= 0).unwrap_or(false))
1830                as Box<dyn Fn(&serde_json::Value) -> bool>,
1831        );
1832
1833        let mut stage = ValueConstraints::new(constraints);
1834
1835        // Valid event
1836        let valid_event = Event {
1837            data: serde_json::json!({"count": 10}),
1838            metadata: None,
1839        };
1840        let result = stage.execute(valid_event).unwrap();
1841        assert!(result.is_some());
1842
1843        // Invalid event - constraint violation
1844        let invalid_event = Event {
1845            data: serde_json::json!({"count": -5}),
1846            metadata: None,
1847        };
1848        let result = stage.execute(invalid_event);
1849        assert!(result.is_err());
1850        assert_eq!(result.unwrap_err().category(), "Validation");
1851    }
1852
1853    #[test]
1854    fn test_input_source_directory_error() {
1855        let mut pipeline = Pipeline::new();
1856        pipeline.add_stage(Box::new(StdoutOutput::new()));
1857        let mut input = InputSource::Directory(PathBuf::from("/nonexistent/directory"));
1858        let mut deadletter: Option<&mut dyn Stage> = None;
1859        let result = input.process_input(&mut pipeline, &mut deadletter);
1860        assert!(result.is_err());
1861        assert_eq!(result.unwrap_err().category(), "System");
1862    }
1863
1864    #[test]
1865    fn test_pipeline_metrics_json_export() {
1866        let mut pipeline = Pipeline::new();
1867        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
1868        let event = Event {
1869            data: serde_json::json!({"level": "info", "message": "test"}),
1870            metadata: None,
1871        };
1872        pipeline.process_event(event).unwrap();
1873        let json_logs = pipeline.export_json_logs();
1874        assert!(!json_logs.is_empty());
1875        // Check that JSON logs contain expected structure
1876        let first_log: serde_json::Value = serde_json::from_str(&json_logs[0]).unwrap();
1877        assert_eq!(first_log["metric"], "events_processed");
1878        assert!(first_log["value"].is_number());
1879    }
1880
1881    #[test]
1882    fn test_derived_fields_stage() {
1883        use std::collections::HashMap;
1884        let mut derivations = HashMap::new();
1885        derivations.insert(
1886            "derived_field".to_string(),
1887            Box::new(|event: &Event| event.get_string("base_field").unwrap_or("default").into())
1888                as Box<dyn Fn(&Event) -> serde_json::Value>,
1889        );
1890
1891        let mut stage = DerivedFields::new(derivations);
1892        let event = Event {
1893            data: serde_json::json!({"base_field": "test_value"}),
1894            metadata: None,
1895        };
1896        let result = stage.execute(event).unwrap().unwrap();
1897        assert_eq!(result.get_string("derived_field"), Some("test_value"));
1898        assert_eq!(result.get_string("base_field"), Some("test_value"));
1899    }
1900
1901    #[test]
1902    fn test_pipeline_error_display() {
1903        let parse_err = PipelineError::Parse(ParseError {
1904            stage: "test".to_string(),
1905            code: ParseErrorCode::Test,
1906            message: "test message".to_string(),
1907        });
1908        let display = format!("{}", parse_err);
1909        assert!(display.contains("Parse error: test message"));
1910
1911        let transform_err = PipelineError::Transform(TransformError {
1912            stage: "test".to_string(),
1913            code: TransformErrorCode::Test,
1914            message: "transform message".to_string(),
1915        });
1916        let display = format!("{}", transform_err);
1917        assert!(display.contains("Transform error: transform message"));
1918    }
1919
1920    #[test]
1921    fn test_input_source_directory() {
1922        use std::fs;
1923        use std::io::Write;
1924        let temp_dir = "test_dir";
1925        fs::create_dir(temp_dir).unwrap();
1926        let temp_file = format!("{}/test.ndjson", temp_dir);
1927        let mut file = fs::File::create(&temp_file).unwrap();
1928        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
1929        drop(file);
1930
1931        let mut pipeline = Pipeline::new();
1932        pipeline.add_stage(Box::new(StdoutOutput::new()));
1933        let mut input = InputSource::Directory(temp_dir.into());
1934        let mut deadletter: Option<&mut dyn Stage> = None;
1935        let result = input.process_input(&mut pipeline, &mut deadletter);
1936        assert!(result.is_ok());
1937
1938        // Cleanup
1939        fs::remove_file(&temp_file).unwrap();
1940        fs::remove_dir(temp_dir).unwrap();
1941    }
1942
1943    #[test]
1944    fn test_event_get_number() {
1945        let event = Event {
1946            data: serde_json::json!({"count": 42, "rate": 3.15, "text": "not_a_number"}),
1947            metadata: None,
1948        };
1949
1950        assert_eq!(event.get_number("count"), Some(42.0));
1951        assert_eq!(event.get_number("rate"), Some(3.15));
1952        assert_eq!(event.get_number("text"), None);
1953        assert_eq!(event.get_number("missing"), None);
1954    }
1955
1956    #[test]
1957    fn test_ndjson_parser() {
1958        let parser = NDJSONParser;
1959        let valid_json = r#"{"level": "info", "message": "test"}"#;
1960        let result = parser.parse(valid_json.as_bytes()).unwrap();
1961        assert_eq!(result.get_string("level"), Some("info"));
1962        assert_eq!(result.get_string("message"), Some("test"));
1963
1964        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
1965        let result = parser.parse(invalid_utf8);
1966        assert!(result.is_err());
1967        assert_eq!(result.unwrap_err().category(), "Parse");
1968
1969        let invalid_json = "not valid json";
1970        let result = parser.parse(invalid_json.as_bytes());
1971        assert!(result.is_err());
1972        assert_eq!(result.unwrap_err().category(), "Parse");
1973    }
1974
1975    #[test]
1976    fn test_json_array_parser() {
1977        let parser = JSONArrayParser;
1978        let valid_array = r#"[{"item": 1}, {"item": 2}]"#;
1979        let result = parser.parse(valid_array.as_bytes()).unwrap();
1980        assert!(result.data.is_array());
1981
1982        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
1983        let result = parser.parse(invalid_utf8);
1984        assert!(result.is_err());
1985        assert_eq!(result.unwrap_err().category(), "Parse");
1986
1987        let invalid_json = "not valid json";
1988        let result = parser.parse(invalid_json.as_bytes());
1989        assert!(result.is_err());
1990        assert_eq!(result.unwrap_err().category(), "Parse");
1991    }
1992
1993    #[test]
1994    fn test_syslog_parser() {
1995        let parser = SyslogParser;
1996        let syslog_message = "<13>2024-01-01T10:00:00Z myhost test message";
1997        let result = parser.parse(syslog_message.as_bytes()).unwrap();
1998        assert_eq!(result.get_string("message"), Some(syslog_message));
1999
2000        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
2001        let result = parser.parse(invalid_utf8);
2002        assert!(result.is_err());
2003        assert_eq!(result.unwrap_err().category(), "Parse");
2004    }
2005
2006    #[test]
2007    fn test_field_remap_stage() {
2008        use std::collections::HashMap;
2009        let mut mappings = HashMap::new();
2010        mappings.insert("old_field".to_string(), "new_field".to_string());
2011        mappings.insert("another_old".to_string(), "another_new".to_string());
2012
2013        let mut stage = FieldRemap::new(mappings);
2014        let event = Event {
2015            data: serde_json::json!({"old_field": "value1", "another_old": "value2", "keep": "value3"}),
2016            metadata: None,
2017        };
2018
2019        let result = stage.execute(event).unwrap().unwrap();
2020        assert_eq!(result.get_string("new_field"), Some("value1"));
2021        assert_eq!(result.get_string("another_new"), Some("value2"));
2022        assert_eq!(result.get_string("keep"), Some("value3"));
2023        assert_eq!(result.get_string("old_field"), None);
2024        assert_eq!(result.get_string("another_old"), None);
2025        assert_eq!(stage.name(), "FieldRemap");
2026
2027        let non_object_event = Event {
2028            data: serde_json::json!("just a string"),
2029            metadata: None,
2030        };
2031        let result = stage.execute(non_object_event).unwrap().unwrap();
2032        assert_eq!(result.data, serde_json::json!("just a string"));
2033    }
2034
2035    #[test]
2036    fn test_config_from_yaml() {
2037        let valid_yaml = "version: 1";
2038        let config = Config::from_yaml(valid_yaml).unwrap();
2039        assert_eq!(config.version, 1);
2040
2041        let invalid_version = "version: 2";
2042        let result = Config::from_yaml(invalid_version);
2043        assert!(result.is_err());
2044        assert!(result
2045            .unwrap_err()
2046            .to_string()
2047            .contains("Unsupported version"));
2048
2049        let invalid_yaml = "invalid: yaml: structure: [";
2050        let result = Config::from_yaml(invalid_yaml);
2051        assert!(result.is_err());
2052
2053        let unknown_fields = "version: 1\nunknown_field: value";
2054        let result = Config::from_yaml(unknown_fields);
2055        assert!(result.is_err());
2056    }
2057
2058    #[test]
2059    fn test_plugin_registry() {
2060        let mut registry = PluginRegistry::new();
2061        registry.register(
2062            "test_stage".to_string(),
2063            Box::new(|| Box::new(StdoutOutput::new())),
2064        );
2065
2066        let stage = registry.get_stage("test_stage");
2067        assert!(stage.is_some());
2068        assert_eq!(stage.unwrap().name(), "StdoutOutput");
2069
2070        let missing = registry.get_stage("missing_stage");
2071        assert!(missing.is_none());
2072    }
2073
2074    #[test]
2075    fn test_drop_reason_display() {
2076        assert_eq!(format!("{}", DropReason::Filtered), "filtered");
2077    }
2078
2079    #[test]
2080    fn test_pipeline_error_std_error_trait() {
2081        let error = PipelineError::Parse(ParseError {
2082            stage: "test".to_string(),
2083            code: ParseErrorCode::Test,
2084            message: "test error".to_string(),
2085        });
2086
2087        let _: &dyn std::error::Error = &error;
2088    }
2089
2090    #[test]
2091    fn test_input_source_file_with_deadletter() {
2092        use std::fs;
2093        use std::io::Write;
2094        let temp_file = "test_input_deadletter.ndjson";
2095        let deadletter_file = "test_deadletter_file.ndjson";
2096        let mut file = fs::File::create(temp_file).unwrap();
2097        writeln!(file, "invalid json line").unwrap();
2098        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
2099        drop(file);
2100
2101        let mut pipeline = Pipeline::new();
2102        pipeline.add_stage(Box::new(RequiredFields::new(vec![
2103            "missing_field".to_string()
2104        ])));
2105
2106        let mut deadletter_stage = Deadletter::new(deadletter_file.into());
2107        let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);
2108
2109        let mut input = InputSource::File(temp_file.into());
2110        let result = input.process_input(&mut pipeline, &mut deadletter);
2111        assert!(result.is_ok());
2112
2113        let deadletter_content = fs::read_to_string(deadletter_file).unwrap();
2114        assert!(deadletter_content.contains("parse"));
2115        assert!(deadletter_content.contains("pipeline"));
2116
2117        // Cleanup
2118        fs::remove_file(temp_file).unwrap();
2119        fs::remove_file(deadletter_file).unwrap();
2120    }
2121
2122    #[test]
2123    fn test_complete_pipeline_with_transforms_and_validators() {
2124        let mut pipeline = Pipeline::new();
2125
2126        let mut field_mappings = std::collections::HashMap::new();
2127        field_mappings.insert("msg".to_string(), "message".to_string());
2128        pipeline.add_stage(Box::new(FieldRemap::new(field_mappings)));
2129
2130        pipeline.add_stage(Box::new(FieldSelect::new(vec![
2131            "level".to_string(),
2132            "message".to_string(),
2133        ])));
2134
2135        pipeline.add_stage(Box::new(RequiredFields::new(vec![
2136            "level".to_string(),
2137            "message".to_string(),
2138        ])));
2139
2140        let mut type_checks = std::collections::HashMap::new();
2141        type_checks.insert("level".to_string(), "string".to_string());
2142        pipeline.add_stage(Box::new(TypeChecking::new(type_checks)));
2143
2144        pipeline.add_stage(Box::new(StdoutOutput::new()));
2145
2146        let event = Event {
2147            data: serde_json::json!({"level": "info", "msg": "test message", "extra": "ignored"}),
2148            metadata: None,
2149        };
2150
2151        let result = pipeline.process_event(event).unwrap();
2152        assert!(result.is_none());
2153
2154        let prometheus = pipeline.export_prometheus();
2155        assert!(prometheus.contains("feedme_events_processed_total 1"));
2156        assert!(prometheus.contains("feedme_stage_latency_ms"));
2157    }
2158
2159    #[test]
2160    fn test_metrics_export_formats() {
2161        let mut metrics = Metrics::new();
2162        metrics.increment_processed();
2163        metrics.increment_dropped(DropReason::Filtered);
2164        metrics.increment_errors();
2165        metrics.record_latency("test_stage", 100.5);
2166
2167        let prometheus = metrics.to_prometheus();
2168        assert!(prometheus.contains("feedme_events_processed_total 1"));
2169        assert!(prometheus.contains("feedme_events_dropped_total 1"));
2170        assert!(prometheus.contains("feedme_errors_total 1"));
2171        assert!(prometheus.contains("feedme_stage_latency_ms_sum{stage=\"test_stage\"} 100.5"));
2172        assert!(prometheus.contains("feedme_drop_reasons_total{reason=\"filtered\"} 1"));
2173
2174        let json_logs = metrics.to_json_logs();
2175        assert!(json_logs.len() >= 4);
2176
2177        let processed_log = json_logs
2178            .iter()
2179            .find(|log| log.contains("events_processed"))
2180            .unwrap();
2181        let log_json: serde_json::Value = serde_json::from_str(processed_log).unwrap();
2182        assert_eq!(log_json["metric"], "events_processed");
2183        assert_eq!(log_json["value"], 1);
2184    }
2185
2186    #[test]
2187    fn test_input_source_directory_with_deadletter() {
2188        use std::fs;
2189        use tempfile::TempDir;
2190
2191        let temp_dir = TempDir::new().unwrap();
2192        let file1 = temp_dir.path().join("file1.ndjson");
2193        let file2 = temp_dir.path().join("file2.ndjson");
2194
2195        fs::write(&file1, "invalid json\n").unwrap();
2196        fs::write(&file2, r#"{"level": "info"}"#).unwrap();
2197
2198        let deadletter_file = temp_dir.path().join("deadletter.ndjson");
2199        let mut deadletter_stage = Deadletter::new(deadletter_file.clone());
2200        let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);
2201
2202        let mut pipeline = Pipeline::new();
2203        pipeline.add_stage(Box::new(StdoutOutput::new()));
2204
2205        let mut input = InputSource::Directory(temp_dir.path().to_path_buf());
2206        let result = input.process_input(&mut pipeline, &mut deadletter);
2207        assert!(result.is_ok());
2208
2209        let deadletter_content = fs::read_to_string(&deadletter_file).unwrap();
2210        assert!(deadletter_content.contains("parse"));
2211        assert!(deadletter_content.contains("PARSE_ERROR"));
2212    }
2213
2214    #[test]
2215    fn test_stage_is_output_default() {
2216        let stage = FieldSelect::new(vec!["test".to_string()]);
2217        assert!(!stage.is_output());
2218
2219        let output_stage = StdoutOutput::new();
2220        assert!(output_stage.is_output());
2221    }
2222
2223    #[test]
2224    fn test_latency_stats_edge_cases() {
2225        let mut stats = LatencyStats::new();
2226        assert_eq!(stats.count, 0);
2227        assert_eq!(stats.sum, 0.0);
2228        assert_eq!(stats.min, f64::INFINITY);
2229        assert_eq!(stats.max, f64::NEG_INFINITY);
2230
2231        stats.record(5.0);
2232        assert_eq!(stats.count, 1);
2233        assert_eq!(stats.min, 5.0);
2234        assert_eq!(stats.max, 5.0);
2235
2236        stats.record(3.0);
2237        assert_eq!(stats.min, 3.0);
2238        assert_eq!(stats.max, 5.0);
2239
2240        stats.record(7.0);
2241        assert_eq!(stats.min, 3.0);
2242        assert_eq!(stats.max, 7.0);
2243    }
2244
2245    #[test]
2246    fn test_pipeline_stage_filtering() {
2247        let mut pipeline = Pipeline::new();
2248        pipeline.add_stage(Box::new(Filter::new(Box::new(|e| {
2249            e.get_string("level") == Some("error")
2250        }))));
2251        pipeline.add_stage(Box::new(StdoutOutput::new()));
2252
2253        let info_event = Event {
2254            data: serde_json::json!({"level": "info", "message": "test"}),
2255            metadata: None,
2256        };
2257        let result = pipeline.process_event(info_event).unwrap();
2258        assert!(result.is_none());
2259
2260        let error_event = Event {
2261            data: serde_json::json!({"level": "error", "message": "test"}),
2262            metadata: None,
2263        };
2264        let result = pipeline.process_event(error_event).unwrap();
2265        assert!(result.is_none());
2266
2267        let prometheus = pipeline.export_prometheus();
2268        assert!(prometheus.contains("feedme_events_processed_total 2"));
2269        assert!(prometheus.contains("feedme_events_dropped_total 1"));
2270    }
2271
2272    #[test]
2273    fn test_all_pipeline_error_display_variants() {
2274        let validation_err = PipelineError::Validation(ValidationError {
2275            stage: "test".to_string(),
2276            code: ValidationErrorCode::Test,
2277            message: "validation test".to_string(),
2278        });
2279        assert_eq!(
2280            format!("{}", validation_err),
2281            "Validation error: validation test"
2282        );
2283
2284        let output_err = PipelineError::Output(OutputError {
2285            stage: "test".to_string(),
2286            code: OutputErrorCode::Test,
2287            message: "output test".to_string(),
2288        });
2289        assert_eq!(format!("{}", output_err), "Output error: output test");
2290
2291        let system_err = PipelineError::System(SystemError {
2292            stage: "test".to_string(),
2293            code: SystemErrorCode::Test,
2294            message: "system test".to_string(),
2295        });
2296        assert_eq!(format!("{}", system_err), "System error: system test");
2297    }
2298
2299    #[test]
2300    fn test_input_source_stdin_processing() {
2301        let input_stdin = InputSource::Stdin;
2302        match input_stdin {
2303            InputSource::Stdin => {
2304                // Test passes if we reach here
2305            }
2306            _ => panic!("Should be Stdin variant"),
2307        }
2308    }
2309
2310    #[test]
2311    fn test_file_output_io_errors() {
2312        use std::path::PathBuf;
2313        let invalid_path = PathBuf::from("/invalid/path/that/should/not/exist/output.json");
2314        let mut stage = FileOutput::new(invalid_path);
2315
2316        let event = Event {
2317            data: serde_json::json!({"test": "data"}),
2318            metadata: None,
2319        };
2320
2321        let result = stage.execute(event);
2322        assert!(result.is_err());
2323        assert_eq!(result.unwrap_err().category(), "Output");
2324    }
2325
2326    #[test]
2327    fn test_deadletter_io_errors() {
2328        use std::path::PathBuf;
2329        let invalid_path = PathBuf::from("/invalid/path/that/should/not/exist/deadletter.json");
2330        let mut stage = Deadletter::new(invalid_path);
2331
2332        let event = Event {
2333            data: serde_json::json!({"error": "test"}),
2334            metadata: None,
2335        };
2336
2337        let result = stage.execute(event);
2338        assert!(result.is_err());
2339        assert_eq!(result.unwrap_err().category(), "Output");
2340    }
2341
2342    #[test]
2343    fn test_metrics_empty_latency_stats() {
2344        let metrics = Metrics::new();
2345        let prometheus = metrics.to_prometheus();
2346
2347        assert!(prometheus.contains("feedme_events_processed_total 0"));
2348        assert!(prometheus.contains("feedme_events_dropped_total 0"));
2349        assert!(prometheus.contains("feedme_errors_total 0"));
2350
2351        let json_logs = metrics.to_json_logs();
2352        assert!(json_logs.len() >= 3);
2353    }
2354
2355    #[test]
2356    fn test_stage_names() {
2357        assert_eq!(FieldSelect::new(vec![]).name(), "FieldSelect");
2358        assert_eq!(
2359            FieldRemap::new(std::collections::HashMap::new()).name(),
2360            "FieldRemap"
2361        );
2362        assert_eq!(PIIRedaction::new(vec![]).name(), "PIIRedaction");
2363        assert_eq!(
2364            DerivedFields::new(std::collections::HashMap::new()).name(),
2365            "DerivedFields"
2366        );
2367        assert_eq!(Filter::new(Box::new(|_| true)).name(), "Filter");
2368        assert_eq!(RequiredFields::new(vec![]).name(), "RequiredFields");
2369        assert_eq!(
2370            TypeChecking::new(std::collections::HashMap::new()).name(),
2371            "TypeChecking"
2372        );
2373        assert_eq!(
2374            ValueConstraints::new(std::collections::HashMap::new()).name(),
2375            "ValueConstraints"
2376        );
2377        assert_eq!(StdoutOutput::new().name(), "StdoutOutput");
2378        assert_eq!(FileOutput::new("/tmp/test".into()).name(), "FileOutput");
2379        assert_eq!(Deadletter::new("/tmp/test".into()).name(), "Deadletter");
2380    }
2381
2382    #[test]
2383    fn test_stages_is_output() {
2384        assert!(!FieldSelect::new(vec![]).is_output());
2385        assert!(!FieldRemap::new(std::collections::HashMap::new()).is_output());
2386        assert!(!PIIRedaction::new(vec![]).is_output());
2387        assert!(!DerivedFields::new(std::collections::HashMap::new()).is_output());
2388        assert!(!Filter::new(Box::new(|_| true)).is_output());
2389        assert!(!RequiredFields::new(vec![]).is_output());
2390        assert!(!TypeChecking::new(std::collections::HashMap::new()).is_output());
2391        assert!(!ValueConstraints::new(std::collections::HashMap::new()).is_output());
2392        assert!(StdoutOutput::new().is_output());
2393        assert!(FileOutput::new("/tmp/test".into()).is_output());
2394        assert!(Deadletter::new("/tmp/test".into()).is_output());
2395    }
2396
2397    #[test]
2398    fn test_stdout_output_serialization_error() {
2399        let mut stage = StdoutOutput::new();
2400
2401        let event_with_infinite = Event {
2402            data: serde_json::json!({"value": f64::INFINITY}),
2403            metadata: None,
2404        };
2405
2406        let result = stage.execute(event_with_infinite);
2407        match result {
2408            Ok(_) => {
2409                // JSON serialization should handle INFINITY as null, so this is OK
2410            }
2411            Err(e) => {
2412                assert_eq!(e.category(), "Output");
2413                assert_eq!(e.code(), "SERIALIZE_ERROR");
2414            }
2415        }
2416    }
2417
2418    #[test]
2419    fn test_event_with_metadata() {
2420        use std::collections::BTreeMap;
2421        let mut metadata = BTreeMap::new();
2422        metadata.insert("source".to_string(), serde_json::json!("test"));
2423        metadata.insert("timestamp".to_string(), serde_json::json!(1234567890));
2424
2425        let event = Event {
2426            data: serde_json::json!({"message": "test"}),
2427            metadata: Some(metadata),
2428        };
2429
2430        assert_eq!(event.get_string("message"), Some("test"));
2431        assert!(event.metadata.is_some());
2432        assert_eq!(
2433            event.metadata.as_ref().unwrap().get("source").unwrap(),
2434            &serde_json::json!("test")
2435        );
2436    }
2437
2438    #[test]
2439    fn test_complete_error_handling_pipeline() {
2440        use std::fs;
2441        use std::io::Write;
2442        use tempfile::TempDir;
2443
2444        let temp_dir = TempDir::new().unwrap();
2445        let input_file = temp_dir.path().join("input.ndjson");
2446        let deadletter_file = temp_dir.path().join("deadletter.ndjson");
2447
2448        let mut file = fs::File::create(&input_file).unwrap();
2449        writeln!(file, "invalid json line").unwrap();
2450        writeln!(file, r#"{{"level": "info", "message": "valid"}}"#).unwrap();
2451        writeln!(file, r#"{{"level": "warn"}}"#).unwrap();
2452        drop(file);
2453
2454        let mut pipeline = Pipeline::new();
2455        pipeline.add_stage(Box::new(RequiredFields::new(vec![
2456            "level".to_string(),
2457            "message".to_string(),
2458        ])));
2459        pipeline.add_stage(Box::new(StdoutOutput::new()));
2460
2461        let mut deadletter_stage = Deadletter::new(deadletter_file.clone());
2462        let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);
2463
2464        let mut input = InputSource::File(input_file);
2465        let result = input.process_input(&mut pipeline, &mut deadletter);
2466        assert!(result.is_ok());
2467
2468        let deadletter_content = fs::read_to_string(&deadletter_file).unwrap();
2469        assert!(deadletter_content.contains("PARSE_ERROR"));
2470        assert!(deadletter_content.contains("MISSING_FIELD"));
2471
2472        let lines: Vec<&str> = deadletter_content.lines().collect();
2473        assert_eq!(lines.len(), 2);
2474    }
2475
2476    #[test]
2477    fn test_type_checking_all_types() {
2478        use std::collections::HashMap;
2479        let mut type_checks = HashMap::new();
2480        type_checks.insert("str_field".to_string(), "string".to_string());
2481        type_checks.insert("num_field".to_string(), "number".to_string());
2482        type_checks.insert("bool_field".to_string(), "boolean".to_string());
2483        type_checks.insert("obj_field".to_string(), "object".to_string());
2484        type_checks.insert("arr_field".to_string(), "array".to_string());
2485        type_checks.insert("null_field".to_string(), "null".to_string());
2486
2487        let mut stage = TypeChecking::new(type_checks);
2488
2489        let valid_event = Event {
2490            data: serde_json::json!({
2491                "str_field": "hello",
2492                "num_field": 42,
2493                "bool_field": true,
2494                "obj_field": {"nested": "value"},
2495                "arr_field": [1, 2, 3],
2496                "null_field": null
2497            }),
2498            metadata: None,
2499        };
2500
2501        let result = stage.execute(valid_event).unwrap();
2502        assert!(result.is_some());
2503
2504        let invalid_event = Event {
2505            data: serde_json::json!({
2506                "str_field": 42,
2507                "num_field": "not_a_number"
2508            }),
2509            metadata: None,
2510        };
2511
2512        let result = stage.execute(invalid_event);
2513        assert!(result.is_err());
2514        assert_eq!(result.unwrap_err().category(), "Validation");
2515    }
2516
2517    #[test]
2518    fn test_input_file_io_error() {
2519        use std::path::PathBuf;
2520        let nonexistent_file = PathBuf::from("/nonexistent/path/file.json");
2521        let mut input = InputSource::File(nonexistent_file);
2522
2523        let mut pipeline = Pipeline::new();
2524        pipeline.add_stage(Box::new(StdoutOutput::new()));
2525        let mut deadletter: Option<&mut dyn Stage> = None;
2526
2527        let result = input.process_input(&mut pipeline, &mut deadletter);
2528        assert!(result.is_err());
2529        let err = result.unwrap_err();
2530        assert_eq!(err.category(), "System");
2531        assert_eq!(err.code(), "IO_ERROR");
2532    }
2533
2534    #[test]
2535    fn test_stdin_input_mock() {
2536        let input = InputSource::Stdin;
2537        match input {
2538            InputSource::Stdin => {
2539                // Test that we can create the variant
2540            }
2541            _ => panic!("Should be Stdin"),
2542        }
2543    }
2544
2545    #[test]
2546    fn test_file_output_serialization_error() {
2547        use std::fs;
2548        use std::path::PathBuf;
2549
2550        let temp_file = "test_serialize_error.json";
2551        let mut stage = FileOutput::new(PathBuf::from(temp_file));
2552
2553        // Create an event that will cause JSON serialization issues
2554        let problematic_event = Event {
2555            data: serde_json::json!({"data": "normal"}),
2556            metadata: None,
2557        };
2558
2559        // This should work fine
2560        let result = stage.execute(problematic_event);
2561        assert!(result.is_ok());
2562
2563        // Clean up
2564        if std::path::Path::new(temp_file).exists() {
2565            fs::remove_file(temp_file).unwrap();
2566        }
2567    }
2568
2569    #[test]
2570    fn test_deadletter_serialization_error() {
2571        use std::fs;
2572        use std::path::PathBuf;
2573
2574        let temp_file = "test_deadletter_serialize.json";
2575        let mut stage = Deadletter::new(PathBuf::from(temp_file));
2576
2577        let event = Event {
2578            data: serde_json::json!({"error": "test"}),
2579            metadata: None,
2580        };
2581
2582        let result = stage.execute(event);
2583        assert!(result.is_ok());
2584
2585        // Clean up
2586        if std::path::Path::new(temp_file).exists() {
2587            fs::remove_file(temp_file).unwrap();
2588        }
2589    }
2590
2591    #[test]
2592    fn test_directory_io_error_during_iteration() {
2593        use std::path::PathBuf;
2594        let mut input = InputSource::Directory(PathBuf::from("/nonexistent/directory"));
2595
2596        let mut pipeline = Pipeline::new();
2597        pipeline.add_stage(Box::new(StdoutOutput::new()));
2598        let mut deadletter: Option<&mut dyn Stage> = None;
2599
2600        let result = input.process_input(&mut pipeline, &mut deadletter);
2601        assert!(result.is_err());
2602        assert_eq!(result.unwrap_err().category(), "System");
2603    }
2604
2605    #[test]
2606    fn test_pipeline_with_error_in_middle_stage() {
2607        let mut pipeline = Pipeline::new();
2608        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
2609        pipeline.add_stage(Box::new(RequiredFields::new(vec![
2610            "missing_field".to_string()
2611        ])));
2612        pipeline.add_stage(Box::new(StdoutOutput::new()));
2613
2614        let event = Event {
2615            data: serde_json::json!({"level": "info", "message": "test"}),
2616            metadata: None,
2617        };
2618
2619        let result = pipeline.process_event(event);
2620        assert!(result.is_err());
2621        assert_eq!(result.unwrap_err().category(), "Validation");
2622
2623        let prometheus = pipeline.export_prometheus();
2624        assert!(prometheus.contains("feedme_events_processed_total 1"));
2625        assert!(prometheus.contains("feedme_errors_total 1"));
2626    }
2627
2628    #[test]
2629    fn test_pii_redaction_non_object() {
2630        let mut stage = PIIRedaction::new(vec![regex::Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap()]);
2631
2632        let non_object_event = Event {
2633            data: serde_json::json!("just a string with SSN 123-45-6789"),
2634            metadata: None,
2635        };
2636
2637        let result = stage.execute(non_object_event).unwrap().unwrap();
2638        assert_eq!(
2639            result.data,
2640            serde_json::json!("just a string with SSN 123-45-6789")
2641        );
2642    }
2643
2644    #[test]
2645    fn test_derived_fields_non_object() {
2646        use std::collections::HashMap;
2647        let mut derivations = HashMap::new();
2648        derivations.insert(
2649            "new_field".to_string(),
2650            Box::new(|_: &Event| serde_json::json!("derived"))
2651                as Box<dyn Fn(&Event) -> serde_json::Value>,
2652        );
2653
2654        let mut stage = DerivedFields::new(derivations);
2655
2656        let non_object_event = Event {
2657            data: serde_json::json!("just a string"),
2658            metadata: None,
2659        };
2660
2661        let result = stage.execute(non_object_event).unwrap().unwrap();
2662        assert_eq!(result.data, serde_json::json!("just a string"));
2663    }
2664
2665    #[test]
2666    fn test_required_fields_non_object() {
2667        let mut stage = RequiredFields::new(vec!["level".to_string()]);
2668
2669        let non_object_event = Event {
2670            data: serde_json::json!("just a string"),
2671            metadata: None,
2672        };
2673
2674        let result = stage.execute(non_object_event).unwrap();
2675        assert!(result.is_some());
2676    }
2677
2678    #[test]
2679    fn test_type_checking_non_object() {
2680        use std::collections::HashMap;
2681        let mut type_checks = HashMap::new();
2682        type_checks.insert("field".to_string(), "string".to_string());
2683
2684        let mut stage = TypeChecking::new(type_checks);
2685
2686        let non_object_event = Event {
2687            data: serde_json::json!("just a string"),
2688            metadata: None,
2689        };
2690
2691        let result = stage.execute(non_object_event).unwrap();
2692        assert!(result.is_some());
2693    }
2694
2695    #[test]
2696    fn test_value_constraints_non_object() {
2697        use std::collections::HashMap;
2698        let mut constraints = HashMap::new();
2699        constraints.insert(
2700            "field".to_string(),
2701            Box::new(|_: &serde_json::Value| true) as Box<dyn Fn(&serde_json::Value) -> bool>,
2702        );
2703
2704        let mut stage = ValueConstraints::new(constraints);
2705
2706        let non_object_event = Event {
2707            data: serde_json::json!("just a string"),
2708            metadata: None,
2709        };
2710
2711        let result = stage.execute(non_object_event).unwrap();
2712        assert!(result.is_some());
2713    }
2714
    #[test]
    fn test_metrics_record_latency_new_stage() {
        // Recording latency for a stage name seen for the first time must
        // create its series; a repeat sample for an existing stage is
        // folded into that series.
        let mut metrics = Metrics::new();

        metrics.record_latency("stage1", 10.0);
        metrics.record_latency("stage2", 20.0);
        metrics.record_latency("stage1", 15.0); // second sample for stage1

        let prometheus = metrics.to_prometheus();
        assert!(prometheus.contains("stage1"));
        assert!(prometheus.contains("stage2"));
        // NOTE(review): "25" is presumably the 10.0 + 15.0 total for
        // stage1 and "20" the single stage2 sample — confirm against
        // Metrics::to_prometheus. Bare substring checks are fragile
        // ("25" also matches "125"); asserting on the full metric line
        // would be safer.
        assert!(prometheus.contains("25"));
        assert!(prometheus.contains("20"));
    }
2729
2730    #[test]
2731    fn test_input_source_with_io_error_during_read() {
2732        use std::fs;
2733        use std::io::Write;
2734
2735        let temp_file = "test_io_error.ndjson";
2736        let mut file = fs::File::create(temp_file).unwrap();
2737        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
2738        drop(file);
2739
2740        let mut pipeline = Pipeline::new();
2741        pipeline.add_stage(Box::new(StdoutOutput::new()));
2742
2743        let mut input = InputSource::File(temp_file.into());
2744        let mut deadletter: Option<&mut dyn Stage> = None;
2745
2746        let result = input.process_input(&mut pipeline, &mut deadletter);
2747        assert!(result.is_ok());
2748
2749        fs::remove_file(temp_file).unwrap();
2750    }
2751
2752    #[test]
2753    fn test_pipeline_stage_latency_measurement() {
2754        let mut pipeline = Pipeline::new();
2755        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
2756
2757        let event = Event {
2758            data: serde_json::json!({"level": "info", "extra": "removed"}),
2759            metadata: None,
2760        };
2761
2762        pipeline.process_event(event).unwrap();
2763
2764        let prometheus = pipeline.export_prometheus();
2765        assert!(prometheus.contains("feedme_stage_latency_ms"));
2766        assert!(prometheus.contains("FieldSelect"));
2767    }
2768
2769    #[test]
2770    fn test_complete_input_output_cycle() {
2771        use std::fs;
2772        use std::io::Write;
2773        use tempfile::TempDir;
2774
2775        let temp_dir = TempDir::new().unwrap();
2776        let input_file = temp_dir.path().join("input.ndjson");
2777        let output_file = temp_dir.path().join("output.ndjson");
2778
2779        let mut file = fs::File::create(&input_file).unwrap();
2780        writeln!(file, r#"{{"level": "info", "message": "test"}}"#).unwrap();
2781        writeln!(file, r#"{{"level": "error", "message": "error"}}"#).unwrap();
2782        drop(file);
2783
2784        let mut pipeline = Pipeline::new();
2785        pipeline.add_stage(Box::new(Filter::new(Box::new(|e| {
2786            e.get_string("level") == Some("info")
2787        }))));
2788        pipeline.add_stage(Box::new(FileOutput::new(output_file.clone())));
2789
2790        let mut input = InputSource::File(input_file);
2791        let mut deadletter: Option<&mut dyn Stage> = None;
2792
2793        let result = input.process_input(&mut pipeline, &mut deadletter);
2794        assert!(result.is_ok());
2795
2796        let output_content = fs::read_to_string(&output_file).unwrap();
2797        assert!(output_content.contains("info"));
2798        assert!(!output_content.contains("error"));
2799
2800        let prometheus = pipeline.export_prometheus();
2801        assert!(prometheus.contains("feedme_events_processed_total 2"));
2802        assert!(prometheus.contains("feedme_events_dropped_total 1"));
2803    }
2804
2805    #[test]
2806    fn test_value_constraints_missing_field() {
2807        use std::collections::HashMap;
2808        let mut constraints = HashMap::new();
2809        constraints.insert(
2810            "missing_field".to_string(),
2811            Box::new(|_: &serde_json::Value| true) as Box<dyn Fn(&serde_json::Value) -> bool>,
2812        );
2813
2814        let mut stage = ValueConstraints::new(constraints);
2815
2816        let event = Event {
2817            data: serde_json::json!({"different_field": "value"}),
2818            metadata: None,
2819        };
2820
2821        let result = stage.execute(event).unwrap();
2822        assert!(result.is_some());
2823    }
2824
2825    #[test]
2826    fn test_type_checking_missing_field() {
2827        use std::collections::HashMap;
2828        let mut type_checks = HashMap::new();
2829        type_checks.insert("missing_field".to_string(), "string".to_string());
2830
2831        let mut stage = TypeChecking::new(type_checks);
2832
2833        let event = Event {
2834            data: serde_json::json!({"different_field": "value"}),
2835            metadata: None,
2836        };
2837
2838        let result = stage.execute(event).unwrap();
2839        assert!(result.is_some());
2840    }
2841
2842    #[test]
2843    fn test_replay_execution() {
2844        use tempfile::NamedTempFile;
2845        use crate::replay::{record_execution, replay_execution};
2846
2847        let mut pipeline = Pipeline::new();
2848        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string(), "message".to_string()])));
2849        pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".to_string()])));
2850
2851        let events = vec![
2852            Event {
2853                data: serde_json::json!({"level": "info", "message": "test1", "extra": "ignored"}),
2854                metadata: None,
2855            },
2856            Event {
2857                data: serde_json::json!({"level": "error", "message": "test2"}),
2858                metadata: None,
2859            },
2860        ];
2861
2862        let temp_file = NamedTempFile::new().unwrap();
2863        let trace_path = temp_file.path();
2864
2865        // Record execution
2866        record_execution(&mut pipeline, &events, trace_path).unwrap();
2867
2868        // Create fresh pipeline for replay
2869        let mut replay_pipeline = Pipeline::new();
2870        replay_pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string(), "message".to_string()])));
2871        replay_pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".to_string()])));
2872
2873        // Replay and verify
2874        replay_execution(&mut replay_pipeline, trace_path).unwrap();
2875    }
2876}