1use regex::Regex;
68use serde::{Deserialize, Serialize};
69use std::collections::{BTreeMap, HashMap};
70use std::fmt;
71use std::fs;
72use std::io::{self, BufRead};
73use std::path::PathBuf;
74use std::time::Instant;
75
/// Runtime invariant assertions (see `invariant_ppt::assert_invariant` calls
/// in `Pipeline::process_event`).
pub mod invariant_ppt;

// Contract tests for the invariants above; compiled only under `cfg(test)`.
#[cfg(test)]
mod ppt_invariant_contracts;

/// Replay support module. NOTE(review): contents not visible here — presumably
/// re-runs captured input; confirm against the module itself.
pub mod replay;
82
// Human-readable invariant descriptions asserted at runtime by
// `Pipeline::process_event`. They are named constants so the contract tests
// can reference exactly the same strings.
pub(crate) const INVARIANT_PROCESSED_INCREMENTS_ONCE: &str =
    "processed increments exactly once per process_event";
pub(crate) const INVARIANT_ERRORS_INCREMENT_ON_ERROR: &str = "errors increment exactly once per error";
pub(crate) const INVARIANT_DROPPED_ONLY_FOR_NON_OUTPUT_NONE: &str =
    "dropped increments only when non-output stage returns None";
pub(crate) const INVARIANT_OUTPUT_NONE_NOT_DROPPED: &str =
    "output stage returning None does not count as dropped";
pub(crate) const INVARIANT_LATENCY_RECORDED_ON_SUCCESS: &str =
    "latency is recorded for each successful stage execution";
92
/// Computes a derived JSON value from an event (used by `DerivedFields`).
pub type EventDerivationFn = Box<dyn Fn(&Event) -> serde_json::Value>;
/// Predicate over a field value (used by `ValueConstraints`).
pub type ValueConstraintFn = Box<dyn Fn(&serde_json::Value) -> bool>;
/// Factory producing a fresh stage instance (used by `PluginRegistry`).
pub type StageFactoryFn = Box<dyn Fn() -> Box<dyn Stage>>;
97
/// A single unit of data flowing through the pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    // The event payload as arbitrary JSON.
    pub data: serde_json::Value,
    // Optional annotations; a BTreeMap keeps key order deterministic when
    // the event is serialized (e.g. by the deadletter sink).
    pub metadata: Option<BTreeMap<String, serde_json::Value>>,
}
107
108impl Event {
109 pub fn from_raw_input(input: &str) -> anyhow::Result<Self> {
111 let data: serde_json::Value = serde_json::from_str(input)?;
112 Ok(Event {
113 data,
114 metadata: None,
115 })
116 }
117
118 pub fn get_string(&self, key: &str) -> Option<&str> {
120 self.data.get(key)?.as_str()
121 }
122
123 pub fn get_number(&self, key: &str) -> Option<f64> {
125 self.data.get(key)?.as_f64()
126 }
127
128 }
130
/// Top-level pipeline error: one variant per phase, each wrapping a
/// phase-specific struct carrying `stage`, `code`, and `message`.
#[derive(Debug, Clone)]
pub enum PipelineError {
    Parse(ParseError),
    Transform(TransformError),
    Validation(ValidationError),
    Output(OutputError),
    System(SystemError),
}
141
142impl fmt::Display for PipelineError {
143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
144 match self {
145 PipelineError::Parse(e) => write!(f, "Parse error: {}", e.message),
146 PipelineError::Transform(e) => write!(f, "Transform error: {}", e.message),
147 PipelineError::Validation(e) => write!(f, "Validation error: {}", e.message),
148 PipelineError::Output(e) => write!(f, "Output error: {}", e.message),
149 PipelineError::System(e) => write!(f, "System error: {}", e.message),
150 }
151 }
152}
153
154impl PipelineError {
155 pub fn category(&self) -> &str {
156 match self {
157 PipelineError::Parse(_) => "Parse",
158 PipelineError::Transform(_) => "Transform",
159 PipelineError::Validation(_) => "Validation",
160 PipelineError::Output(_) => "Output",
161 PipelineError::System(_) => "System",
162 }
163 }
164
165 pub fn stage(&self) -> &str {
166 match self {
167 PipelineError::Parse(e) => &e.stage,
168 PipelineError::Transform(e) => &e.stage,
169 PipelineError::Validation(e) => &e.stage,
170 PipelineError::Output(e) => &e.stage,
171 PipelineError::System(e) => &e.stage,
172 }
173 }
174
175 pub fn code(&self) -> String {
176 match self {
177 PipelineError::Parse(e) => e.code.to_string(),
178 PipelineError::Transform(e) => e.code.to_string(),
179 PipelineError::Validation(e) => e.code.to_string(),
180 PipelineError::Output(e) => e.code.to_string(),
181 PipelineError::System(e) => e.code.to_string(),
182 }
183 }
184
185 pub fn message(&self) -> &str {
186 match self {
187 PipelineError::Parse(e) => &e.message,
188 PipelineError::Transform(e) => &e.message,
189 PipelineError::Validation(e) => &e.message,
190 PipelineError::Output(e) => &e.message,
191 PipelineError::System(e) => &e.message,
192 }
193 }
194}
195
196impl std::error::Error for PipelineError {}
197
/// Machine-readable codes for parse-phase failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParseErrorCode {
    ParseError,
    Utf8Error,
    JsonError,
    // Appears intended for fabricating errors in tests.
    Test,
}

impl fmt::Display for ParseErrorCode {
    /// Stable SCREAMING_SNAKE_CASE wire form of the code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ParseErrorCode::ParseError => "PARSE_ERROR",
            ParseErrorCode::Utf8Error => "UTF8_ERROR",
            ParseErrorCode::JsonError => "JSON_ERROR",
            ParseErrorCode::Test => "TEST",
        };
        f.write_str(label)
    }
}

/// Error raised while turning raw input into an `Event`.
#[derive(Debug, Clone)]
pub struct ParseError {
    pub stage: String,
    pub code: ParseErrorCode,
    pub message: String,
}
223
/// Machine-readable codes for transform-phase failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransformErrorCode {
    MissingField,
    TypeMismatch,
    ConstraintViolation,
    // Appears intended for fabricating errors in tests.
    Test,
}

impl fmt::Display for TransformErrorCode {
    /// Stable SCREAMING_SNAKE_CASE wire form of the code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TransformErrorCode::MissingField => "MISSING_FIELD",
            TransformErrorCode::TypeMismatch => "TYPE_MISMATCH",
            TransformErrorCode::ConstraintViolation => "CONSTRAINT_VIOLATION",
            TransformErrorCode::Test => "TEST",
        };
        f.write_str(label)
    }
}

/// Error raised while transforming an event.
#[derive(Debug, Clone)]
pub struct TransformError {
    pub stage: String,
    pub code: TransformErrorCode,
    pub message: String,
}
249
/// Machine-readable codes for validation-phase failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationErrorCode {
    MissingField,
    TypeMismatch,
    ConstraintViolation,
    // Appears intended for fabricating errors in tests.
    Test,
}

impl fmt::Display for ValidationErrorCode {
    /// Stable SCREAMING_SNAKE_CASE wire form of the code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            ValidationErrorCode::MissingField => "MISSING_FIELD",
            ValidationErrorCode::TypeMismatch => "TYPE_MISMATCH",
            ValidationErrorCode::ConstraintViolation => "CONSTRAINT_VIOLATION",
            ValidationErrorCode::Test => "TEST",
        };
        f.write_str(label)
    }
}

/// Error raised while validating an event.
#[derive(Debug, Clone)]
pub struct ValidationError {
    pub stage: String,
    pub code: ValidationErrorCode,
    pub message: String,
}
275
/// Machine-readable codes for output-phase failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputErrorCode {
    SerializeError,
    IoError,
    // Appears intended for fabricating errors in tests.
    Test,
}

impl fmt::Display for OutputErrorCode {
    /// Stable SCREAMING_SNAKE_CASE wire form of the code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            OutputErrorCode::SerializeError => "SERIALIZE_ERROR",
            OutputErrorCode::IoError => "IO_ERROR",
            OutputErrorCode::Test => "TEST",
        };
        f.write_str(label)
    }
}

/// Error raised while emitting an event to a sink.
#[derive(Debug, Clone)]
pub struct OutputError {
    pub stage: String,
    pub code: OutputErrorCode,
    pub message: String,
}
299
/// Machine-readable codes for system-level (I/O) failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SystemErrorCode {
    IoError,
    // Appears intended for fabricating errors in tests.
    Test,
}

impl fmt::Display for SystemErrorCode {
    /// Stable SCREAMING_SNAKE_CASE wire form of the code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            SystemErrorCode::IoError => "IO_ERROR",
            SystemErrorCode::Test => "TEST",
        };
        f.write_str(label)
    }
}

/// Error raised by the surrounding system (input reading, etc.).
#[derive(Debug, Clone)]
pub struct SystemError {
    pub stage: String,
    pub code: SystemErrorCode,
    pub message: String,
}
321
322#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
323pub enum DropReason {
324 Filtered,
325}
326
327impl fmt::Display for DropReason {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329 match self {
330 DropReason::Filtered => write!(f, "filtered"),
331 }
332 }
333}
334
335#[derive(Debug)]
338pub struct Metrics {
339 events_processed: u64,
340 events_dropped: u64,
341 errors: u64,
342 stage_latencies: HashMap<String, LatencyStats>, drop_reasons: HashMap<DropReason, u64>, }
345
/// Streaming summary (count/sum/min/max) of latency samples in milliseconds.
#[derive(Debug, Clone)]
pub struct LatencyStats {
    pub count: u64,
    pub sum: f64,
    pub min: f64,
    pub max: f64,
}

