Skip to main content

wrapper_events/
ingest.rs

1use std::io::Read;
2
3use serde_json::Value;
4
5use crate::config::{CaptureRaw, ErrorDetailCapture, IngestConfig};
6use crate::error::{AdapterErrorCode, CapturedRaw, ErrorDetail, LineRecord, LineRecordError};
7use crate::line_parser::{ClassifiedParserError, LineInput, LineParser};
8use crate::reader::{BoundedLine, SyncBoundedLineReader};
9
/// Running allowance of bytes that may still be spent on raw line / raw JSON capture.
///
/// A budget built from `None` never refuses a capture; one built from `Some(n)`
/// allows at most `n` bytes in total across all captures charged against it.
#[derive(Debug, Clone, Copy)]
pub struct RawCaptureBudget {
    /// Bytes still available, or `None` when capture is unbounded.
    remaining_bytes: Option<usize>,
}

impl RawCaptureBudget {
    /// Creates a budget holding `limit` bytes (`None` means unlimited).
    pub fn new(limit: Option<usize>) -> Self {
        let remaining_bytes = limit;
        Self { remaining_bytes }
    }

    /// Reports whether a capture of `bytes` fits in what is left.
    fn can_spend(&self, bytes: usize) -> bool {
        // Only a bounded budget that is smaller than the request can refuse it.
        !matches!(self.remaining_bytes, Some(left) if left < bytes)
    }

