1use std::io::Read;
2
3use serde_json::Value;
4
5use crate::config::{CaptureRaw, ErrorDetailCapture, IngestConfig};
6use crate::error::{AdapterErrorCode, CapturedRaw, ErrorDetail, LineRecord, LineRecordError};
7use crate::line_parser::{ClassifiedParserError, LineInput, LineParser};
8use crate::reader::{BoundedLine, SyncBoundedLineReader};
9
/// Running allowance for raw-capture bytes.
///
/// `remaining_bytes` holds the number of bytes that may still be spent;
/// `None` means no limit is enforced.
#[derive(Clone, Copy, Debug)]
pub struct RawCaptureBudget {
    remaining_bytes: Option<usize>,
}
14
15impl RawCaptureBudget {
16 pub fn new(limit: Option<usize>) -> Self {
17 Self {
18 remaining_bytes: limit,
19 }
20 }
21
22 fn can_spend(&self, bytes: usize) -> bool {
23 match self.remaining_bytes {
24 None => true,
25 Some(rem) => bytes <= rem,
26 }
27 }
28
29 fn spend(&mut self, bytes: usize) {
30 if let Some(rem) = self.remaining_bytes {
31 self.remaining_bytes = Some(rem.saturating_sub(bytes));
32 }
33 }
34}
35
36pub struct LineIngestor<R: Read, P: LineParser> {
37 reader: SyncBoundedLineReader<R>,
38 parser: P,
39 config: IngestConfig,
40 budget: RawCaptureBudget,
41 adapter_name: &'static str,
42}
43
44impl<R: Read, P: LineParser> LineIngestor<R, P> {
45 pub fn new(reader: R, parser: P, config: IngestConfig, adapter_name: &'static str) -> Self {
46 let budget = RawCaptureBudget::new(config.limits.max_raw_bytes_total);
47 Self {
48 reader: SyncBoundedLineReader::new(reader, config.limits.max_line_bytes),
49 parser,
50 config,
51 budget,
52 adapter_name,
53 }
54 }
55
56 pub fn into_parser(self) -> P {
57 self.parser
58 }
59
60 fn record_error<T>(&self, line_number: usize, err: LineRecordError) -> LineRecord<T> {
61 LineRecord {
62 line_number,
63 captured_raw: None,
64 outcome: Err(err),
65 }
66 }
67
68 fn normalize_line(line: &str) -> &str {
69 line.strip_suffix('\r').unwrap_or(line)
70 }
71
72 fn line_is_blank(line: &str) -> bool {
73 line.chars().all(|ch| ch.is_whitespace())
74 }
75
76 fn maybe_capture_line(&mut self, line: &str) -> Option<String> {
77 if !matches!(self.config.capture_raw, CaptureRaw::Line | CaptureRaw::Both) {
78 return None;
79 }
80 let bytes = line.len();
81 if !self.budget.can_spend(bytes) {
82 return None;
83 }
84 self.budget.spend(bytes);
85 Some(line.to_string())
86 }
87
88 fn maybe_capture_json(&mut self, line: &str) -> Option<Value> {
89 if !matches!(self.config.capture_raw, CaptureRaw::Json | CaptureRaw::Both) {
90 return None;
91 }
92 let value: Value = serde_json::from_str(line).ok()?;
93 let bytes = serde_json::to_vec(&value).ok()?.len();
94 if !self.budget.can_spend(bytes) {
95 return None;
96 }
97 self.budget.spend(bytes);
98 Some(value)
99 }
100
101 fn capture_raw(&mut self, line: &str) -> Option<CapturedRaw> {
102 match self.config.capture_raw {
103 CaptureRaw::None => None,
104 CaptureRaw::Line => self.maybe_capture_line(line).map(|line| CapturedRaw {
105 line: Some(line),
106 json: None,
107 }),
108 CaptureRaw::Json => self.maybe_capture_json(line).map(|json| CapturedRaw {
109 line: None,
110 json: Some(json),
111 }),
112 CaptureRaw::Both => {
113 let line_cap = self.maybe_capture_line(line);
114 let json_cap = self.maybe_capture_json(line);
115 if line_cap.is_none() && json_cap.is_none() {
116 None
117 } else {
118 Some(CapturedRaw {
119 line: line_cap,
120 json: json_cap,
121 })
122 }
123 }
124 }
125 }
126
127 fn adapter_error_record<T>(
128 &mut self,
129 line_number: usize,
130 captured_raw: Option<CapturedRaw>,
131 code: AdapterErrorCode,
132 summary: String,
133 full_details: String,
134 ) -> LineRecord<T> {
135 if self.config.error_detail_capture == ErrorDetailCapture::FullDetails {
136 if let Some(sink) = self.config.error_sink.as_mut() {
137 sink.on_error(ErrorDetail {
138 line_number,
139 code,
140 adapter: self.adapter_name,
141 details: full_details,
142 });
143 }
144 }
145 LineRecord {
146 line_number,
147 captured_raw,
148 outcome: Err(LineRecordError::Adapter { code, summary }),
149 }
150 }
151}
152
153impl<R: Read, P: LineParser> Iterator for LineIngestor<R, P> {
154 type Item = LineRecord<P::Event>;
155
156 fn next(&mut self) -> Option<Self::Item> {
157 loop {
158 let next = self.reader.next()?;
159 match next {
160 BoundedLine::IoError { line_number } => {
161 return Some(self.record_error(line_number, LineRecordError::Io));
162 }
163 BoundedLine::LineTooLong {
164 line_number,
165 observed_bytes,
166 max_line_bytes,
167 } => {
168 return Some(self.record_error(
169 line_number,
170 LineRecordError::LineTooLong {
171 observed_bytes,
172 max_line_bytes,
173 },
174 ));
175 }
176 BoundedLine::Line { line_number, bytes } => {
177 let Ok(raw_line) = String::from_utf8(bytes) else {
178 return Some(self.record_error(line_number, LineRecordError::InvalidUtf8));
179 };
180 let line = Self::normalize_line(&raw_line);
181 if Self::line_is_blank(line) {
182 continue;
183 }
184
185 let captured_raw = self.capture_raw(line);
186 let json_capture = captured_raw.as_ref().and_then(|raw| raw.json.as_ref());
187 let input = LineInput { line, json_capture };
188
189 match self.parser.parse_line(input) {
190 Ok(None) => continue,
191 Ok(Some(event)) => {
192 return Some(LineRecord {
193 line_number,
194 captured_raw,
195 outcome: Ok(event),
196 });
197 }
198 Err(err) => {
199 return Some(self.adapter_error_record(
200 line_number,
201 captured_raw,
202 err.code(),
203 err.redacted_summary(),
204 err.full_details(),
205 ));
206 }
207 }
208 }
209 }
210 }
211 }
212}
213
/// Async (tokio) counterpart of the synchronous line ingestor.
#[cfg(feature = "tokio")]
mod tokio_ingest {
    use serde_json::Value;
    use tokio::io::AsyncRead;