impl LatencyStats {
    /// Empty summary: min/max start at +/- infinity so the first sample
    /// always replaces them.
    pub fn new() -> Self {
        Self {
            count: 0,
            sum: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Folds one sample into the summary.
    pub fn record(&mut self, duration: f64) {
        self.count += 1;
        self.sum += duration;
        // f64::min/max ignore a NaN operand, matching the original
        // comparison-based updates (NaN samples leave min/max untouched).
        self.min = self.min.min(duration);
        self.max = self.max.max(duration);
    }
}

impl Default for LatencyStats {
    fn default() -> Self {
        Self::new()
    }
}
381
impl Metrics {
    /// Creates a metrics bundle with all counters at zero.
    pub fn new() -> Self {
        Metrics {
            events_processed: 0,
            events_dropped: 0,
            errors: 0,
            stage_latencies: HashMap::new(),
            drop_reasons: HashMap::new(),
        }
    }

    /// Counts one event entering the pipeline.
    pub fn increment_processed(&mut self) {
        self.events_processed += 1;
    }

    /// Counts one dropped event and attributes it to `reason`.
    pub fn increment_dropped(&mut self, reason: DropReason) {
        self.events_dropped += 1;
        *self.drop_reasons.entry(reason).or_insert(0) += 1;
    }

    /// Counts one stage failure.
    pub fn increment_errors(&mut self) {
        self.errors += 1;
    }

    /// Folds one latency sample (milliseconds) into the stats for `stage`,
    /// creating the entry on first use.
    pub fn record_latency(&mut self, stage: &str, duration: f64) {
        self.stage_latencies
            .entry(stage.to_string())
            .or_default()
            .record(duration);
    }

    /// Renders all metrics in Prometheus text exposition format.
    ///
    /// NOTE(review): the three top counters emit only `# HELP` (no `# TYPE`)
    /// lines, and stage/drop-reason lines follow `HashMap` iteration order,
    /// which is unspecified and varies between runs — confirm consumers
    /// tolerate both before changing anything here.
    pub fn to_prometheus(&self) -> String {
        let mut output = String::new();
        output.push_str("# HELP feedme_events_processed_total Total events processed\n");
        output.push_str(&format!(
            "feedme_events_processed_total {}\n",
            self.events_processed
        ));
        output.push_str("# HELP feedme_events_dropped_total Total events dropped\n");
        output.push_str(&format!(
            "feedme_events_dropped_total {}\n",
            self.events_dropped
        ));
        output.push_str("# HELP feedme_errors_total Total errors\n");
        output.push_str(&format!("feedme_errors_total {}\n", self.errors));
        output.push_str("# HELP feedme_stage_latency_ms Stage latency in milliseconds\n");
        output.push_str("# TYPE feedme_stage_latency_ms gauge\n");
        for (stage, stats) in &self.stage_latencies {
            // Skip stages with no samples (min/max would be +/- infinity).
            if stats.count > 0 {
                output.push_str(&format!(
                    "feedme_stage_latency_ms_sum{{stage=\"{}\"}} {}\n",
                    stage, stats.sum
                ));
                output.push_str(&format!(
                    "feedme_stage_latency_ms_count{{stage=\"{}\"}} {}\n",
                    stage, stats.count
                ));
                output.push_str(&format!(
                    "feedme_stage_latency_ms_min{{stage=\"{}\"}} {}\n",
                    stage, stats.min
                ));
                output.push_str(&format!(
                    "feedme_stage_latency_ms_max{{stage=\"{}\"}} {}\n",
                    stage, stats.max
                ));
            }
        }
        output.push_str("# HELP feedme_drop_reasons_total Drop reasons\n");
        output.push_str("# TYPE feedme_drop_reasons_total counter\n");
        for (reason, count) in &self.drop_reasons {
            output.push_str(&format!(
                "feedme_drop_reasons_total{{reason=\"{}\"}} {}\n",
                reason, count
            ));
        }
        output
    }

    /// Renders all metrics as one compact JSON object string per metric
    /// (and per stage / drop reason). Ordering of the per-stage and
    /// per-reason entries follows `HashMap` iteration order (unspecified).
    pub fn to_json_logs(&self) -> Vec<String> {
        let mut logs = Vec::new();
        logs.push(
            serde_json::json!({
                "metric": "events_processed",
                "value": self.events_processed
            })
            .to_string(),
        );
        logs.push(
            serde_json::json!({
                "metric": "events_dropped",
                "value": self.events_dropped
            })
            .to_string(),
        );
        logs.push(
            serde_json::json!({
                "metric": "errors",
                "value": self.errors
            })
            .to_string(),
        );
        for (stage, stats) in &self.stage_latencies {
            logs.push(
                serde_json::json!({
                    "metric": "stage_latencies",
                    "stage": stage,
                    "count": stats.count,
                    "sum": stats.sum,
                    "min": stats.min,
                    "max": stats.max
                })
                .to_string(),
            );
        }
        for (reason, count) in &self.drop_reasons {
            logs.push(
                serde_json::json!({
                    "metric": "drop_reasons",
                    "reason": reason,
                    "count": count
                })
                .to_string(),
            );
        }
        logs
    }
}
509
510impl Default for Metrics {
511 fn default() -> Self {
512 Self::new()
513 }
514}
515
/// A single pipeline step (transform, validator, or output sink).
pub trait Stage {
    /// Processes one event: `Ok(Some)` passes it on, `Ok(None)` consumes it
    /// (a drop for non-output stages, normal completion for output stages),
    /// `Err` aborts this event's run.
    fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError>;
    /// Stable stage name, used as the latency-metrics label.
    fn name(&self) -> &str;
    /// Whether this stage is a terminal output sink; defaults to `false`.
    fn is_output(&self) -> bool {
        false
    }
}
525
/// An ordered chain of stages plus the metrics collected while running them.
pub struct Pipeline {
    // Stages run in insertion order.
    stages: Vec<Box<dyn Stage>>,
    metrics: Metrics,
}
532
impl Pipeline {
    /// Creates an empty pipeline with zeroed metrics.
    pub fn new() -> Self {
        Pipeline {
            stages: Vec::new(),
            metrics: Metrics::new(),
        }
    }

    /// Appends a stage; stages execute in the order they were added.
    pub fn add_stage(&mut self, stage: Box<dyn Stage>) {
        self.stages.push(stage);
    }

    /// Runs `event` through every stage in order.
    ///
    /// Returns `Ok(Some(event))` with the transformed event, `Ok(None)` if a
    /// stage consumed it, or the first stage error. Metrics contract (each
    /// clause is also asserted at runtime via `invariant_ppt`):
    /// - `events_processed` is incremented exactly once per call;
    /// - a latency sample is recorded for every stage execution returning `Ok`;
    /// - `events_dropped` is incremented only when a *non-output* stage
    ///   returns `None`; an output stage returning `None` is not a drop;
    /// - `errors` is incremented exactly once when a stage fails, and the
    ///   error is returned immediately (later stages do not run).
    pub fn process_event(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
        // Snapshot counters so the invariant assertions can check exact deltas.
        let prev_processed = self.metrics.events_processed;
        let prev_errors = self.metrics.errors;
        let prev_dropped = self.metrics.events_dropped;

        self.metrics.increment_processed();
        invariant_ppt::assert_invariant(
            self.metrics.events_processed == prev_processed + 1,
            INVARIANT_PROCESSED_INCREMENTS_ONCE,
            Some("Pipeline::process_event"),
        );

        // `current` threads the (possibly transformed) event between stages;
        // it becomes `None` once some stage consumes the event.
        let mut current = Some(event);
        for stage in &mut self.stages {
            if let Some(evt) = current {
                let start = Instant::now();
                match stage.execute(evt) {
                    Ok(opt) => {
                        // Latency is recorded for every successful execution,
                        // regardless of whether the stage kept the event.
                        let duration = start.elapsed().as_secs_f64() * 1000.0;

                        let prev_stage_count = self
                            .metrics
                            .stage_latencies
                            .get(stage.name())
                            .map(|s| s.count)
                            .unwrap_or(0);
                        self.metrics.record_latency(stage.name(), duration);
                        let new_stage_count = self
                            .metrics
                            .stage_latencies
                            .get(stage.name())
                            .map(|s| s.count)
                            .unwrap_or(0);
                        invariant_ppt::assert_invariant(
                            new_stage_count == prev_stage_count + 1,
                            INVARIANT_LATENCY_RECORDED_ON_SUCCESS,
                            Some("Pipeline::process_event"),
                        );

                        current = opt;
                        if current.is_none() {
                            if !stage.is_output() {
                                // A non-output stage swallowing the event is a
                                // drop (currently always attributed to
                                // filtering).
                                let dropped_before = self.metrics.events_dropped;
                                let reason_before = *self
                                    .metrics
                                    .drop_reasons
                                    .get(&DropReason::Filtered)
                                    .unwrap_or(&0);

                                self.metrics.increment_dropped(DropReason::Filtered);

                                let reason_after = *self
                                    .metrics
                                    .drop_reasons
                                    .get(&DropReason::Filtered)
                                    .unwrap_or(&0);

                                invariant_ppt::assert_invariant(
                                    self.metrics.events_dropped == dropped_before + 1
                                        && reason_after == reason_before + 1,
                                    INVARIANT_DROPPED_ONLY_FOR_NON_OUTPUT_NONE,
                                    Some("Pipeline::process_event"),
                                );
                            } else {
                                // Output stages return None on success; this
                                // must not count as a drop.
                                invariant_ppt::assert_invariant(
                                    self.metrics.events_dropped == prev_dropped,
                                    INVARIANT_OUTPUT_NONE_NOT_DROPPED,
                                    Some("Pipeline::process_event"),
                                );
                            }
                        }
                    }
                    Err(e) => {
                        self.metrics.increment_errors();
                        invariant_ppt::assert_invariant(
                            self.metrics.errors == prev_errors + 1,
                            INVARIANT_ERRORS_INCREMENT_ON_ERROR,
                            Some("Pipeline::process_event"),
                        );
                        return Err(e);
                    }
                }
            } else {
                // Event already consumed; skip remaining stages.
                break;
            }
        }

        // Counters only ever increase within a call.
        invariant_ppt::assert_invariant(
            self.metrics.events_processed >= prev_processed
                && self.metrics.errors >= prev_errors
                && self.metrics.events_dropped >= prev_dropped,
            "metrics counters are monotonic",
            Some("Pipeline::process_event"),
        );
        Ok(current)
    }

    /// Prometheus text rendering of the accumulated metrics.
    pub fn export_prometheus(&self) -> String {
        self.metrics.to_prometheus()
    }

    /// JSON-log rendering of the accumulated metrics.
    pub fn export_json_logs(&self) -> Vec<String> {
        self.metrics.to_json_logs()
    }
}
651
652impl Default for Pipeline {
653 fn default() -> Self {
654 Self::new()
655 }
656}
657
/// Where raw newline-delimited input is read from.
pub enum InputSource {
    /// Read lines from standard input.
    Stdin,
    /// Read lines from a single file.
    File(PathBuf),
    /// Read every regular file in a directory, in sorted path order.
    Directory(PathBuf),
}
665
666impl InputSource {
667 pub fn process_input(
668 &mut self,
669 pipeline: &mut Pipeline,
670 deadletter: &mut Option<&mut dyn Stage>,
671 ) -> Result<(), PipelineError> {
672 match self {
673 InputSource::Stdin => {
674 let stdin = io::stdin();
675 let lines = stdin.lines();
676 for line in lines {
677 let line = line.map_err(|e| {
678 PipelineError::System(SystemError {
679 stage: "Input_Stdin".to_string(),
680 code: SystemErrorCode::IoError,
681 message: e.to_string(),
682 })
683 })?;
684 let event = match Event::from_raw_input(&line) {
685 Ok(e) => e,
686 Err(e) => {
687 if let Some(ref mut dl) = *deadletter {
688 let error_event = Event {
689 data: serde_json::json!({
690 "error": "parse",
691 "stage": "Input_Stdin",
692 "code": "PARSE_ERROR",
693 "message": e.to_string(),
694 "raw": line
695 }),
696 metadata: None,
697 };
698 let _ = dl.execute(error_event); } else {
700 return Err(PipelineError::Parse(ParseError {
701 stage: "Input_Stdin".to_string(),
702 code: ParseErrorCode::ParseError,
703 message: e.to_string(),
704 }));
705 }
706 continue;
707 }
708 };
709 match pipeline.process_event(event) {
710 Ok(_) => {}
711 Err(e) => {
712 if let Some(ref mut dl) = *deadletter {
713 let error_event = Event {
714 data: serde_json::json!({
715 "error": "pipeline",
716 "category": e.category(),
717 "stage": e.stage(),
718 "code": e.code(),
719 "message": e.message(),
720 "raw": line
721 }),
722 metadata: None,
723 };
724 let _ = dl.execute(error_event);
725 } else {
726 return Err(e);
727 }
728 }
729 }
730 }
731 Ok(())
732 }
733 InputSource::File(path) => {
734 let file = fs::File::open(path).map_err(|e| {
735 PipelineError::System(SystemError {
736 stage: "Input_File".to_string(),
737 code: SystemErrorCode::IoError,
738 message: e.to_string(),
739 })
740 })?;
741 let lines = io::BufReader::new(file).lines();
742 for line in lines {
743 let line = line.map_err(|e| {
744 PipelineError::System(SystemError {
745 stage: "Input_File".to_string(),
746 code: SystemErrorCode::IoError,
747 message: e.to_string(),
748 })
749 })?;
750 let event = match Event::from_raw_input(&line) {
751 Ok(e) => e,
752 Err(e) => {
753 if let Some(ref mut dl) = *deadletter {
754 let error_event = Event {
755 data: serde_json::json!({
756 "error": "parse",
757 "stage": "Input_File",
758 "code": "PARSE_ERROR",
759 "message": e.to_string(),
760 "raw": line
761 }),
762 metadata: None,
763 };
764 let _ = dl.execute(error_event);
765 } else {
766 return Err(PipelineError::Parse(ParseError {
767 stage: "Input_File".to_string(),
768 code: ParseErrorCode::ParseError,
769 message: e.to_string(),
770 }));
771 }
772 continue;
773 }
774 };
775 match pipeline.process_event(event) {
776 Ok(_) => {}
777 Err(e) => {
778 if let Some(ref mut dl) = *deadletter {
779 let error_event = Event {
780 data: serde_json::json!({
781 "error": "pipeline",
782 "category": e.category(),
783 "stage": e.stage(),
784 "code": e.code(),
785 "message": e.message(),
786 "raw": line
787 }),
788 metadata: None,
789 };
790 let _ = dl.execute(error_event);
791 } else {
792 return Err(e);
793 }
794 }
795 }
796 }
797 Ok(())
798 }
799 InputSource::Directory(dir) => {
800 let entries = fs::read_dir(dir).map_err(|e| {
801 PipelineError::System(SystemError {
802 stage: "Input_Directory".to_string(),
803 code: SystemErrorCode::IoError,
804 message: e.to_string(),
805 })
806 })?;
807 let mut paths: Vec<PathBuf> = Vec::new();
808 for entry in entries {
809 let entry = entry.map_err(|e| {
810 PipelineError::System(SystemError {
811 stage: "Input_Directory".to_string(),
812 code: SystemErrorCode::IoError,
813 message: e.to_string(),
814 })
815 })?;
816 let path = entry.path();
817 if path.is_file() {
818 paths.push(path);
819 }
820 }
822 paths.sort();
823 for path in paths {
824 let mut file_input = InputSource::File(path);
825 file_input.process_input(pipeline, deadletter)?;
826 }
827 Ok(())
828 }
829 }
830 }
831}
832
/// Converts one raw input record (bytes) into a single `Event`.
pub trait Parser {
    fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError>;
}
838
839pub struct NDJSONParser;
840
841impl Parser for NDJSONParser {
842 fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
843 let s = std::str::from_utf8(raw).map_err(|e| {
844 PipelineError::Parse(ParseError {
845 stage: "NDJSON".to_string(),
846 code: ParseErrorCode::Utf8Error,
847 message: e.to_string(),
848 })
849 })?;
850 Event::from_raw_input(s).map_err(|e| {
851 PipelineError::Parse(ParseError {
852 stage: "NDJSON".to_string(),
853 code: ParseErrorCode::JsonError,
854 message: e.to_string(),
855 })
856 })
857 }
858}
859
860pub struct JSONArrayParser;
861
862impl Parser for JSONArrayParser {
863 fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
864 let s = std::str::from_utf8(raw).map_err(|e| {
865 PipelineError::Parse(ParseError {
866 stage: "JSONArray".to_string(),
867 code: ParseErrorCode::Utf8Error,
868 message: e.to_string(),
869 })
870 })?;
871 let value: serde_json::Value = serde_json::from_str(s).map_err(|e| {
872 PipelineError::Parse(ParseError {
873 stage: "JSONArray".to_string(),
874 code: ParseErrorCode::JsonError,
875 message: e.to_string(),
876 })
877 })?;
878 Ok(Event {
880 data: value,
881 metadata: None,
882 })
883 }
884}
885
886pub struct SyslogParser;
887
888impl Parser for SyslogParser {
889 fn parse(&self, raw: &[u8]) -> Result<Event, PipelineError> {
890 let s = std::str::from_utf8(raw).map_err(|e| {
892 PipelineError::Parse(ParseError {
893 stage: "Syslog".to_string(),
894 code: ParseErrorCode::Utf8Error,
895 message: e.to_string(),
896 })
897 })?;
898 Ok(Event {
901 data: serde_json::json!({ "message": s }),
902 metadata: None,
903 })
904 }
905}
906
/// Marker trait for event-transforming stages.
pub trait Transform: Stage {}
910
911pub struct FieldSelect {
912 fields: Vec<String>,
913}
914
915impl FieldSelect {
916 pub fn new(fields: Vec<String>) -> Self {
917 FieldSelect { fields }
918 }
919}
920
921impl Stage for FieldSelect {
922 fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
923 if let serde_json::Value::Object(ref mut map) = event.data {
924 let mut new_map = serde_json::Map::new();
925 for field in &self.fields {
926 if let Some(value) = map.remove(field) {
927 new_map.insert(field.clone(), value);
928 }
929 }
930 event.data = serde_json::Value::Object(new_map);
931 }
932 Ok(Some(event))
933 }
934
935 fn name(&self) -> &str {
936 "FieldSelect"
937 }
938}
939
940impl Transform for FieldSelect {}
941
942pub struct FieldRemap {
943 mappings: HashMap<String, String>,
944}
945
946impl FieldRemap {
947 pub fn new(mappings: HashMap<String, String>) -> Self {
948 FieldRemap { mappings }
949 }
950}
951
952impl Stage for FieldRemap {
953 fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
954 if let serde_json::Value::Object(ref mut map) = event.data {
955 for (old_key, new_key) in &self.mappings {
956 if let Some(value) = map.remove(old_key) {
957 map.insert(new_key.clone(), value);
958 }
959 }
960 }
961 Ok(Some(event))
962 }
963
964 fn name(&self) -> &str {
965 "FieldRemap"
966 }
967}
968
969impl Transform for FieldRemap {}
970
971pub struct PIIRedaction {
972 patterns: Vec<Regex>,
973}
974
975impl PIIRedaction {
976 pub fn new(patterns: Vec<Regex>) -> Self {
977 PIIRedaction { patterns }
978 }
979}
980
981impl Stage for PIIRedaction {
982 fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
983 if let serde_json::Value::Object(ref mut map) = event.data {
984 for (_, value) in map.iter_mut() {
985 if let serde_json::Value::String(ref mut s) = value {
986 for pattern in &self.patterns {
987 *s = pattern.replace_all(s, "[REDACTED]").to_string();
988 }
989 }
990 }
991 }
992 Ok(Some(event))
993 }
994
995 fn name(&self) -> &str {
996 "PIIRedaction"
997 }
998}
999
1000impl Transform for PIIRedaction {}
1001
1002pub struct DerivedFields {
1003 derivations: HashMap<String, EventDerivationFn>,
1004}
1005
1006impl DerivedFields {
1007 pub fn new(derivations: HashMap<String, EventDerivationFn>) -> Self {
1008 DerivedFields { derivations }
1009 }
1010}
1011
1012impl Stage for DerivedFields {
1013 fn execute(&mut self, mut event: Event) -> Result<Option<Event>, PipelineError> {
1014 let mut new_values = Vec::new();
1015 for (key, func) in &self.derivations {
1016 new_values.push((key.clone(), func(&event)));
1017 }
1018 if let serde_json::Value::Object(ref mut map) = event.data {
1019 for (key, value) in new_values {
1020 map.insert(key, value);
1021 }
1022 }
1023 Ok(Some(event))
1024 }
1025
1026 fn name(&self) -> &str {
1027 "DerivedFields"
1028 }
1029}
1030
1031impl Transform for DerivedFields {}
1032
1033pub struct Filter {
1034 condition: Box<dyn Fn(&Event) -> bool>,
1035}
1036
1037impl Filter {
1038 pub fn new(condition: Box<dyn Fn(&Event) -> bool>) -> Self {
1039 Filter { condition }
1040 }
1041}
1042
1043impl Stage for Filter {
1044 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1045 if (self.condition)(&event) {
1046 Ok(Some(event))
1047 } else {
1048 Ok(None)
1049 }
1050 }
1051
1052 fn name(&self) -> &str {
1053 "Filter"
1054 }
1055}
1056
1057impl Transform for Filter {}
1058
/// Marker trait for validating stages.
pub trait Validator: Stage {}
1062
1063pub struct RequiredFields {
1064 fields: Vec<String>,
1065}
1066
1067impl RequiredFields {
1068 pub fn new(fields: Vec<String>) -> Self {
1069 RequiredFields { fields }
1070 }
1071}
1072
1073impl Stage for RequiredFields {
1074 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1075 if let serde_json::Value::Object(map) = &event.data {
1076 for field in &self.fields {
1077 if !map.contains_key(field) {
1078 return Err(PipelineError::Validation(ValidationError {
1079 stage: "RequiredFields".to_string(),
1080 code: ValidationErrorCode::MissingField,
1081 message: format!("Missing required field: {}", field),
1082 }));
1083 }
1084 }
1085 }
1086 Ok(Some(event))
1087 }
1088
1089 fn name(&self) -> &str {
1090 "RequiredFields"
1091 }
1092}
1093
1094impl Validator for RequiredFields {}
1095
1096pub struct TypeChecking {
1097 type_checks: HashMap<String, String>, }
1099
1100impl TypeChecking {
1101 pub fn new(type_checks: HashMap<String, String>) -> Self {
1102 TypeChecking { type_checks }
1103 }
1104}
1105
1106impl Stage for TypeChecking {
1107 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1108 if let serde_json::Value::Object(map) = &event.data {
1109 for (field, expected_type) in &self.type_checks {
1110 if let Some(value) = map.get(field) {
1111 let actual_type = match value {
1112 serde_json::Value::String(_) => "string",
1113 serde_json::Value::Number(_) => "number",
1114 serde_json::Value::Bool(_) => "boolean",
1115 serde_json::Value::Object(_) => "object",
1116 serde_json::Value::Array(_) => "array",
1117 serde_json::Value::Null => "null",
1118 };
1119 if actual_type != expected_type {
1120 return Err(PipelineError::Validation(ValidationError {
1121 stage: "TypeChecking".to_string(),
1122 code: ValidationErrorCode::TypeMismatch,
1123 message: format!(
1124 "Field {} expected {} but got {}",
1125 field, expected_type, actual_type
1126 ),
1127 }));
1128 }
1129 }
1130 }
1131 }
1132 Ok(Some(event))
1133 }
1134
1135 fn name(&self) -> &str {
1136 "TypeChecking"
1137 }
1138}
1139
1140impl Validator for TypeChecking {}
1141
1142pub struct ValueConstraints {
1143 constraints: HashMap<String, ValueConstraintFn>,
1144}
1145
1146impl ValueConstraints {
1147 pub fn new(constraints: HashMap<String, ValueConstraintFn>) -> Self {
1148 ValueConstraints { constraints }
1149 }
1150}
1151
1152impl Stage for ValueConstraints {
1153 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1154 if let serde_json::Value::Object(map) = &event.data {
1155 for (field, check) in &self.constraints {
1156 if let Some(value) = map.get(field) {
1157 if !check(value) {
1158 return Err(PipelineError::Validation(ValidationError {
1159 stage: "ValueConstraints".to_string(),
1160 code: ValidationErrorCode::ConstraintViolation,
1161 message: format!("Field {} violates constraint", field),
1162 }));
1163 }
1164 }
1165 }
1166 }
1167 Ok(Some(event))
1168 }
1169
1170 fn name(&self) -> &str {
1171 "ValueConstraints"
1172 }
1173}
1174
1175impl Validator for ValueConstraints {}
1176
/// Marker trait for terminal output-sink stages.
pub trait Output: Stage {}
1180
1181pub struct StdoutOutput;
1182
1183impl StdoutOutput {
1184 pub fn new() -> Self {
1185 StdoutOutput
1186 }
1187}
1188
1189impl Stage for StdoutOutput {
1190 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1191 println!(
1192 "{}",
1193 serde_json::to_string(&event.data).map_err(|e| PipelineError::Output(OutputError {
1194 stage: "Stdout".to_string(),
1195 code: OutputErrorCode::SerializeError,
1196 message: e.to_string(),
1197 }))?
1198 );
1199 Ok(None) }
1201
1202 fn name(&self) -> &str {
1203 "StdoutOutput"
1204 }
1205
1206 fn is_output(&self) -> bool {
1207 true
1208 }
1209}
1210
1211impl Output for StdoutOutput {}
1212
1213impl Default for StdoutOutput {
1214 fn default() -> Self {
1215 Self::new()
1216 }
1217}
1218
1219pub struct FileOutput {
1220 path: PathBuf,
1221}
1222
1223impl FileOutput {
1224 pub fn new(path: PathBuf) -> Self {
1225 FileOutput { path }
1226 }
1227}
1228
1229impl Stage for FileOutput {
1230 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1231 use std::io::Write;
1232 let mut file = fs::OpenOptions::new()
1233 .append(true)
1234 .create(true)
1235 .open(&self.path)
1236 .map_err(|e| {
1237 PipelineError::Output(OutputError {
1238 stage: "File".to_string(),
1239 code: OutputErrorCode::IoError,
1240 message: e.to_string(),
1241 })
1242 })?;
1243 writeln!(
1244 file,
1245 "{}",
1246 serde_json::to_string(&event.data).map_err(|e| PipelineError::Output(OutputError {
1247 stage: "File".to_string(),
1248 code: OutputErrorCode::SerializeError,
1249 message: e.to_string(),
1250 }))?
1251 )
1252 .map_err(|e| {
1253 PipelineError::Output(OutputError {
1254 stage: "File".to_string(),
1255 code: OutputErrorCode::IoError,
1256 message: e.to_string(),
1257 })
1258 })?;
1259 Ok(None) }
1261
1262 fn name(&self) -> &str {
1263 "FileOutput"
1264 }
1265
1266 fn is_output(&self) -> bool {
1267 true
1268 }
1269}
1270
1271impl Output for FileOutput {}
1272
1273pub struct Deadletter {
1274 path: PathBuf,
1275}
1276
1277impl Deadletter {
1278 pub fn new(path: PathBuf) -> Self {
1279 Deadletter { path }
1280 }
1281}
1282
1283impl Stage for Deadletter {
1284 fn execute(&mut self, event: Event) -> Result<Option<Event>, PipelineError> {
1285 use std::io::Write;
1286 let mut file = fs::OpenOptions::new()
1287 .append(true)
1288 .create(true)
1289 .open(&self.path)
1290 .map_err(|e| {
1291 PipelineError::Output(OutputError {
1292 stage: "Deadletter".to_string(),
1293 code: OutputErrorCode::IoError,
1294 message: e.to_string(),
1295 })
1296 })?;
1297 writeln!(
1298 file,
1299 "{}",
1300 serde_json::to_string(&event).map_err(|e| PipelineError::Output(OutputError {
1301 stage: "Deadletter".to_string(),
1302 code: OutputErrorCode::SerializeError,
1303 message: e.to_string(),
1304 }))?
1305 )
1306 .map_err(|e| {
1307 PipelineError::Output(OutputError {
1308 stage: "Deadletter".to_string(),
1309 code: OutputErrorCode::IoError,
1310 message: e.to_string(),
1311 })
1312 })?;
1313 Ok(None) }
1315
1316 fn name(&self) -> &str {
1317 "Deadletter"
1318 }
1319
1320 fn is_output(&self) -> bool {
1321 true
1322 }
1323}
1324
1325impl Output for Deadletter {}
1326
1327#[derive(Debug, serde::Deserialize)]
1330#[serde(deny_unknown_fields)]
1331pub struct Config {
1332 version: u32,
1333 }
1335
1336impl Config {
1337 pub fn from_yaml(yaml: &str) -> anyhow::Result<Self> {
1338 let config: Config = serde_yaml::from_str(yaml)?;
1339 if config.version != 1 {
1340 return Err(anyhow::anyhow!("Unsupported version: {}", config.version));
1341 }
1342 Ok(config)
1343 }
1344}
1345
1346pub struct PluginRegistry {
1349 plugins: HashMap<String, StageFactoryFn>,
1350}
1351
1352impl PluginRegistry {
1353 pub fn new() -> Self {
1354 PluginRegistry {
1355 plugins: HashMap::new(),
1356 }
1357 }
1358
1359 pub fn register(&mut self, name: String, factory: StageFactoryFn) {
1360 self.plugins.insert(name, factory);
1361 }
1362
1363 pub fn get_stage(&self, name: &str) -> Option<Box<dyn Stage>> {
1364 self.plugins.get(name).map(|f| f())
1365 }
1366}
1367
1368impl Default for PluginRegistry {
1369 fn default() -> Self {
1370 Self::new()
1371 }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376 use super::*;
1377
1378 #[test]
1379 fn test_event_creation() {
1380 let data = serde_json::json!({"key": "value"});
1381 let event = Event {
1382 data,
1383 metadata: None,
1384 };
1385 assert_eq!(event.get_string("key"), Some("value"));
1386 assert_eq!(event.get_string("missing"), None);
1387 }
1388
1389 #[test]
1390 fn test_event_from_raw_input() {
1391 let input = r#"{"level": "info", "message": "test"}"#;
1392 let event = Event::from_raw_input(input).unwrap();
1393 assert_eq!(event.get_string("level"), Some("info"));
1394 assert_eq!(event.get_string("message"), Some("test"));
1395 }
1396
    // A freshly constructed pipeline has no stages.
    #[test]
    fn test_pipeline_creation() {
        let pipeline = Pipeline::new();
        assert_eq!(pipeline.stages.len(), 0);
    }
1402
    // add_stage appends to the pipeline's stage list.
    #[test]
    fn test_pipeline_add_stage() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        assert_eq!(pipeline.stages.len(), 1);
    }