    /// Deducts `bytes` from a bounded budget, clamping at zero.
    ///
    /// An unbounded budget is left untouched.
    fn spend(&mut self, bytes: usize) {
        if let Some(left) = self.remaining_bytes.as_mut() {
            *left = left.saturating_sub(bytes);
        }
    }
}
35
36pub struct LineIngestor<R: Read, P: LineParser> {
37    reader: SyncBoundedLineReader<R>,
38    parser: P,
39    config: IngestConfig,
40    budget: RawCaptureBudget,
41    adapter_name: &'static str,
42}
43
44impl<R: Read, P: LineParser> LineIngestor<R, P> {
45    pub fn new(reader: R, parser: P, config: IngestConfig, adapter_name: &'static str) -> Self {
46        let budget = RawCaptureBudget::new(config.limits.max_raw_bytes_total);
47        Self {
48            reader: SyncBoundedLineReader::new(reader, config.limits.max_line_bytes),
49            parser,
50            config,
51            budget,
52            adapter_name,
53        }
54    }
55
56    pub fn into_parser(self) -> P {
57        self.parser
58    }
59
60    fn record_error<T>(&self, line_number: usize, err: LineRecordError) -> LineRecord<T> {
61        LineRecord {
62            line_number,
63            captured_raw: None,
64            outcome: Err(err),
65        }
66    }
67
68    fn normalize_line(line: &str) -> &str {
69        line.strip_suffix('\r').unwrap_or(line)
70    }
71
72    fn line_is_blank(line: &str) -> bool {
73        line.chars().all(|ch| ch.is_whitespace())
74    }
75
76    fn maybe_capture_line(&mut self, line: &str) -> Option<String> {
77        if !matches!(self.config.capture_raw, CaptureRaw::Line | CaptureRaw::Both) {
78            return None;
79        }
80        let bytes = line.len();
81        if !self.budget.can_spend(bytes) {
82            return None;
83        }
84        self.budget.spend(bytes);
85        Some(line.to_string())
86    }
87
88    fn maybe_capture_json(&mut self, line: &str) -> Option<Value> {
89        if !matches!(self.config.capture_raw, CaptureRaw::Json | CaptureRaw::Both) {
90            return None;
91        }
92        let value: Value = serde_json::from_str(line).ok()?;
93        let bytes = serde_json::to_vec(&value).ok()?.len();
94        if !self.budget.can_spend(bytes) {
95            return None;
96        }
97        self.budget.spend(bytes);
98        Some(value)
99    }
100
101    fn capture_raw(&mut self, line: &str) -> Option<CapturedRaw> {
102        match self.config.capture_raw {
103            CaptureRaw::None => None,
104            CaptureRaw::Line => self.maybe_capture_line(line).map(|line| CapturedRaw {
105                line: Some(line),
106                json: None,
107            }),
108            CaptureRaw::Json => self.maybe_capture_json(line).map(|json| CapturedRaw {
109                line: None,
110                json: Some(json),
111            }),
112            CaptureRaw::Both => {
113                let line_cap = self.maybe_capture_line(line);
114                let json_cap = self.maybe_capture_json(line);
115                if line_cap.is_none() && json_cap.is_none() {
116                    None
117                } else {
118                    Some(CapturedRaw {
119                        line: line_cap,
120                        json: json_cap,
121                    })
122                }
123            }
124        }
125    }
126
127    fn adapter_error_record<T>(
128        &mut self,
129        line_number: usize,
130        captured_raw: Option<CapturedRaw>,
131        code: AdapterErrorCode,
132        summary: String,
133        full_details: String,
134    ) -> LineRecord<T> {
135        if self.config.error_detail_capture == ErrorDetailCapture::FullDetails {
136            if let Some(sink) = self.config.error_sink.as_mut() {
137                sink.on_error(ErrorDetail {
138                    line_number,
139                    code,
140                    adapter: self.adapter_name,
141                    details: full_details,
142                });
143            }
144        }
145        LineRecord {
146            line_number,
147            captured_raw,
148            outcome: Err(LineRecordError::Adapter { code, summary }),
149        }
150    }
151}
152
153impl<R: Read, P: LineParser> Iterator for LineIngestor<R, P> {
154    type Item = LineRecord<P::Event>;
155
156    fn next(&mut self) -> Option<Self::Item> {
157        loop {
158            let next = self.reader.next()?;
159            match next {
160                BoundedLine::IoError { line_number } => {
161                    return Some(self.record_error(line_number, LineRecordError::Io));
162                }
163                BoundedLine::LineTooLong {
164                    line_number,
165                    observed_bytes,
166                    max_line_bytes,
167                } => {
168                    return Some(self.record_error(
169                        line_number,
170                        LineRecordError::LineTooLong {
171                            observed_bytes,
172                            max_line_bytes,
173                        },
174                    ));
175                }
176                BoundedLine::Line { line_number, bytes } => {
177                    let Ok(raw_line) = String::from_utf8(bytes) else {
178                        return Some(self.record_error(line_number, LineRecordError::InvalidUtf8));
179                    };
180                    let line = Self::normalize_line(&raw_line);
181                    if Self::line_is_blank(line) {
182                        continue;
183                    }
184
185                    let captured_raw = self.capture_raw(line);
186                    let json_capture = captured_raw.as_ref().and_then(|raw| raw.json.as_ref());
187                    let input = LineInput { line, json_capture };
188
189                    match self.parser.parse_line(input) {
190                        Ok(None) => continue,
191                        Ok(Some(event)) => {
192                            return Some(LineRecord {
193                                line_number,
194                                captured_raw,
195                                outcome: Ok(event),
196                            });
197                        }
198                        Err(err) => {
199                            return Some(self.adapter_error_record(
200                                line_number,
201                                captured_raw,
202                                err.code(),
203                                err.redacted_summary(),
204                                err.full_details(),
205                            ));
206                        }
207                    }
208                }
209            }
210        }
211    }
212}
213
/// Async counterpart of [`LineIngestor`], driven by a `tokio::io::AsyncRead`
/// source. Compiled only with the `tokio` feature.
#[cfg(feature = "tokio")]
mod tokio_ingest {
    use serde_json::Value;
    use tokio::io::AsyncRead;