    use crate::config::{CaptureRaw, ErrorDetailCapture, IngestConfig};
    use crate::error::{AdapterErrorCode, CapturedRaw, ErrorDetail, LineRecord, LineRecordError};
    use crate::line_parser::{ClassifiedParserError, LineInput, LineParser};
    use crate::reader::{AsyncBoundedLineReader, AsyncBoundedLineResult};
    use crate::RawCaptureBudget;

    /// Asynchronous ingestion pipeline: pulls bounded lines from an async
    /// reader, optionally captures their raw content, and hands each line to a
    /// [`LineParser`].
    pub struct AsyncLineIngestor<R: AsyncRead + Unpin, P: LineParser> {
        /// Source of bounded lines.
        reader: AsyncBoundedLineReader<R>,
        /// Parser invoked once per non-blank line.
        parser: P,
        /// Capture mode, limits and error-detail settings.
        config: IngestConfig,
        /// Bytes still available for raw capture.
        budget: RawCaptureBudget,
        /// Adapter name attached to error details sent to the error sink.
        adapter_name: &'static str,
    }

    impl<R: AsyncRead + Unpin, P: LineParser> AsyncLineIngestor<R, P> {
        /// Builds an ingestor over `reader`.
        ///
        /// The raw-capture budget is seeded from
        /// `config.limits.max_raw_bytes_total` and the line reader is bounded by
        /// `config.limits.max_line_bytes`.
        pub fn new(reader: R, parser: P, config: IngestConfig, adapter_name: &'static str) -> Self {
            let budget = RawCaptureBudget::new(config.limits.max_raw_bytes_total);
            Self {
                reader: AsyncBoundedLineReader::new(reader, config.limits.max_line_bytes),
                parser,
                config,
                budget,
                adapter_name,
            }
        }

        /// Consumes the ingestor and hands back the parser it owned.
        ///
        /// Mirrors `LineIngestor::into_parser` so async callers can also recover
        /// the parser once ingestion is finished.
        pub fn into_parser(self) -> P {
            self.parser
        }