1409
    // FieldSelect keeps only the listed fields and removes everything else.
    #[test]
    fn test_field_select_stage() {
        let mut stage = FieldSelect::new(vec!["level".to_string(), "message".to_string()]);
        let event = Event {
            data: serde_json::json!({"level": "info", "message": "test", "extra": "value"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.data.get("level"), Some(&serde_json::json!("info")));
        assert_eq!(
            filtered.data.get("message"),
            Some(&serde_json::json!("test"))
        );
        // The unselected field must be gone entirely, not just nulled.
        assert_eq!(filtered.data.get("extra"), None);
    }
1427
    // Filter passes events matching the predicate (Some) and drops the
    // rest (None) without erroring.
    #[test]
    fn test_filter_stage() {
        let mut filter = Filter::new(Box::new(|e| e.get_string("level") == Some("info")));
        let info_event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        let warn_event = Event {
            data: serde_json::json!({"level": "warn"}),
            metadata: None,
        };
        assert!(filter.execute(info_event).unwrap().is_some());
        assert!(filter.execute(warn_event).unwrap().is_none());
    }
1442
    // RequiredFields passes events containing every listed field and
    // errors (rather than drops) when one is missing.
    #[test]
    fn test_required_fields_stage() {
        let mut stage = RequiredFields::new(vec!["level".to_string(), "message".to_string()]);
        let valid_event = Event {
            data: serde_json::json!({"level": "info", "message": "test"}),
            metadata: None,
        };
        let invalid_event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        assert!(stage.execute(valid_event).unwrap().is_some());
        assert!(stage.execute(invalid_event).is_err());
    }
1457
    // PIIRedaction replaces values matching any pattern (here an SSN)
    // with "[REDACTED]" while leaving non-matching fields untouched.
    #[test]
    fn test_pii_redaction_stage() {
        let patterns = vec![regex::Regex::new(r"\b\d{3}-\d{2}-\d{4}\b").unwrap()];
        let mut stage = PIIRedaction::new(patterns);
        let event = Event {
            data: serde_json::json!({"ssn": "123-45-6789", "name": "John"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap().unwrap();
        assert_eq!(result.get_string("ssn"), Some("[REDACTED]"));
        assert_eq!(result.get_string("name"), Some("John"));
    }
1470
    // End-to-end success path: a single transform stage produces Some(event)
    // since no output stage consumes it.
    #[test]
    fn test_pipeline_execution_success() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info", "extra": "value"}),
            metadata: None,
        };
        let result = pipeline.process_event(event).unwrap();
        assert!(result.is_some());
        let processed = result.unwrap();
        assert_eq!(processed.get_string("level"), Some("info"));
        assert_eq!(processed.get_string("extra"), None);
    }
1485
    // A failing validation stage surfaces as PipelineError::Validation
    // from process_event.
    #[test]
    fn test_pipeline_execution_error() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(RequiredFields::new(vec!["missing".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        let result = pipeline.process_event(event);
        assert!(result.is_err());
        // Assert the error variant, not just is_err, to pin the taxonomy.
        if let Err(PipelineError::Validation(_)) = result {
        } else {
            panic!("Expected Validation error");
        }
    }
1502
    // increment_processed bumps the processed counter by exactly one.
    #[test]
    fn test_metrics_increment() {
        let mut metrics = Metrics::new();
        assert_eq!(metrics.events_processed, 0);
        metrics.increment_processed();
        assert_eq!(metrics.events_processed, 1);
    }
1510
    // increment_dropped updates both the total and the per-reason counter.
    #[test]
    fn test_metrics_dropped() {
        let mut metrics = Metrics::new();
        metrics.increment_dropped(DropReason::Filtered);
        assert_eq!(metrics.events_dropped, 1);
        assert_eq!(metrics.drop_reasons.get(&DropReason::Filtered), Some(&1));
    }
1518
    // LatencyStats accumulates count/sum and tracks min/max across records.
    #[test]
    fn test_latency_stats() {
        let mut stats = LatencyStats::new();
        stats.record(1.0);
        stats.record(3.0);
        stats.record(2.0);
        assert_eq!(stats.count, 3);
        assert_eq!(stats.sum, 6.0);
        assert_eq!(stats.min, 1.0);
        assert_eq!(stats.max, 3.0);
    }
1530
    // StdoutOutput consumes the event (returns None) and reports itself
    // as an output stage.
    #[test]
    fn test_std_output_stage() {
        let mut stage = StdoutOutput::new();
        let event = Event {
            data: serde_json::json!({"test": "data"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap();
        assert!(result.is_none());
        assert!(stage.is_output());
    }
1543
1544 #[test]
1545 fn test_file_output_stage() {
1546 use std::fs;
1547 use std::path::PathBuf;
1548 let temp_file = PathBuf::from("test_output.ndjson");
1549 let mut stage = FileOutput::new(temp_file.clone());
1550 let event = Event {
1551 data: serde_json::json!({"test": "data"}),
1552 metadata: None,
1553 };
1554 let result = stage.execute(event).unwrap();
1555 assert!(result.is_none());
1556 assert!(stage.is_output());
1557 assert!(temp_file.exists());
1559 let content = fs::read_to_string(&temp_file).unwrap();
1560 assert!(content.contains("test"));
1561 fs::remove_file(temp_file).unwrap();
1563 }
1564
1565 #[test]
1566 fn test_deadletter_stage() {
1567 use std::fs;
1568 use std::path::PathBuf;
1569 let temp_file = PathBuf::from("test_deadletter.ndjson");
1570 let mut stage = Deadletter::new(temp_file.clone());
1571 let event = Event {
1572 data: serde_json::json!({"error": "test", "message": "failed"}),
1573 metadata: None,
1574 };
1575 let result = stage.execute(event).unwrap();
1576 assert!(result.is_none());
1577 assert!(stage.is_output());
1578 assert!(temp_file.exists());
1580 let content = fs::read_to_string(&temp_file).unwrap();
1581 assert!(content.contains("test"));
1582 fs::remove_file(temp_file).unwrap();
1584 }
1585
    // Directory ingest processes every .ndjson file present (3 events total).
    // NOTE(review): despite the name, this only checks the processed count,
    // not the ingest order — presumably directory entries are sorted inside
    // process_input; confirm there if ordering matters.
    #[test]
    fn test_directory_ingest_determinism() {
        use std::fs;
        use tempfile::TempDir;
        let temp_dir = TempDir::new().unwrap();

        let file_z = temp_dir.path().join("z.ndjson");
        let file_a = temp_dir.path().join("a.ndjson");
        let file_m = temp_dir.path().join("m.ndjson");

        fs::write(&file_z, r#"{"file": "z"}"#).unwrap();
        fs::write(&file_a, r#"{"file": "a"}"#).unwrap();
        fs::write(&file_m, r#"{"file": "m"}"#).unwrap();

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));

        let mut input_source = InputSource::Directory(temp_dir.path().to_path_buf());
        let result = input_source.process_input(&mut pipeline, &mut None);

        assert!(result.is_ok());

        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("feedme_events_processed_total 3"));

        // Ingest must not consume or delete the source files.
        assert!(file_a.exists());
        assert!(file_m.exists());
        assert!(file_z.exists());
    }
1622
    // A pipeline error written to the deadletter carries full attribution:
    // category, originating stage, code, and human-readable message.
    // NOTE(review): fixed filename in the CWD can collide under parallel
    // runs and leaks on assertion failure; consider a unique temp path.
    #[test]
    fn test_deadletter_attribution() {
        use std::fs;
        use std::path::PathBuf;
        let temp_file = PathBuf::from("test_deadletter_attr.ndjson");

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(RequiredFields::new(vec![
            "missing_field".to_string()
        ])));

        let mut deadletter = Deadletter::new(temp_file.clone());

        let event = Event {
            data: serde_json::json!({"existing_field": "value"}),
            metadata: None,
        };

        let result = pipeline.process_event(event);
        assert!(result.is_err());

        // Mirror how the runtime wraps a pipeline error into a deadletter
        // record before writing it.
        if let Err(e) = result {
            let error_event = Event {
                data: serde_json::json!({
                    "error": "pipeline",
                    "category": e.category(),
                    "stage": e.stage(),
                    "code": e.code(),
                    "message": e.message()
                }),
                metadata: None,
            };
            deadletter.execute(error_event).unwrap();
        }

        assert!(temp_file.exists());
        let content = fs::read_to_string(&temp_file).unwrap();
        let first_line = content.lines().next().unwrap();
        let deadletter_json: serde_json::Value = serde_json::from_str(first_line).unwrap();

        assert_eq!(deadletter_json["data"]["error"], "pipeline");
        assert_eq!(deadletter_json["data"]["category"], "Validation");
        assert_eq!(deadletter_json["data"]["stage"], "RequiredFields");
        assert_eq!(deadletter_json["data"]["code"], "MISSING_FIELD");
        assert!(deadletter_json["data"]["message"]
            .as_str()
            .unwrap()
            .contains("Missing required field"));

        fs::remove_file(temp_file).unwrap();
    }
1678
    // After one processed event, both JSON-log and Prometheus exports
    // reflect the processed counter.
    #[test]
    fn test_pipeline_metrics_export() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info"}),
            metadata: None,
        };
        pipeline.process_event(event).unwrap();
        let json_logs = pipeline.export_json_logs();
        assert!(!json_logs.is_empty());
        assert!(json_logs.iter().any(|s| s.contains("events_processed")));
        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("# HELP feedme_events_processed_total"));
        assert!(prometheus.contains("feedme_events_processed_total 1"));
    }
1695
    // Every PipelineError variant exposes consistent category/stage/code/
    // message accessors; codes render in SCREAMING_SNAKE_CASE ("TEST").
    #[test]
    fn test_error_taxonomy() {
        let parse_err = PipelineError::Parse(ParseError {
            stage: "test".to_string(),
            code: ParseErrorCode::Test,
            message: "test".to_string(),
        });
        assert_eq!(parse_err.category(), "Parse");
        assert_eq!(parse_err.stage(), "test");
        assert_eq!(parse_err.code(), "TEST");
        assert_eq!(parse_err.message(), "test");

        let transform_err = PipelineError::Transform(TransformError {
            stage: "transform_test".to_string(),
            code: TransformErrorCode::Test,
            message: "transform test".to_string(),
        });
        assert_eq!(transform_err.category(), "Transform");
        assert_eq!(transform_err.stage(), "transform_test");
        assert_eq!(transform_err.code(), "TEST");
        assert_eq!(transform_err.message(), "transform test");

        let validation_err = PipelineError::Validation(ValidationError {
            stage: "validation_test".to_string(),
            code: ValidationErrorCode::Test,
            message: "validation test".to_string(),
        });
        assert_eq!(validation_err.category(), "Validation");
        assert_eq!(validation_err.stage(), "validation_test");
        assert_eq!(validation_err.code(), "TEST");
        assert_eq!(validation_err.message(), "validation test");

        let output_err = PipelineError::Output(OutputError {
            stage: "output_test".to_string(),
            code: OutputErrorCode::Test,
            message: "output test".to_string(),
        });
        assert_eq!(output_err.category(), "Output");
        assert_eq!(output_err.stage(), "output_test");
        assert_eq!(output_err.code(), "TEST");
        assert_eq!(output_err.message(), "output test");

        let system_err = PipelineError::System(SystemError {
            stage: "system_test".to_string(),
            code: SystemErrorCode::Test,
            message: "system test".to_string(),
        });
        assert_eq!(system_err.category(), "System");
        assert_eq!(system_err.stage(), "system_test");
        assert_eq!(system_err.code(), "TEST");
        assert_eq!(system_err.message(), "system test");
    }
1748
    // File input source reads NDJSON lines and feeds them through the
    // pipeline without error.
    // NOTE(review): fixed filename in the CWD can collide under parallel
    // runs; consider a unique temp path.
    #[test]
    fn test_input_source_file() {
        use std::fs;
        use std::io::Write;
        let temp_file = "test_input.ndjson";
        let mut file = fs::File::create(temp_file).unwrap();
        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
        drop(file);

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));
        let mut input = InputSource::File(temp_file.into());
        let mut deadletter: Option<&mut dyn Stage> = None;
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_ok());

        fs::remove_file(temp_file).unwrap();
    }
1768
    // With no deadletter configured, an unparseable line aborts processing
    // with an error.
    // NOTE(review): fixed filename in the CWD can collide under parallel
    // runs; consider a unique temp path.
    #[test]
    fn test_input_source_file_parse_error() {
        use std::fs;
        use std::io::Write;
        let temp_file = "test_invalid.ndjson";
        let mut file = fs::File::create(temp_file).unwrap();
        writeln!(file, "invalid json").unwrap();
        drop(file);

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));
        let mut input = InputSource::File(temp_file.into());
        let mut deadletter: Option<&mut dyn Stage> = None;
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_err());
        fs::remove_file(temp_file).unwrap();
    }