    use crate::config::{CaptureRaw, ErrorDetailCapture, IngestConfig};
    use crate::error::{AdapterErrorCode, CapturedRaw, ErrorDetail, LineRecord, LineRecordError};
    use crate::line_parser::{ClassifiedParserError, LineInput, LineParser};
    use crate::reader::{AsyncBoundedLineReader, AsyncBoundedLineResult};

    // The budget type is defined in the enclosing module. Refer to it directly
    // rather than relying on a crate-root re-export.
    use super::RawCaptureBudget;

    /// Async line ingestor.
    ///
    /// Reads bounded lines from `R`, optionally captures the raw text and/or
    /// parsed JSON of each line within a byte budget, and hands every non-blank
    /// line to the parser `P`.
    pub struct AsyncLineIngestor<R: AsyncRead + Unpin, P: LineParser> {
        reader: AsyncBoundedLineReader<R>,
        parser: P,
        config: IngestConfig,
        budget: RawCaptureBudget,
        adapter_name: &'static str,
    }

    impl<R: AsyncRead + Unpin, P: LineParser> AsyncLineIngestor<R, P> {
        /// Creates an ingestor that reads lines from `reader` and parses them with `parser`.
        ///
        /// The underlying reader is bounded by `config.limits.max_line_bytes`,
        /// and the raw-capture budget is seeded from
        /// `config.limits.max_raw_bytes_total`. `adapter_name` is attached to
        /// every [`ErrorDetail`] forwarded to the configured error sink.
        pub fn new(reader: R, parser: P, config: IngestConfig, adapter_name: &'static str) -> Self {
            let budget = RawCaptureBudget::new(config.limits.max_raw_bytes_total);
            Self {
                reader: AsyncBoundedLineReader::new(reader, config.limits.max_line_bytes),
                parser,
                config,
                budget,
                adapter_name,
            }
        }

        /// Consumes the ingestor and returns the parser it was driving.
        pub fn into_parser(self) -> P {
            self.parser
        }

        /// Builds an error record for a reader-level failure.
        ///
        /// No raw capture is attached to these records.
        fn record_error<T>(&self, line_number: usize, err: LineRecordError) -> LineRecord<T> {
            LineRecord {
                line_number,
                captured_raw: None,
                outcome: Err(err),
            }
        }

        /// Strips a single trailing `\r`, so CRLF-terminated input is treated like LF input.
        fn normalize_line(line: &str) -> &str {
            line.strip_suffix('\r').unwrap_or(line)
        }

        /// Returns `true` if the line is empty or consists only of Unicode whitespace.
        fn line_is_blank(line: &str) -> bool {
            line.chars().all(char::is_whitespace)
        }

        /// Captures the raw line text when the capture mode includes lines and
        /// its byte length fits in the remaining budget, charging the budget on success.
        fn maybe_capture_line(&mut self, line: &str) -> Option<String> {
            if !matches!(self.config.capture_raw, CaptureRaw::Line | CaptureRaw::Both) {
                return None;
            }
            let bytes = line.len();
            if !self.budget.can_spend(bytes) {
                return None;
            }
            self.budget.spend(bytes);
            Some(line.to_string())
        }

        /// Parses and captures the line as JSON when the capture mode includes JSON.
        ///
        /// The budget is charged with the length of the value's compact
        /// re-serialization, which may differ from the source text. Returns
        /// `None` when JSON capture is disabled, the line is not valid JSON, or
        /// the value does not fit in the remaining budget.
        fn maybe_capture_json(&mut self, line: &str) -> Option<Value> {
            if !matches!(self.config.capture_raw, CaptureRaw::Json | CaptureRaw::Both) {
                return None;
            }
            // Every JSON value encodes to at least one byte. Once the budget is
            // exhausted, skip parsing instead of doing the work and discarding it.
            if !self.budget.can_spend(1) {
                return None;
            }
            let value: Value = serde_json::from_str(line).ok()?;
            let bytes = Self::compact_json_len(&value)?;
            if !self.budget.can_spend(bytes) {
                return None;
            }
            self.budget.spend(bytes);
            Some(value)
        }