        /// Wraps a non-adapter failure in a record that carries no raw capture.
        fn record_error<T>(&self, line_number: usize, err: LineRecordError) -> LineRecord<T> {
            LineRecord {
                line_number,
                captured_raw: None,
                outcome: Err(err),
            }
        }

        /// Drops a single trailing `'\r'`, if present.
        fn normalize_line(line: &str) -> &str {
            line.strip_suffix('\r').unwrap_or(line)
        }

        /// True when the line is empty or consists solely of whitespace.
        fn line_is_blank(line: &str) -> bool {
            line.chars().all(|ch| ch.is_whitespace())
        }

        /// Copies the line when line capture is enabled (`Line` or `Both`) and
        /// the budget can cover `line.len()` bytes; the bytes are then deducted.
        fn maybe_capture_line(&mut self, line: &str) -> Option<String> {
            if !matches!(self.config.capture_raw, CaptureRaw::Line | CaptureRaw::Both) {
                return None;
            }
            let bytes = line.len();
            if !self.budget.can_spend(bytes) {
                return None;
            }
            self.budget.spend(bytes);
            Some(line.to_string())
        }

        /// Parses the line as JSON when JSON capture is enabled (`Json` or
        /// `Both`).
        ///
        /// The cost charged to the budget is the length of the value
        /// re-serialized with `serde_json::to_vec`. Returns `None` if the line
        /// is not valid JSON, re-serialization fails, or the budget cannot cover
        /// the cost.
        fn maybe_capture_json(&mut self, line: &str) -> Option<Value> {
            if !matches!(self.config.capture_raw, CaptureRaw::Json | CaptureRaw::Both) {
                return None;
            }
            let value: Value = serde_json::from_str(line).ok()?;
            let bytes = serde_json::to_vec(&value).ok()?.len();
            if !self.budget.can_spend(bytes) {
                return None;
            }
            self.budget.spend(bytes);
            Some(value)
        }

        /// Captures whatever the configured mode asks for.
        ///
        /// Yields `None` when nothing was captured. In `Both` mode the line is
        /// captured (and charged) before the JSON value.
        fn capture_raw(&mut self, line: &str) -> Option<CapturedRaw> {
            match self.config.capture_raw {
                CaptureRaw::None => None,
                CaptureRaw::Line => self.maybe_capture_line(line).map(|line| CapturedRaw {
                    line: Some(line),
                    json: None,
                }),
                CaptureRaw::Json => self.maybe_capture_json(line).map(|json| CapturedRaw {
                    line: None,
                    json: Some(json),
                }),
                CaptureRaw::Both => {
                    let line_cap = self.maybe_capture_line(line);
                    let json_cap = self.maybe_capture_json(line);
                    if line_cap.is_none() && json_cap.is_none() {
                        None
                    } else {
                        Some(CapturedRaw {
                            line: line_cap,
                            json: json_cap,
                        })
                    }
                }
            }
        }

        /// Builds the record for a parser failure.
        ///
        /// When error-detail capture is `FullDetails` and an error sink is
        /// configured, `full_details` is forwarded to the sink; the returned
        /// record itself only carries `code` and `summary`.
        fn adapter_error_record<T>(
            &mut self,
            line_number: usize,
            captured_raw: Option<CapturedRaw>,
            code: AdapterErrorCode,
            summary: String,
            full_details: String,
        ) -> LineRecord<T> {
            if self.config.error_detail_capture == ErrorDetailCapture::FullDetails {
                if let Some(sink) = self.config.error_sink.as_mut() {
                    sink.on_error(ErrorDetail {
                        line_number,
                        code,
                        adapter: self.adapter_name,
                        details: full_details,
                    });
                }
            }
            LineRecord {
                line_number,
                captured_raw,
                outcome: Err(LineRecordError::Adapter { code, summary }),
            }
        }