1788
    // Smoke test: the Stdin variant can be constructed (no stdin is read).
    #[test]
    fn test_input_source_stdin() {
        let _input = InputSource::Stdin;
    }
1795
    // TypeChecking passes events whose fields match the declared type names
    // ("string"/"number") and raises a Validation error on mismatch.
    #[test]
    fn test_type_checking_stage() {
        use std::collections::HashMap;
        let mut type_checks = HashMap::new();
        type_checks.insert("level".to_string(), "string".to_string());
        type_checks.insert("count".to_string(), "number".to_string());

        let mut stage = TypeChecking::new(type_checks);

        let valid_event = Event {
            data: serde_json::json!({"level": "info", "count": 42}),
            metadata: None,
        };
        let result = stage.execute(valid_event).unwrap();
        assert!(result.is_some());

        let invalid_event = Event {
            data: serde_json::json!({"level": 123, "count": "not_a_number"}),
            metadata: None,
        };
        let result = stage.execute(invalid_event);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Validation");
    }
1822
    // ValueConstraints evaluates a per-field predicate; failure yields a
    // Validation error rather than a silent drop.
    #[test]
    fn test_value_constraints_stage() {
        use std::collections::HashMap;
        let mut constraints = HashMap::new();
        // Constraint: "count" must be a non-negative integer.
        constraints.insert(
            "count".to_string(),
            Box::new(|v: &serde_json::Value| v.as_i64().map(|n| n >= 0).unwrap_or(false))
                as Box<dyn Fn(&serde_json::Value) -> bool>,
        );

        let mut stage = ValueConstraints::new(constraints);

        let valid_event = Event {
            data: serde_json::json!({"count": 10}),
            metadata: None,
        };
        let result = stage.execute(valid_event).unwrap();
        assert!(result.is_some());

        let invalid_event = Event {
            data: serde_json::json!({"count": -5}),
            metadata: None,
        };
        let result = stage.execute(invalid_event);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Validation");
    }