        /// Length in bytes of `value`'s compact JSON encoding.
        ///
        /// This is the same byte count `serde_json::to_vec` would produce,
        /// measured without allocating the encoded buffer.
        fn compact_json_len(value: &Value) -> Option<usize> {
            /// `io::Write` sink that only tallies the bytes it is handed.
            struct ByteCounter(usize);

            impl std::io::Write for ByteCounter {
                fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                    self.0 += buf.len();
                    Ok(buf.len())
                }

                fn flush(&mut self) -> std::io::Result<()> {
                    Ok(())
                }
            }

            let mut counter = ByteCounter(0);
            serde_json::to_writer(&mut counter, value).ok()?;
            Some(counter.0)
        }

        /// Captures the line according to `config.capture_raw`.
        ///
        /// In `Both` mode the line text is charged first and the JSON value
        /// second, each independently. The result is `None` only when neither
        /// was captured.
        fn capture_raw(&mut self, line: &str) -> Option<CapturedRaw> {
            match self.config.capture_raw {
                CaptureRaw::None => None,
                CaptureRaw::Line => self.maybe_capture_line(line).map(|line| CapturedRaw {
                    line: Some(line),
                    json: None,
                }),
                CaptureRaw::Json => self.maybe_capture_json(line).map(|json| CapturedRaw {
                    line: None,
                    json: Some(json),
                }),
                CaptureRaw::Both => {
                    let line_cap = self.maybe_capture_line(line);
                    let json_cap = self.maybe_capture_json(line);
                    if line_cap.is_none() && json_cap.is_none() {
                        None
                    } else {
                        Some(CapturedRaw {
                            line: line_cap,
                            json: json_cap,
                        })
                    }
                }
            }
        }

        /// Builds the record for a parser (adapter) error.
        ///
        /// When `error_detail_capture` is `FullDetails` and an error sink is
        /// configured, `full_details` is forwarded to the sink. The returned
        /// record itself carries only the redacted `summary`.
        fn adapter_error_record<T>(
            &mut self,
            line_number: usize,
            captured_raw: Option<CapturedRaw>,
            code: AdapterErrorCode,
            summary: String,
            full_details: String,
        ) -> LineRecord<T> {
            if self.config.error_detail_capture == ErrorDetailCapture::FullDetails {
                if let Some(sink) = self.config.error_sink.as_mut() {
                    sink.on_error(ErrorDetail {
                        line_number,
                        code,
                        adapter: self.adapter_name,
                        details: full_details,
                    });
                }
            }
            LineRecord {
                line_number,
                captured_raw,
                outcome: Err(LineRecordError::Adapter { code, summary }),
            }
        }