        /// Produces the next [`LineRecord`], or `None` once the reader returns
        /// `None`.
        ///
        /// Blank lines and lines for which the parser returns `Ok(None)` produce
        /// no record; the loop simply moves on to the following line.
        pub async fn next_record(&mut self) -> Option<LineRecord<P::Event>> {
            loop {
                let next = self.reader.next_line().await?;
                match next {
                    AsyncBoundedLineResult::IoError { line_number } => {
                        return Some(self.record_error(line_number, LineRecordError::Io));
                    }
                    AsyncBoundedLineResult::LineTooLong {
                        line_number,
                        observed_bytes,
                        max_line_bytes,
                    } => {
                        return Some(self.record_error(
                            line_number,
                            LineRecordError::LineTooLong {
                                observed_bytes,
                                max_line_bytes,
                            },
                        ));
                    }
                    AsyncBoundedLineResult::Line { line_number, bytes } => {
                        let Ok(raw_line) = String::from_utf8(bytes) else {
                            return Some(
                                self.record_error(line_number, LineRecordError::InvalidUtf8),
                            );
                        };
                        let line = Self::normalize_line(&raw_line);
                        if Self::line_is_blank(line) {
                            continue;
                        }

                        // Capture before parsing so raw data survives parser errors.
                        let captured_raw = self.capture_raw(line);
                        let json_capture =
                            captured_raw.as_ref().and_then(|raw| raw.json.as_ref());
                        let input = LineInput { line, json_capture };

                        match self.parser.parse_line(input) {
                            // The parser consumed the line without emitting an event.
                            Ok(None) => continue,
                            Ok(Some(event)) => {
                                return Some(LineRecord {
                                    line_number,
                                    captured_raw,
                                    outcome: Ok(event),
                                });
                            }
                            Err(err) => {
                                return Some(self.adapter_error_record(
                                    line_number,
                                    captured_raw,
                                    err.code(),
                                    err.redacted_summary(),
                                    err.full_details(),
                                ));
                            }
                        }
                    }
                }
            }
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        /// Parser stub that echoes every line back as its event.
        #[derive(Default)]
        struct TestParser;

        /// Parser stub that rejects every line.
        #[derive(Default)]
        struct FailingParser;

        #[derive(Debug, thiserror::Error)]
        #[error("boom")]
        struct TestErr;

        impl crate::line_parser::ClassifiedParserError for TestErr {
            fn code(&self) -> AdapterErrorCode {
                AdapterErrorCode::Unknown
            }

            fn redacted_summary(&self) -> String {
                "boom".to_string()
            }

            fn full_details(&self) -> String {
                "boom details".to_string()
            }
        }

        impl crate::LineParser for TestParser {
            type Event = String;
            type Error = TestErr;

            fn reset(&mut self) {}

            fn parse_line(
                &mut self,
                input: crate::LineInput<'_>,
            ) -> Result<Option<Self::Event>, Self::Error> {
                Ok(Some(input.line.to_string()))
            }
        }

        impl crate::LineParser for FailingParser {
            type Event = String;
            type Error = TestErr;

            fn reset(&mut self) {}

            fn parse_line(
                &mut self,
                _input: crate::LineInput<'_>,
            ) -> Result<Option<Self::Event>, Self::Error> {
                Err(TestErr)
            }
        }

        #[tokio::test]
        async fn budget_skips_capture_deterministically() {
            let data = b"{\"k\":1}\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Both,
                limits: crate::IngestLimits {
                    max_raw_bytes_total: Some(2),
                    ..Default::default()
                },
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(rec.captured_raw.is_none());
            assert!(rec.outcome.is_ok());
        }

        #[tokio::test]
        async fn preserves_line_capture_on_adapter_error() {
            let data = b"hello\n";
            let config = IngestConfig {
                capture_raw: CaptureRaw::Line,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            assert_eq!(
                rec.captured_raw.as_ref().and_then(|r| r.line.as_deref()),
                Some("hello")
            );
        }

        #[tokio::test]
        async fn preserves_json_capture_on_adapter_error() {
            let data = br#"{"k":1}"#;
            let mut bytes = Vec::new();
            bytes.extend_from_slice(data);
            bytes.extend_from_slice(b"\n");

            let config = IngestConfig {
                capture_raw: CaptureRaw::Json,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            let captured = rec.captured_raw.expect("expected captured_raw");
            assert_eq!(captured.line, None);
            assert_eq!(
                captured
                    .json
                    .as_ref()
                    .and_then(|v| v.get("k"))
                    .and_then(|v| v.as_i64()),
                Some(1)
            );
        }

        #[tokio::test]
        async fn preserves_both_capture_on_adapter_error() {
            let data = br#"{"k":1}"#;
            let mut bytes = Vec::new();
            bytes.extend_from_slice(data);
            bytes.extend_from_slice(b"\n");

            let config = IngestConfig {
                capture_raw: CaptureRaw::Both,
                ..Default::default()
            };

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(bytes), FailingParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
            let captured = rec.captured_raw.expect("expected captured_raw");
            assert_eq!(captured.line.as_deref(), Some("{\"k\":1}"));
            assert_eq!(
                captured
                    .json
                    .as_ref()
                    .and_then(|v| v.get("k"))
                    .and_then(|v| v.as_i64()),
                Some(1)
            );
        }

        #[tokio::test]
        async fn into_parser_returns_parser_after_ingest() {
            let data = b"hello\n";
            let config = IngestConfig::default();

            let mut ingestor =
                AsyncLineIngestor::new(std::io::Cursor::new(data), TestParser, config, "test");

            let rec = ingestor.next_record().await.unwrap();
            assert!(rec.outcome.is_ok());

            // Consuming the ingestor must yield the parser it was built with.
            let _parser: TestParser = ingestor.into_parser();
        }
    }
}
551
552#[cfg(feature = "tokio")]
553pub use tokio_ingest::AsyncLineIngestor;
554
#[cfg(test)]
mod tests {
    use super::*;

    /// Parser stub that echoes every line back as its event.
    #[derive(Default)]
    struct TestParser;

    /// Parser stub that rejects every line.
    #[derive(Default)]
    struct FailingParser;

    #[derive(Debug, thiserror::Error)]
    #[error("boom")]
    struct TestErr;

    impl crate::line_parser::ClassifiedParserError for TestErr {
        fn code(&self) -> AdapterErrorCode {
            AdapterErrorCode::Unknown
        }

        fn redacted_summary(&self) -> String {
            String::from("boom")
        }

        fn full_details(&self) -> String {
            String::from("boom details")
        }
    }

    impl LineParser for TestParser {
        type Event = String;
        type Error = TestErr;

        fn reset(&mut self) {}

        fn parse_line(&mut self, input: LineInput<'_>) -> Result<Option<Self::Event>, Self::Error> {
            Ok(Some(input.line.to_owned()))
        }
    }

    impl LineParser for FailingParser {
        type Event = String;
        type Error = TestErr;

        fn reset(&mut self) {}

        fn parse_line(
            &mut self,
            _input: LineInput<'_>,
        ) -> Result<Option<Self::Event>, Self::Error> {
            Err(TestErr)
        }
    }

    /// A single JSON object terminated by a newline.
    fn json_line() -> Vec<u8> {
        let mut buf = br#"{"k":1}"#.to_vec();
        buf.push(b'\n');
        buf
    }

    /// Default configuration with only the capture mode overridden.
    fn config_with(capture_raw: CaptureRaw) -> IngestConfig {
        IngestConfig {
            capture_raw,
            ..Default::default()
        }
    }

    /// Reads the integer stored under `"k"` in the captured JSON, if any.
    fn captured_k(raw: &CapturedRaw) -> Option<i64> {
        let json = raw.json.as_ref()?;
        json.get("k")?.as_i64()
    }

    /// Borrows the captured line text, if any.
    fn captured_line<T>(rec: &LineRecord<T>) -> Option<&str> {
        rec.captured_raw.as_ref()?.line.as_deref()
    }

    #[test]
    fn captures_line_before_parsing() {
        let limits = crate::IngestLimits {
            max_raw_bytes_total: Some(32),
            ..Default::default()
        };
        let config = IngestConfig {
            capture_raw: CaptureRaw::Line,
            limits,
            ..Default::default()
        };
        let source = std::io::Cursor::new(b"hello\n");

        let mut ingestor = LineIngestor::new(source, TestParser, config, "test");
        let rec = ingestor.next().unwrap();

        assert_eq!(captured_line(&rec), Some("hello"));
        assert!(rec.outcome.is_ok());
    }

    #[test]
    fn preserves_line_capture_on_adapter_error() {
        let source = std::io::Cursor::new(b"hello\n");
        let config = config_with(CaptureRaw::Line);

        let mut ingestor = LineIngestor::new(source, FailingParser, config, "test");
        let rec = ingestor.next().unwrap();

        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        assert_eq!(captured_line(&rec), Some("hello"));
    }

    #[test]
    fn preserves_json_capture_on_adapter_error() {
        let source = std::io::Cursor::new(json_line());
        let config = config_with(CaptureRaw::Json);

        let mut ingestor = LineIngestor::new(source, FailingParser, config, "test");
        let rec = ingestor.next().unwrap();

        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        let captured = rec.captured_raw.expect("expected captured_raw");
        assert_eq!(captured.line, None);
        assert_eq!(captured_k(&captured), Some(1));
    }

    #[test]
    fn preserves_both_capture_on_adapter_error() {
        let source = std::io::Cursor::new(json_line());
        let config = config_with(CaptureRaw::Both);

        let mut ingestor = LineIngestor::new(source, FailingParser, config, "test");
        let rec = ingestor.next().unwrap();

        assert!(matches!(rec.outcome, Err(LineRecordError::Adapter { .. })));
        let captured = rec.captured_raw.expect("expected captured_raw");
        assert_eq!(captured.line.as_deref(), Some(r#"{"k":1}"#));
        assert_eq!(captured_k(&captured), Some(1));
    }
}