1852
    // A nonexistent input directory surfaces as a System-category error.
    #[test]
    fn test_input_source_directory_error() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));
        let mut input = InputSource::Directory(PathBuf::from("/nonexistent/directory"));
        let mut deadletter: Option<&mut dyn Stage> = None;
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "System");
    }
1863
    // JSON-log export entries are valid JSON objects with "metric" and a
    // numeric "value" field.
    #[test]
    fn test_pipeline_metrics_json_export() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
        let event = Event {
            data: serde_json::json!({"level": "info", "message": "test"}),
            metadata: None,
        };
        pipeline.process_event(event).unwrap();
        let json_logs = pipeline.export_json_logs();
        assert!(!json_logs.is_empty());
        let first_log: serde_json::Value = serde_json::from_str(&json_logs[0]).unwrap();
        assert_eq!(first_log["metric"], "events_processed");
        assert!(first_log["value"].is_number());
    }
1880
    // DerivedFields adds computed fields while preserving source fields.
    #[test]
    fn test_derived_fields_stage() {
        use std::collections::HashMap;
        let mut derivations = HashMap::new();
        // Derivation: copy "base_field" (or "default" if absent) into
        // "derived_field".
        derivations.insert(
            "derived_field".to_string(),
            Box::new(|event: &Event| event.get_string("base_field").unwrap_or("default").into())
                as Box<dyn Fn(&Event) -> serde_json::Value>,
        );

        let mut stage = DerivedFields::new(derivations);
        let event = Event {
            data: serde_json::json!({"base_field": "test_value"}),
            metadata: None,
        };
        let result = stage.execute(event).unwrap().unwrap();
        assert_eq!(result.get_string("derived_field"), Some("test_value"));
        assert_eq!(result.get_string("base_field"), Some("test_value"));
    }
1900
    // Display renders errors as "<Category> error: <message>".
    #[test]
    fn test_pipeline_error_display() {
        let parse_err = PipelineError::Parse(ParseError {
            stage: "test".to_string(),
            code: ParseErrorCode::Test,
            message: "test message".to_string(),
        });
        let display = format!("{}", parse_err);
        assert!(display.contains("Parse error: test message"));

        let transform_err = PipelineError::Transform(TransformError {
            stage: "test".to_string(),
            code: TransformErrorCode::Test,
            message: "transform message".to_string(),
        });
        let display = format!("{}", transform_err);
        assert!(display.contains("Transform error: transform message"));
    }