        /// Returns the record for the next meaningful line, or `None` at end of input.
        ///
        /// - Reader failures and non-UTF-8 lines yield error records without raw
        ///   capture.
        /// - Blank lines, and lines the parser skips (`Ok(None)`), are consumed
        ///   without yielding.
        pub async fn next_record(&mut self) -> Option<LineRecord<P::Event>> {
            loop {
                let next = self.reader.next_line().await?;
                match next {
                    AsyncBoundedLineResult::IoError { line_number } => {
                        return Some(self.record_error(line_number, LineRecordError::Io));
                    }
                    AsyncBoundedLineResult::LineTooLong {
                        line_number,
                        observed_bytes,
                        max_line_bytes,
                    } => {
                        return Some(self.record_error(
                            line_number,
                            LineRecordError::LineTooLong {
                                observed_bytes,
                                max_line_bytes,
                            },
                        ));
                    }
                    AsyncBoundedLineResult::Line { line_number, bytes } => {
                        let Ok(raw_line) = String::from_utf8(bytes) else {
                            return Some(
                                self.record_error(line_number, LineRecordError::InvalidUtf8),
                            );
                        };
                        let line = Self::normalize_line(&raw_line);
                        if Self::line_is_blank(line) {
                            continue;
                        }

                        // Capture before parsing, so the raw data is still
                        // attached when the parser rejects the line.
                        let captured_raw = self.capture_raw(line);
                        let json_capture = captured_raw.as_ref().and_then(|raw| raw.json.as_ref());
                        let input = LineInput { line, json_capture };

                        match self.parser.parse_line(input) {
                            Ok(None) => continue,
                            Ok(Some(event)) => {
                                return Some(LineRecord {
                                    line_number,
                                    captured_raw,
                                    outcome: Ok(event),
                                });
                            }
                            Err(err) => {
                                return Some(self.adapter_error_record(
                                    line_number,
                                    captured_raw,
                                    err.code(),
                                    err.redacted_summary(),
                                    err.full_details(),
                                ));
                            }
                        }
                    }
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[derive(Default)]
        struct TestParser;

        #[derive(Default)]
        struct FailingParser;

        #[derive(Debug, thiserror::Error)]
        #[error("boom")]
        struct TestErr;

        impl crate::line_parser::ClassifiedParserError for TestErr {
            fn code(&self) -> AdapterErrorCode {
                AdapterErrorCode::Unknown
            }

            fn redacted_summary(&self) -> String {
                "boom".to_string()
            }

            fn full_details(&self) -> String {
                "boom details".to_string()
            }
        }

        impl crate::LineParser for TestParser {
            type Event = String;
            type Error = TestErr;

            fn reset(&mut self) {}

            fn parse_line(
                &mut self,
                input: crate::LineInput<'_>,
            ) -> Result<Option<Self::Event>, Self::Error> {
                Ok(Some(input.line.to_string()))
            }
        }

        impl crate::LineParser for FailingParser {
            type Event = String;
            type Error = TestErr;

            fn reset(&mut self) {}

            fn parse_line(
                &mut self,
                _input: crate::LineInput<'_>,
            ) -> Result<Option<Self::Event>, Self::Error> {
                Err(TestErr)
            }
        }

        #[tokio::test]
        async fn captures_line_before_parsing() {
            let data = b"hello\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Line,
                limits: crate::IngestLimits {
                    max_raw_bytes_total: Some(32),
                    ..Default::default()
                },
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");
            let rec = ingestor.next_record().await.unwrap();
            assert_eq!(
                rec.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
                Some("hello")
            );
            assert!(rec.outcome.is_ok());
        }

        #[tokio::test]
        async fn budget_skips_capture_deterministically() {
            let data = b"{\"k\":1}\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Both,
                limits: crate::IngestLimits {
                    max_raw_bytes_total: Some(2),
                    ..Default::default()
                },
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(rec.captured_raw.is_none());
            assert!(rec.outcome.is_ok());
        }

        #[tokio::test]
        async fn raw_budget_is_shared_across_lines() {
            let data = b"hello\nworld\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Line,
                limits: crate::IngestLimits {
                    max_raw_bytes_total: Some(5),
                    ..Default::default()
                },
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

            let first = ingestor.next_record().await.unwrap();
            assert_eq!(
                first.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
                Some("hello")
            );
            let second = ingestor.next_record().await.unwrap();
            assert!(second.captured_raw.is_none());
            assert_eq!(second.outcome.ok().as_deref(), Some("world"));
        }

        #[tokio::test]
        async fn json_budget_is_charged_on_compact_encoding() {
            // The first line has 11 source bytes, but its compact re-encoding
            // `{"k":1}` is 7 bytes, which exactly fits the budget.
            let data = b"{ \"k\" : 1 }\n{\"k\":2}\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Json,
                limits: crate::IngestLimits {
                    max_raw_bytes_total: Some(7),
                    ..Default::default()
                },
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

            let first = ingestor.next_record().await.unwrap();
            let json = first
                .captured_raw
                .and_then(|r| r.json)
                .expect("first line fits the budget");
            assert_eq!(json.get("k").and_then(|v| v.as_i64()), Some(1));

            // The budget is now exhausted, so the second line is not captured.
            let second = ingestor.next_record().await.unwrap();
            assert!(second.captured_raw.is_none());
            assert!(second.outcome.is_ok());
        }

        #[tokio::test]
        async fn preserves_line_capture_on_adapter_error() {
            let data = b"hello\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Line,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            assert_eq!(
                rec.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
                Some("hello")
            );
        }

        #[tokio::test]
        async fn preserves_json_capture_on_adapter_error() {
            let data = br#"{"k":1}"#;
            let mut bytes = Vec::new();
            bytes.extend_from_slice(data);
            bytes.extend_from_slice(b"\n");

            let config = IngestConfig {
                capture_raw: CaptureRaw::Json,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            let captured = rec.captured_raw.expect("expected captured_raw");
            assert_eq!(captured.line, None);
            assert_eq!(
                captured
                    .json
                    .as_ref()
                    .and_then(|v| v.get("k"))
                    .and_then(|v| v.as_i64()),
                Some(1)
            );
        }

        #[tokio::test]
        async fn preserves_both_capture_on_adapter_error() {
            let data = br#"{"k":1}"#;
            let mut bytes = Vec::new();
            bytes.extend_from_slice(data);
            bytes.extend_from_slice(b"\n");

            let config = IngestConfig {
                capture_raw: CaptureRaw::Both,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            let captured = rec.captured_raw.expect("expected captured_raw");
            assert_eq!(captured.line.as_deref(), Some("{\"k\":1}"));
            assert_eq!(
                captured
                    .json
                    .as_ref()
                    .and_then(|v| v.get("k"))
                    .and_then(|v| v.as_i64()),
                Some(1)
            );
        }
    }
}
551
552#[cfg(feature = "tokio")]
553pub use tokio_ingest::AsyncLineIngestor;
554
#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Default)]
    struct TestParser;

    #[derive(Default)]
    struct FailingParser;

    #[derive(Debug, thiserror::Error)]
    #[error("boom")]
    struct TestErr;

    impl crate::line_parser::ClassifiedParserError for TestErr {
        fn code(&self) -> AdapterErrorCode {
            AdapterErrorCode::Unknown
        }

        fn redacted_summary(&self) -> String {
            "boom".to_string()
        }

        fn full_details(&self) -> String {
            "boom details".to_string()
        }
    }

    impl LineParser for TestParser {
        type Event = String;
        type Error = TestErr;

        fn reset(&mut self) {}

        fn parse_line(&mut self, input: LineInput<'_>) -> Result<Option<Self::Event>, Self::Error> {
            Ok(Some(input.line.to_string()))
        }
    }

    impl LineParser for FailingParser {
        type Event = String;
        type Error = TestErr;

        fn reset(&mut self) {}

        fn parse_line(
            &mut self,
            _input: LineInput<'_>,
        ) -> Result<Option<Self::Event>, Self::Error> {
            Err(TestErr)
        }
    }

    #[test]
    fn captures_line_before_parsing() {
        let data = b"hello\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Line,
            limits: crate::IngestLimits {
                max_raw_bytes_total: Some(32),
                ..Default::default()
            },
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");
        let rec = ingestor.next().unwrap();
        assert_eq!(
            rec.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
            Some("hello")
        );
        assert!(rec.outcome.is_ok());
    }

    #[test]
    fn budget_skips_capture_deterministically() {
        let data = b"{\"k\":1}\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Both,
            limits: crate::IngestLimits {
                max_raw_bytes_total: Some(2),
                ..Default::default()
            },
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

        let rec = ingestor.next().unwrap();
        assert!(rec.captured_raw.is_none());
        assert!(rec.outcome.is_ok());
    }

    #[test]
    fn raw_budget_is_shared_across_lines() {
        let data = b"hello\nworld\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Line,
            limits: crate::IngestLimits {
                max_raw_bytes_total: Some(5),
                ..Default::default()
            },
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

        let first = ingestor.next().unwrap();
        assert_eq!(
            first.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
            Some("hello")
        );
        let second = ingestor.next().unwrap();
        assert!(second.captured_raw.is_none());
        assert_eq!(second.outcome.ok().as_deref(), Some("world"));
    }

    #[test]
    fn json_budget_is_charged_on_compact_encoding() {
        // The first line has 11 source bytes, but its compact re-encoding
        // `{"k":1}` is 7 bytes, which exactly fits the budget.
        let data = b"{ \"k\" : 1 }\n{\"k\":2}\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Json,
            limits: crate::IngestLimits {
                max_raw_bytes_total: Some(7),
                ..Default::default()
            },
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

        let first = ingestor.next().unwrap();
        let json = first
            .captured_raw
            .and_then(|r| r.json)
            .expect("first line fits the budget");
        assert_eq!(json.get("k").and_then(|v| v.as_i64()), Some(1));

        // The budget is now exhausted, so the second line is not captured.
        let second = ingestor.next().unwrap();
        assert!(second.captured_raw.is_none());
        assert!(second.outcome.is_ok());
    }

    #[test]
    fn skips_blank_lines_and_strips_carriage_return() {
        let data = b"\n   \r\nhello\r\n";
        let mut ingestor = LineIngestor::new(
            std::io::Cursor::new(data),
            TestParser,
            IngestConfig::default(),
            "test",
        );

        let rec = ingestor.next().unwrap();
        assert_eq!(rec.outcome.ok().as_deref(), Some("hello"));
        assert!(ingestor.next().is_none());
    }

    #[test]
    fn reports_invalid_utf8_without_capture() {
        let data = b"\xff\xfe\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Both,
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

        let rec = ingestor.next().unwrap();
        assert!(matches!(rec.outcome, Err(LineRecordError::InvalidUtf8)));
        assert!(rec.captured_raw.is_none());
    }

    #[test]
    fn preserves_line_capture_on_adapter_error() {
        let data = b"hello\n";
        let config = IngestConfig {
            capture_raw: CaptureRaw::Line,
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(data), FailingParser, config, "test");

        let rec = ingestor.next().unwrap();
        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        assert_eq!(
            rec.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
            Some("hello")
        );
    }

    #[test]
    fn preserves_json_capture_on_adapter_error() {
        let data = br#"{"k":1}"#;
        let mut bytes = Vec::new();
        bytes.extend_from_slice(data);
        bytes.extend_from_slice(b"\n");

        let config = IngestConfig {
            capture_raw: CaptureRaw::Json,
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

        let rec = ingestor.next().unwrap();
        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        let captured = rec.captured_raw.expect("expected captured_raw");
        assert_eq!(captured.line, None);
        assert_eq!(
            captured
                .json
                .as_ref()
                .and_then(|v| v.get("k"))
                .and_then(|v| v.as_i64()),
            Some(1)
        );
    }

    #[test]
    fn preserves_both_capture_on_adapter_error() {
        let data = br#"{"k":1}"#;
        let mut bytes = Vec::new();
        bytes.extend_from_slice(data);
        bytes.extend_from_slice(b"\n");

        let config = IngestConfig {
            capture_raw: CaptureRaw::Both,
            ..Default::default()
        };

        let mut ingestor =
            LineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

        let rec = ingestor.next().unwrap();
        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        let captured = rec.captured_raw.expect("expected captured_raw");
        assert_eq!(captured.line.as_deref(), Some("{\"k\":1}"));
        assert_eq!(
            captured
                .json
                .as_ref()
                .and_then(|v| v.get("k"))
                .and_then(|v| v.as_i64()),
            Some(1)
        );
    }
}