1919
    // Directory input source finds and processes NDJSON files it contains.
    // NOTE(review): fixed directory name in the CWD can collide under
    // parallel runs; sibling tests use tempfile::TempDir — consider it here.
    #[test]
    fn test_input_source_directory() {
        use std::fs;
        use std::io::Write;
        let temp_dir = "test_dir";
        fs::create_dir(temp_dir).unwrap();
        let temp_file = format!("{}/test.ndjson", temp_dir);
        let mut file = fs::File::create(&temp_file).unwrap();
        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
        drop(file);

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));
        let mut input = InputSource::Directory(temp_dir.into());
        let mut deadletter: Option<&mut dyn Stage> = None;
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_ok());

        fs::remove_file(&temp_file).unwrap();
        fs::remove_dir(temp_dir).unwrap();
    }
1942
1943 #[test]
1944 fn test_event_get_number() {
1945 let event = Event {
1946 data: serde_json::json!({"count": 42, "rate": 3.15, "text": "not_a_number"}),
1947 metadata: None,
1948 };
1949
1950 assert_eq!(event.get_number("count"), Some(42.0));
1951 assert_eq!(event.get_number("rate"), Some(3.15));
1952 assert_eq!(event.get_number("text"), None);
1953 assert_eq!(event.get_number("missing"), None);
1954 }
1955
    // NDJSONParser: valid JSON parses into an Event; invalid UTF-8 and
    // invalid JSON both return Parse-category errors.
    #[test]
    fn test_ndjson_parser() {
        let parser = NDJSONParser;
        let valid_json = r#"{"level": "info", "message": "test"}"#;
        let result = parser.parse(valid_json.as_bytes()).unwrap();
        assert_eq!(result.get_string("level"), Some("info"));
        assert_eq!(result.get_string("message"), Some("test"));

        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
        let result = parser.parse(invalid_utf8);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Parse");

        let invalid_json = "not valid json";
        let result = parser.parse(invalid_json.as_bytes());
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Parse");
    }
1974
    // JSONArrayParser: a JSON array parses into an array-valued Event;
    // invalid UTF-8 and invalid JSON return Parse-category errors.
    #[test]
    fn test_json_array_parser() {
        let parser = JSONArrayParser;
        let valid_array = r#"[{"item": 1}, {"item": 2}]"#;
        let result = parser.parse(valid_array.as_bytes()).unwrap();
        assert!(result.data.is_array());

        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
        let result = parser.parse(invalid_utf8);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Parse");

        let invalid_json = "not valid json";
        let result = parser.parse(invalid_json.as_bytes());
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Parse");
    }
1992
    // SyslogParser stores the raw line under "message" (no field
    // extraction is asserted here); invalid UTF-8 is a Parse error.
    #[test]
    fn test_syslog_parser() {
        let parser = SyslogParser;
        let syslog_message = "<13>2024-01-01T10:00:00Z myhost test message";
        let result = parser.parse(syslog_message.as_bytes()).unwrap();
        assert_eq!(result.get_string("message"), Some(syslog_message));

        let invalid_utf8 = &[0xFF, 0xFF, 0xFF, 0xFF];
        let result = parser.parse(invalid_utf8);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Parse");
    }
2005
    // FieldRemap renames mapped keys, removes the old names, leaves
    // unmapped keys intact, and passes non-object events through unchanged.
    #[test]
    fn test_field_remap_stage() {
        use std::collections::HashMap;
        let mut mappings = HashMap::new();
        mappings.insert("old_field".to_string(), "new_field".to_string());
        mappings.insert("another_old".to_string(), "another_new".to_string());

        let mut stage = FieldRemap::new(mappings);
        let event = Event {
            data: serde_json::json!({"old_field": "value1", "another_old": "value2", "keep": "value3"}),
            metadata: None,
        };

        let result = stage.execute(event).unwrap().unwrap();
        assert_eq!(result.get_string("new_field"), Some("value1"));
        assert_eq!(result.get_string("another_new"), Some("value2"));
        assert_eq!(result.get_string("keep"), Some("value3"));
        assert_eq!(result.get_string("old_field"), None);
        assert_eq!(result.get_string("another_old"), None);
        assert_eq!(stage.name(), "FieldRemap");

        // Non-object payloads pass through unchanged rather than erroring.
        let non_object_event = Event {
            data: serde_json::json!("just a string"),
            metadata: None,
        };
        let result = stage.execute(non_object_event).unwrap().unwrap();
        assert_eq!(result.data, serde_json::json!("just a string"));
    }
2034
    // Config::from_yaml accepts version 1 only and rejects unsupported
    // versions, malformed YAML, and unknown fields (deny_unknown_fields).
    #[test]
    fn test_config_from_yaml() {
        let valid_yaml = "version: 1";
        let config = Config::from_yaml(valid_yaml).unwrap();
        assert_eq!(config.version, 1);

        let invalid_version = "version: 2";
        let result = Config::from_yaml(invalid_version);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Unsupported version"));

        let invalid_yaml = "invalid: yaml: structure: [";
        let result = Config::from_yaml(invalid_yaml);
        assert!(result.is_err());

        let unknown_fields = "version: 1\nunknown_field: value";
        let result = Config::from_yaml(unknown_fields);
        assert!(result.is_err());
    }
2057
    // PluginRegistry builds registered stages by name and returns None
    // for unregistered names.
    #[test]
    fn test_plugin_registry() {
        let mut registry = PluginRegistry::new();
        registry.register(
            "test_stage".to_string(),
            Box::new(|| Box::new(StdoutOutput::new())),
        );

        let stage = registry.get_stage("test_stage");
        assert!(stage.is_some());
        assert_eq!(stage.unwrap().name(), "StdoutOutput");

        let missing = registry.get_stage("missing_stage");
        assert!(missing.is_none());
    }
2073
2074 #[test]
2075 fn test_drop_reason_display() {
2076 assert_eq!(format!("{}", DropReason::Filtered), "filtered");
2077 }
2078
    // Compile-time check: PipelineError implements std::error::Error and
    // can therefore be boxed/`?`-propagated.
    #[test]
    fn test_pipeline_error_std_error_trait() {
        let error = PipelineError::Parse(ParseError {
            stage: "test".to_string(),
            code: ParseErrorCode::Test,
            message: "test error".to_string(),
        });

        let _: &dyn std::error::Error = &error;
    }
2089
    // With a deadletter configured, both parse failures and pipeline errors
    // are routed to the deadletter file and processing continues (Ok).
    // NOTE(review): fixed filenames in the CWD can collide under parallel
    // runs; consider unique temp paths.
    #[test]
    fn test_input_source_file_with_deadletter() {
        use std::fs;
        use std::io::Write;
        let temp_file = "test_input_deadletter.ndjson";
        let deadletter_file = "test_deadletter_file.ndjson";
        let mut file = fs::File::create(temp_file).unwrap();
        writeln!(file, "invalid json line").unwrap();
        writeln!(file, r#"{{"level": "info"}}"#).unwrap();
        drop(file);

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(RequiredFields::new(vec![
            "missing_field".to_string()
        ])));

        let mut deadletter_stage = Deadletter::new(deadletter_file.into());
        let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);

        let mut input = InputSource::File(temp_file.into());
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_ok());

        // Expect one record from the bad JSON ("parse") and one from the
        // failed validation ("pipeline").
        let deadletter_content = fs::read_to_string(deadletter_file).unwrap();
        assert!(deadletter_content.contains("parse"));
        assert!(deadletter_content.contains("pipeline"));

        fs::remove_file(temp_file).unwrap();
        fs::remove_file(deadletter_file).unwrap();
    }
2121
    // End-to-end: remap -> select -> require -> type-check -> stdout.
    // The output stage consumes the event (None) and metrics record both
    // the processed count and per-stage latency.
    #[test]
    fn test_complete_pipeline_with_transforms_and_validators() {
        let mut pipeline = Pipeline::new();

        let mut field_mappings = std::collections::HashMap::new();
        field_mappings.insert("msg".to_string(), "message".to_string());
        pipeline.add_stage(Box::new(FieldRemap::new(field_mappings)));

        pipeline.add_stage(Box::new(FieldSelect::new(vec![
            "level".to_string(),
            "message".to_string(),
        ])));

        pipeline.add_stage(Box::new(RequiredFields::new(vec![
            "level".to_string(),
            "message".to_string(),
        ])));

        let mut type_checks = std::collections::HashMap::new();
        type_checks.insert("level".to_string(), "string".to_string());
        pipeline.add_stage(Box::new(TypeChecking::new(type_checks)));

        pipeline.add_stage(Box::new(StdoutOutput::new()));

        let event = Event {
            data: serde_json::json!({"level": "info", "msg": "test message", "extra": "ignored"}),
            metadata: None,
        };

        let result = pipeline.process_event(event).unwrap();
        assert!(result.is_none());

        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("feedme_events_processed_total 1"));
        assert!(prometheus.contains("feedme_stage_latency_ms"));
    }
2158
    // Both export formats (Prometheus text and JSON log lines) reflect
    // counters, latency sums, and per-reason drop counts.
    #[test]
    fn test_metrics_export_formats() {
        let mut metrics = Metrics::new();
        metrics.increment_processed();
        metrics.increment_dropped(DropReason::Filtered);
        metrics.increment_errors();
        metrics.record_latency("test_stage", 100.5);

        let prometheus = metrics.to_prometheus();
        assert!(prometheus.contains("feedme_events_processed_total 1"));
        assert!(prometheus.contains("feedme_events_dropped_total 1"));
        assert!(prometheus.contains("feedme_errors_total 1"));
        assert!(prometheus.contains("feedme_stage_latency_ms_sum{stage=\"test_stage\"} 100.5"));
        assert!(prometheus.contains("feedme_drop_reasons_total{reason=\"filtered\"} 1"));

        let json_logs = metrics.to_json_logs();
        assert!(json_logs.len() >= 4);

        let processed_log = json_logs
            .iter()
            .find(|log| log.contains("events_processed"))
            .unwrap();
        let log_json: serde_json::Value = serde_json::from_str(processed_log).unwrap();
        assert_eq!(log_json["metric"], "events_processed");
        assert_eq!(log_json["value"], 1);
    }
2185
    // Directory ingest with a deadletter: the bad file's parse failure is
    // routed to the deadletter and overall processing still succeeds.
    #[test]
    fn test_input_source_directory_with_deadletter() {
        use std::fs;
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let file1 = temp_dir.path().join("file1.ndjson");
        let file2 = temp_dir.path().join("file2.ndjson");

        fs::write(&file1, "invalid json\n").unwrap();
        fs::write(&file2, r#"{"level": "info"}"#).unwrap();

        let deadletter_file = temp_dir.path().join("deadletter.ndjson");
        let mut deadletter_stage = Deadletter::new(deadletter_file.clone());
        let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);

        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(StdoutOutput::new()));

        let mut input = InputSource::Directory(temp_dir.path().to_path_buf());
        let result = input.process_input(&mut pipeline, &mut deadletter);
        assert!(result.is_ok());

        let deadletter_content = fs::read_to_string(&deadletter_file).unwrap();
        assert!(deadletter_content.contains("parse"));
        assert!(deadletter_content.contains("PARSE_ERROR"));
    }
2213
    // is_output defaults to false for transforms and true for output stages.
    #[test]
    fn test_stage_is_output_default() {
        let stage = FieldSelect::new(vec!["test".to_string()]);
        assert!(!stage.is_output());

        let output_stage = StdoutOutput::new();
        assert!(output_stage.is_output());
    }
2222
    // Empty LatencyStats use the inf/-inf sentinel identities for min/max
    // so the first record sets both; later records adjust them correctly.
    #[test]
    fn test_latency_stats_edge_cases() {
        let mut stats = LatencyStats::new();
        assert_eq!(stats.count, 0);
        assert_eq!(stats.sum, 0.0);
        assert_eq!(stats.min, f64::INFINITY);
        assert_eq!(stats.max, f64::NEG_INFINITY);

        stats.record(5.0);
        assert_eq!(stats.count, 1);
        assert_eq!(stats.min, 5.0);
        assert_eq!(stats.max, 5.0);

        stats.record(3.0);
        assert_eq!(stats.min, 3.0);
        assert_eq!(stats.max, 5.0);

        stats.record(7.0);
        assert_eq!(stats.min, 3.0);
        assert_eq!(stats.max, 7.0);
    }
2244
    // A filtered-out event counts as dropped; an event consumed by the
    // output stage does not — both still count as processed.
    #[test]
    fn test_pipeline_stage_filtering() {
        let mut pipeline = Pipeline::new();
        pipeline.add_stage(Box::new(Filter::new(Box::new(|e| {
            e.get_string("level") == Some("error")
        }))));
        pipeline.add_stage(Box::new(StdoutOutput::new()));

        let info_event = Event {
            data: serde_json::json!({"level": "info", "message": "test"}),
            metadata: None,
        };
        let result = pipeline.process_event(info_event).unwrap();
        assert!(result.is_none());

        let error_event = Event {
            data: serde_json::json!({"level": "error", "message": "test"}),
            metadata: None,
        };
        let result = pipeline.process_event(error_event).unwrap();
        assert!(result.is_none());

        let prometheus = pipeline.export_prometheus();
        assert!(prometheus.contains("feedme_events_processed_total 2"));
        // Only the filtered info event is a drop (INVARIANT_OUTPUT_NONE_NOT_DROPPED).
        assert!(prometheus.contains("feedme_events_dropped_total 1"));
    }
2271
    // Display formatting for the remaining error variants not covered by
    // test_pipeline_error_display (Validation, Output, System).
    #[test]
    fn test_all_pipeline_error_display_variants() {
        let validation_err = PipelineError::Validation(ValidationError {
            stage: "test".to_string(),
            code: ValidationErrorCode::Test,
            message: "validation test".to_string(),
        });
        assert_eq!(
            format!("{}", validation_err),
            "Validation error: validation test"
        );

        let output_err = PipelineError::Output(OutputError {
            stage: "test".to_string(),
            code: OutputErrorCode::Test,
            message: "output test".to_string(),
        });
        assert_eq!(format!("{}", output_err), "Output error: output test");

        let system_err = PipelineError::System(SystemError {
            stage: "test".to_string(),
            code: SystemErrorCode::Test,
            message: "system test".to_string(),
        });
        assert_eq!(format!("{}", system_err), "System error: system test");
    }
2298
    // Pattern-match smoke test for the Stdin variant; no stdin is consumed.
    #[test]
    fn test_input_source_stdin_processing() {
        let input_stdin = InputSource::Stdin;
        match input_stdin {
            InputSource::Stdin => {
            }
            _ => panic!("Should be Stdin variant"),
        }
    }
2309
    // Writing to an uncreatable path yields an Output-category error.
    #[test]
    fn test_file_output_io_errors() {
        use std::path::PathBuf;
        let invalid_path = PathBuf::from("/invalid/path/that/should/not/exist/output.json");
        let mut stage = FileOutput::new(invalid_path);

        let event = Event {
            data: serde_json::json!({"test": "data"}),
            metadata: None,
        };

        let result = stage.execute(event);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Output");
    }
2325
    // Deadletter write failures surface as Output-category errors too.
    #[test]
    fn test_deadletter_io_errors() {
        use std::path::PathBuf;
        let invalid_path = PathBuf::from("/invalid/path/that/should/not/exist/deadletter.json");
        let mut stage = Deadletter::new(invalid_path);

        let event = Event {
            data: serde_json::json!({"error": "test"}),
            metadata: None,
        };

        let result = stage.execute(event);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().category(), "Output");
    }
2341
    // Fresh metrics export zero-valued counters in both formats rather
    // than omitting the series.
    #[test]
    fn test_metrics_empty_latency_stats() {
        let metrics = Metrics::new();
        let prometheus = metrics.to_prometheus();

        assert!(prometheus.contains("feedme_events_processed_total 0"));
        assert!(prometheus.contains("feedme_events_dropped_total 0"));
        assert!(prometheus.contains("feedme_errors_total 0"));

        let json_logs = metrics.to_json_logs();
        assert!(json_logs.len() >= 3);
    }
2354
    // Every stage reports its own type name via Stage::name (used for
    // error attribution and latency metric labels).
    #[test]
    fn test_stage_names() {
        assert_eq!(FieldSelect::new(vec![]).name(), "FieldSelect");
        assert_eq!(
            FieldRemap::new(std::collections::HashMap::new()).name(),
            "FieldRemap"
        );
        assert_eq!(PIIRedaction::new(vec![]).name(), "PIIRedaction");
        assert_eq!(
            DerivedFields::new(std::collections::HashMap::new()).name(),
            "DerivedFields"
        );
        assert_eq!(Filter::new(Box::new(|_| true)).name(), "Filter");
        assert_eq!(RequiredFields::new(vec![]).name(), "RequiredFields");
        assert_eq!(
            TypeChecking::new(std::collections::HashMap::new()).name(),
            "TypeChecking"
        );
        assert_eq!(
            ValueConstraints::new(std::collections::HashMap::new()).name(),
            "ValueConstraints"
        );
        assert_eq!(StdoutOutput::new().name(), "StdoutOutput");
        assert_eq!(FileOutput::new("/tmp/test".into()).name(), "FileOutput");
        assert_eq!(Deadletter::new("/tmp/test".into()).name(), "Deadletter");
    }
2381
2382 #[test]
2383 fn test_stages_is_output() {
2384 assert!(!FieldSelect::new(vec![]).is_output());
2385 assert!(!FieldRemap::new(std::collections::HashMap::new()).is_output());
2386 assert!(!PIIRedaction::new(vec![]).is_output());
2387 assert!(!DerivedFields::new(std::collections::HashMap::new()).is_output());
2388 assert!(!Filter::new(Box::new(|_| true)).is_output());
2389 assert!(!RequiredFields::new(vec![]).is_output());
2390 assert!(!TypeChecking::new(std::collections::HashMap::new()).is_output());
2391 assert!(!ValueConstraints::new(std::collections::HashMap::new()).is_output());
2392 assert!(StdoutOutput::new().is_output());
2393 assert!(FileOutput::new("/tmp/test".into()).is_output());
2394 assert!(Deadletter::new("/tmp/test".into()).is_output());
2395 }
2396
2397 #[test]
2398 fn test_stdout_output_serialization_error() {
2399 let mut stage = StdoutOutput::new();
2400
2401 let event_with_infinite = Event {
2402 data: serde_json::json!({"value": f64::INFINITY}),
2403 metadata: None,
2404 };
2405
2406 let result = stage.execute(event_with_infinite);
2407 match result {
2408 Ok(_) => {
2409 }
2411 Err(e) => {
2412 assert_eq!(e.category(), "Output");
2413 assert_eq!(e.code(), "SERIALIZE_ERROR");
2414 }
2415 }
2416 }
2417
2418 #[test]
2419 fn test_event_with_metadata() {
2420 use std::collections::BTreeMap;
2421 let mut metadata = BTreeMap::new();
2422 metadata.insert("source".to_string(), serde_json::json!("test"));
2423 metadata.insert("timestamp".to_string(), serde_json::json!(1234567890));
2424
2425 let event = Event {
2426 data: serde_json::json!({"message": "test"}),
2427 metadata: Some(metadata),
2428 };
2429
2430 assert_eq!(event.get_string("message"), Some("test"));
2431 assert!(event.metadata.is_some());
2432 assert_eq!(
2433 event.metadata.as_ref().unwrap().get("source").unwrap(),
2434 &serde_json::json!("test")
2435 );
2436 }
2437
2438 #[test]
2439 fn test_complete_error_handling_pipeline() {
2440 use std::fs;
2441 use std::io::Write;
2442 use tempfile::TempDir;
2443
2444 let temp_dir = TempDir::new().unwrap();
2445 let input_file = temp_dir.path().join("input.ndjson");
2446 let deadletter_file = temp_dir.path().join("deadletter.ndjson");
2447
2448 let mut file = fs::File::create(&input_file).unwrap();
2449 writeln!(file, "invalid json line").unwrap();
2450 writeln!(file, r#"{{"level": "info", "message": "valid"}}"#).unwrap();
2451 writeln!(file, r#"{{"level": "warn"}}"#).unwrap();
2452 drop(file);
2453
2454 let mut pipeline = Pipeline::new();
2455 pipeline.add_stage(Box::new(RequiredFields::new(vec![
2456 "level".to_string(),
2457 "message".to_string(),
2458 ])));
2459 pipeline.add_stage(Box::new(StdoutOutput::new()));
2460
2461 let mut deadletter_stage = Deadletter::new(deadletter_file.clone());
2462 let mut deadletter: Option<&mut dyn Stage> = Some(&mut deadletter_stage);
2463
2464 let mut input = InputSource::File(input_file);
2465 let result = input.process_input(&mut pipeline, &mut deadletter);
2466 assert!(result.is_ok());
2467
2468 let deadletter_content = fs::read_to_string(&deadletter_file).unwrap();
2469 assert!(deadletter_content.contains("PARSE_ERROR"));
2470 assert!(deadletter_content.contains("MISSING_FIELD"));
2471
2472 let lines: Vec<&str> = deadletter_content.lines().collect();
2473 assert_eq!(lines.len(), 2);
2474 }
2475
2476 #[test]
2477 fn test_type_checking_all_types() {
2478 use std::collections::HashMap;
2479 let mut type_checks = HashMap::new();
2480 type_checks.insert("str_field".to_string(), "string".to_string());
2481 type_checks.insert("num_field".to_string(), "number".to_string());
2482 type_checks.insert("bool_field".to_string(), "boolean".to_string());
2483 type_checks.insert("obj_field".to_string(), "object".to_string());
2484 type_checks.insert("arr_field".to_string(), "array".to_string());
2485 type_checks.insert("null_field".to_string(), "null".to_string());
2486
2487 let mut stage = TypeChecking::new(type_checks);
2488
2489 let valid_event = Event {
2490 data: serde_json::json!({
2491 "str_field": "hello",
2492 "num_field": 42,
2493 "bool_field": true,
2494 "obj_field": {"nested": "value"},
2495 "arr_field": [1, 2, 3],
2496 "null_field": null
2497 }),
2498 metadata: None,
2499 };
2500
2501 let result = stage.execute(valid_event).unwrap();
2502 assert!(result.is_some());
2503
2504 let invalid_event = Event {
2505 data: serde_json::json!({
2506 "str_field": 42,
2507 "num_field": "not_a_number"
2508 }),
2509 metadata: None,
2510 };
2511
2512 let result = stage.execute(invalid_event);
2513 assert!(result.is_err());
2514 assert_eq!(result.unwrap_err().category(), "Validation");
2515 }
2516
2517 #[test]
2518 fn test_input_file_io_error() {
2519 use std::path::PathBuf;
2520 let nonexistent_file = PathBuf::from("/nonexistent/path/file.json");
2521 let mut input = InputSource::File(nonexistent_file);
2522
2523 let mut pipeline = Pipeline::new();
2524 pipeline.add_stage(Box::new(StdoutOutput::new()));
2525 let mut deadletter: Option<&mut dyn Stage> = None;
2526
2527 let result = input.process_input(&mut pipeline, &mut deadletter);
2528 assert!(result.is_err());
2529 let err = result.unwrap_err();
2530 assert_eq!(err.category(), "System");
2531 assert_eq!(err.code(), "IO_ERROR");
2532 }
2533
2534 #[test]
2535 fn test_stdin_input_mock() {
2536 let input = InputSource::Stdin;
2537 match input {
2538 InputSource::Stdin => {
2539 }
2541 _ => panic!("Should be Stdin"),
2542 }
2543 }
2544
2545 #[test]
2546 fn test_file_output_serialization_error() {
2547 use std::fs;
2548 use std::path::PathBuf;
2549
2550 let temp_file = "test_serialize_error.json";
2551 let mut stage = FileOutput::new(PathBuf::from(temp_file));
2552
2553 let problematic_event = Event {
2555 data: serde_json::json!({"data": "normal"}),
2556 metadata: None,
2557 };
2558
2559 let result = stage.execute(problematic_event);
2561 assert!(result.is_ok());
2562
2563 if std::path::Path::new(temp_file).exists() {
2565 fs::remove_file(temp_file).unwrap();
2566 }
2567 }
2568
2569 #[test]
2570 fn test_deadletter_serialization_error() {
2571 use std::fs;
2572 use std::path::PathBuf;
2573
2574 let temp_file = "test_deadletter_serialize.json";
2575 let mut stage = Deadletter::new(PathBuf::from(temp_file));
2576
2577 let event = Event {
2578 data: serde_json::json!({"error": "test"}),
2579 metadata: None,
2580 };
2581
2582 let result = stage.execute(event);
2583 assert!(result.is_ok());
2584
2585 if std::path::Path::new(temp_file).exists() {
2587 fs::remove_file(temp_file).unwrap();
2588 }
2589 }
2590
2591 #[test]
2592 fn test_directory_io_error_during_iteration() {
2593 use std::path::PathBuf;
2594 let mut input = InputSource::Directory(PathBuf::from("/nonexistent/directory"));
2595
2596 let mut pipeline = Pipeline::new();
2597 pipeline.add_stage(Box::new(StdoutOutput::new()));
2598 let mut deadletter: Option<&mut dyn Stage> = None;
2599
2600 let result = input.process_input(&mut pipeline, &mut deadletter);
2601 assert!(result.is_err());
2602 assert_eq!(result.unwrap_err().category(), "System");
2603 }
2604
2605 #[test]
2606 fn test_pipeline_with_error_in_middle_stage() {
2607 let mut pipeline = Pipeline::new();
2608 pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
2609 pipeline.add_stage(Box::new(RequiredFields::new(vec![
2610 "missing_field".to_string()
2611 ])));
2612 pipeline.add_stage(Box::new(StdoutOutput::new()));
2613
2614 let event = Event {
2615 data: serde_json::json!({"level": "info", "message": "test"}),
2616 metadata: None,
2617 };
2618
2619 let result = pipeline.process_event(event);
2620 assert!(result.is_err());
2621 assert_eq!(result.unwrap_err().category(), "Validation");
2622
2623 let prometheus = pipeline.export_prometheus();
2624 assert!(prometheus.contains("feedme_events_processed_total 1"));
2625 assert!(prometheus.contains("feedme_errors_total 1"));
2626 }
2627
2628 #[test]
2629 fn test_pii_redaction_non_object() {
2630 let mut stage = PIIRedaction::new(vec![regex::Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap()]);
2631
2632 let non_object_event = Event {
2633 data: serde_json::json!("just a string with SSN 123-45-6789"),
2634 metadata: None,
2635 };
2636
2637 let result = stage.execute(non_object_event).unwrap().unwrap();
2638 assert_eq!(
2639 result.data,
2640 serde_json::json!("just a string with SSN 123-45-6789")
2641 );
2642 }
2643
2644 #[test]
2645 fn test_derived_fields_non_object() {
2646 use std::collections::HashMap;
2647 let mut derivations = HashMap::new();
2648 derivations.insert(
2649 "new_field".to_string(),
2650 Box::new(|_: &Event| serde_json::json!("derived"))
2651 as Box<dyn Fn(&Event) -> serde_json::Value>,
2652 );
2653
2654 let mut stage = DerivedFields::new(derivations);
2655
2656 let non_object_event = Event {
2657 data: serde_json::json!("just a string"),
2658 metadata: None,
2659 };
2660
2661 let result = stage.execute(non_object_event).unwrap().unwrap();
2662 assert_eq!(result.data, serde_json::json!("just a string"));
2663 }
2664
2665 #[test]
2666 fn test_required_fields_non_object() {
2667 let mut stage = RequiredFields::new(vec!["level".to_string()]);
2668
2669 let non_object_event = Event {
2670 data: serde_json::json!("just a string"),
2671 metadata: None,
2672 };
2673
2674 let result = stage.execute(non_object_event).unwrap();
2675 assert!(result.is_some());
2676 }
2677
2678 #[test]
2679 fn test_type_checking_non_object() {
2680 use std::collections::HashMap;
2681 let mut type_checks = HashMap::new();
2682 type_checks.insert("field".to_string(), "string".to_string());
2683
2684 let mut stage = TypeChecking::new(type_checks);
2685
2686 let non_object_event = Event {
2687 data: serde_json::json!("just a string"),
2688 metadata: None,
2689 };
2690
2691 let result = stage.execute(non_object_event).unwrap();
2692 assert!(result.is_some());
2693 }
2694
2695 #[test]
2696 fn test_value_constraints_non_object() {
2697 use std::collections::HashMap;
2698 let mut constraints = HashMap::new();
2699 constraints.insert(
2700 "field".to_string(),
2701 Box::new(|_: &serde_json::Value| true) as Box<dyn Fn(&serde_json::Value) -> bool>,
2702 );
2703
2704 let mut stage = ValueConstraints::new(constraints);
2705
2706 let non_object_event = Event {
2707 data: serde_json::json!("just a string"),
2708 metadata: None,
2709 };
2710
2711 let result = stage.execute(non_object_event).unwrap();
2712 assert!(result.is_some());
2713 }
2714
2715 #[test]
2716 fn test_metrics_record_latency_new_stage() {
2717 let mut metrics = Metrics::new();
2718
2719 metrics.record_latency("stage1", 10.0);
2720 metrics.record_latency("stage2", 20.0);
2721 metrics.record_latency("stage1", 15.0);
2722
2723 let prometheus = metrics.to_prometheus();
2724 assert!(prometheus.contains("stage1"));
2725 assert!(prometheus.contains("stage2"));
2726 assert!(prometheus.contains("25"));
2727 assert!(prometheus.contains("20"));
2728 }
2729
2730 #[test]
2731 fn test_input_source_with_io_error_during_read() {
2732 use std::fs;
2733 use std::io::Write;
2734
2735 let temp_file = "test_io_error.ndjson";
2736 let mut file = fs::File::create(temp_file).unwrap();
2737 writeln!(file, r#"{{"level": "info"}}"#).unwrap();
2738 drop(file);
2739
2740 let mut pipeline = Pipeline::new();
2741 pipeline.add_stage(Box::new(StdoutOutput::new()));
2742
2743 let mut input = InputSource::File(temp_file.into());
2744 let mut deadletter: Option<&mut dyn Stage> = None;
2745
2746 let result = input.process_input(&mut pipeline, &mut deadletter);
2747 assert!(result.is_ok());
2748
2749 fs::remove_file(temp_file).unwrap();
2750 }
2751
2752 #[test]
2753 fn test_pipeline_stage_latency_measurement() {
2754 let mut pipeline = Pipeline::new();
2755 pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string()])));
2756
2757 let event = Event {
2758 data: serde_json::json!({"level": "info", "extra": "removed"}),
2759 metadata: None,
2760 };
2761
2762 pipeline.process_event(event).unwrap();
2763
2764 let prometheus = pipeline.export_prometheus();
2765 assert!(prometheus.contains("feedme_stage_latency_ms"));
2766 assert!(prometheus.contains("FieldSelect"));
2767 }
2768
2769 #[test]
2770 fn test_complete_input_output_cycle() {
2771 use std::fs;
2772 use std::io::Write;
2773 use tempfile::TempDir;
2774
2775 let temp_dir = TempDir::new().unwrap();
2776 let input_file = temp_dir.path().join("input.ndjson");
2777 let output_file = temp_dir.path().join("output.ndjson");
2778
2779 let mut file = fs::File::create(&input_file).unwrap();
2780 writeln!(file, r#"{{"level": "info", "message": "test"}}"#).unwrap();
2781 writeln!(file, r#"{{"level": "error", "message": "error"}}"#).unwrap();
2782 drop(file);
2783
2784 let mut pipeline = Pipeline::new();
2785 pipeline.add_stage(Box::new(Filter::new(Box::new(|e| {
2786 e.get_string("level") == Some("info")
2787 }))));
2788 pipeline.add_stage(Box::new(FileOutput::new(output_file.clone())));
2789
2790 let mut input = InputSource::File(input_file);
2791 let mut deadletter: Option<&mut dyn Stage> = None;
2792
2793 let result = input.process_input(&mut pipeline, &mut deadletter);
2794 assert!(result.is_ok());
2795
2796 let output_content = fs::read_to_string(&output_file).unwrap();
2797 assert!(output_content.contains("info"));
2798 assert!(!output_content.contains("error"));
2799
2800 let prometheus = pipeline.export_prometheus();
2801 assert!(prometheus.contains("feedme_events_processed_total 2"));
2802 assert!(prometheus.contains("feedme_events_dropped_total 1"));
2803 }
2804
2805 #[test]
2806 fn test_value_constraints_missing_field() {
2807 use std::collections::HashMap;
2808 let mut constraints = HashMap::new();
2809 constraints.insert(
2810 "missing_field".to_string(),
2811 Box::new(|_: &serde_json::Value| true) as Box<dyn Fn(&serde_json::Value) -> bool>,
2812 );
2813
2814 let mut stage = ValueConstraints::new(constraints);
2815
2816 let event = Event {
2817 data: serde_json::json!({"different_field": "value"}),
2818 metadata: None,
2819 };
2820
2821 let result = stage.execute(event).unwrap();
2822 assert!(result.is_some());
2823 }
2824
2825 #[test]
2826 fn test_type_checking_missing_field() {
2827 use std::collections::HashMap;
2828 let mut type_checks = HashMap::new();
2829 type_checks.insert("missing_field".to_string(), "string".to_string());
2830
2831 let mut stage = TypeChecking::new(type_checks);
2832
2833 let event = Event {
2834 data: serde_json::json!({"different_field": "value"}),
2835 metadata: None,
2836 };
2837
2838 let result = stage.execute(event).unwrap();
2839 assert!(result.is_some());
2840 }
2841
2842 #[test]
2843 fn test_replay_execution() {
2844 use tempfile::NamedTempFile;
2845 use crate::replay::{record_execution, replay_execution};
2846
2847 let mut pipeline = Pipeline::new();
2848 pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string(), "message".to_string()])));
2849 pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".to_string()])));
2850
2851 let events = vec![
2852 Event {
2853 data: serde_json::json!({"level": "info", "message": "test1", "extra": "ignored"}),
2854 metadata: None,
2855 },
2856 Event {
2857 data: serde_json::json!({"level": "error", "message": "test2"}),
2858 metadata: None,
2859 },
2860 ];
2861
2862 let temp_file = NamedTempFile::new().unwrap();
2863 let trace_path = temp_file.path();
2864
2865 record_execution(&mut pipeline, &events, trace_path).unwrap();
2867
2868 let mut replay_pipeline = Pipeline::new();
2870 replay_pipeline.add_stage(Box::new(FieldSelect::new(vec!["level".to_string(), "message".to_string()])));
2871 replay_pipeline.add_stage(Box::new(RequiredFields::new(vec!["level".to_string()])));
2872
2873 replay_execution(&mut replay_pipeline, trace_path).unwrap();
2875 }
2876}