ingest/lib.rs
1//! UCFP Ingest Layer - Content Ingestion and Validation
2//!
3//! This crate provides the entry point to the Universal Content Fingerprinting (UCFP) pipeline,
4//! transforming raw content and metadata into clean, deterministic records suitable for
5//! downstream processing.
6//!
7//! # Overview
8//!
9//! The ingest crate is responsible for:
10//! - **Validation**: Enforcing metadata policies, size limits, and business rules
11//! - **Normalization**: Collapsing whitespace, stripping control characters, sanitizing inputs
12//! - **ID Generation**: Deriving stable document IDs using UUIDv5 when not explicitly provided
13//! - **Multi-modal Support**: Handling text, binary, and structured payloads uniformly
14//! - **Observability**: Structured logging via `tracing` for production debugging
15//!
16//! # Pipeline Position
17//!
18//! ```text
19//! Raw Content ──▶ Ingest ──▶ Canonical ──▶ Perceptual/Semantic ──▶ Index ──▶ Match
20//! ↑
21//! (this crate)
22//! ```
23//!
24//! # Quick Start
25//!
26//! ```rust
27//! use ingest::{
28//! ingest, IngestConfig, RawIngestRecord,
29//! IngestSource, IngestMetadata, IngestPayload
30//! };
31//! use chrono::Utc;
32//!
33//! // Configure (use defaults for quick start)
34//! let config = IngestConfig::default();
35//!
36//! // Create a raw record
37//! let record = RawIngestRecord {
38//! id: "doc-001".to_string(),
39//! source: IngestSource::RawText,
40//! metadata: IngestMetadata {
41//! tenant_id: Some("acme-corp".to_string()),
42//! doc_id: Some("report-q4-2024".to_string()),
43//! received_at: Some(Utc::now()),
44//! original_source: None,
45//! attributes: None,
46//! },
47//! payload: Some(IngestPayload::Text(
48//! " Quarterly report: revenue up 15% YoY. ".to_string()
49//! )),
50//! };
51//!
52//! // Ingest and get canonical record
53//! let canonical = ingest(record, &config).unwrap();
54//!
55//! assert_eq!(canonical.tenant_id, "acme-corp");
56//! // Whitespace normalized: "Quarterly report: revenue up 15% YoY."
57//! ```
58//!
59//! # Core Design Principles
60//!
61//! 1. **Fail Fast**: Validation happens before any transformation
62//! 2. **Deterministic**: Same input always produces same output (critical for fingerprinting)
63//! 3. **Observable**: Every operation is logged with structured tracing
64//! 4. **Safe**: Control characters stripped, sizes bounded, UTF-8 validated
65//!
66//! # Architecture
67//!
68//! The ingest pipeline follows a strict data flow:
69//!
//! 1. **Record ID Sanitization**: Sanitize the required `id` field (honoring `strip_control_chars`)
//! 2. **Payload Requirements Check**: Verify source mandates are met
//! 3. **Raw Size Validation**: Enforce `max_payload_bytes` limit
//! 4. **Metadata Normalization**: Apply defaults, validate policies, sanitize
//! 5. **Payload Normalization**: Decode UTF-8, collapse whitespace, preserve binary
//! 6. **Normalized Size Validation**: Enforce `max_normalized_bytes` limit
//! 7. **Canonical Record Construction**: Build deterministic output
76//!
77//! # Module Structure
78//!
79//! - `config`: Configuration types (`IngestConfig`, `MetadataPolicy`)
80//! - `error`: Error types (`IngestError`)
81//! - `types`: Data model (`RawIngestRecord`, `CanonicalIngestRecord`, etc.)
82//! - `metadata`: Metadata normalization and validation logic
83//! - `payload`: Payload validation and transformation utilities
84//!
85//! # Error Handling
86//!
//! All errors are typed via [`IngestError`] for precise handling. The snippet below
//! assumes `record`, `config`, and a `process` function are already in scope:
//!
//! ```rust,ignore
90//! use ingest::{ingest, IngestError};
91//!
92//! match ingest(record, &config) {
93//! Ok(canonical) => process(canonical),
94//! Err(IngestError::PayloadTooLarge(msg)) => {
95//! eprintln!("Content too large: {}", msg);
96//! }
97//! Err(IngestError::InvalidUtf8(msg)) => {
98//! eprintln!("Invalid encoding: {}", msg);
99//! }
100//! Err(e) => {
101//! eprintln!("Ingest failed: {}", e);
102//! }
103//! }
104//! ```
105//!
106//! # Configuration
107//!
108//! For production use, configure size limits and policies:
109//!
110//! ```rust
111//! use ingest::{IngestConfig, MetadataPolicy, RequiredField};
112//! use uuid::Uuid;
113//!
114//! let config = IngestConfig {
115//! version: 1,
116//! default_tenant_id: "default".to_string(),
117//! doc_id_namespace: Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"myapp.example.com"),
118//! strip_control_chars: true,
119//! metadata_policy: MetadataPolicy {
120//! required_fields: vec![
121//! RequiredField::TenantId,
122//! RequiredField::DocId,
123//! ],
124//! max_attribute_bytes: Some(1024 * 1024), // 1 MB
125//! reject_future_timestamps: true,
126//! },
127//! max_payload_bytes: Some(50 * 1024 * 1024), // 50 MB raw
128//! max_normalized_bytes: Some(10 * 1024 * 1024), // 10 MB normalized
129//! };
130//!
131//! // Validate at startup
132//! config.validate().expect("Invalid configuration");
133//! ```
134//!
135//! # Performance
136//!
137//! - **Base overhead**: ~5-15μs for small payloads
138//! - **Text normalization**: O(n) where n = text length
139//! - **Memory**: Allocates new String during normalization
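//! - **Thread safety**: `ingest()` takes `&IngestConfig` and an owned record, so independent records can be ingested in parallel against a shared config; it is not strictly pure, since it emits `tracing` events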
141//!
142//! # Examples
143//!
144//! See the `examples/` directory for complete working examples:
145//! - `ingest_demo.rs`: Basic text ingestion
146//! - `batch_ingest.rs`: Processing multiple records
147//! - `size_limit_demo.rs`: Size limit enforcement demonstration
148//!
149//! # See Also
150//!
//! - `doc/ingest.md` in the crate repository for comprehensive guides
//! - [`IngestConfig`] and [`MetadataPolicy`] for configuration details
//! - [`RawIngestRecord`] and [`CanonicalIngestRecord`] for data structure definitions
154
155use std::time::Instant;
156
157use tracing::{info, warn, Level};
158
159mod config;
160mod error;
161mod metadata;
162mod payload;
163mod types;
164
165use crate::metadata::normalize_metadata;
166
167pub use crate::config::{ConfigError, IngestConfig, MetadataPolicy, RequiredField};
168pub use crate::error::IngestError;
169pub use crate::payload::{
170 normalize_payload_option, payload_kind, payload_length, validate_payload_requirements,
171};
172pub use crate::types::{
173 CanonicalIngestRecord, CanonicalPayload, IngestMetadata, IngestPayload, IngestSource,
174 RawIngestRecord,
175};
176
177/// Ingests a raw record and produces a canonical, normalized record.
178///
179/// This is the primary entry point for the ingest pipeline. It validates the raw record,
180/// normalizes metadata and payload, and returns a deterministic `CanonicalIngestRecord`
181/// suitable for downstream processing.
182///
183/// # Arguments
184///
185/// * `raw` - The raw ingest record containing metadata and optional payload
186/// * `cfg` - Runtime configuration controlling validation and normalization behavior
187///
188/// # Returns
189///
190/// * `Ok(CanonicalIngestRecord)` - Successfully ingested and normalized record
191/// * `Err(IngestError)` - Validation or normalization failure with specific error type
192///
193/// # Errors
194///
195/// This function can return various [`IngestError`] variants:
196///
197/// * `MissingPayload` - Source requires a payload but none was provided
198/// * `EmptyBinaryPayload` - Binary payload has zero bytes
199/// * `InvalidMetadata(String)` - Metadata policy violation (required field missing, future timestamp, etc.)
200/// * `InvalidUtf8(String)` - `TextBytes` payload contains invalid UTF-8 sequences
201/// * `EmptyNormalizedText` - Text payload is empty after whitespace normalization
202/// * `PayloadTooLarge(String)` - Payload exceeds configured size limits
203///
204/// # Side Effects
205///
206/// * Emits structured tracing spans for observability
207/// * Records timing metrics for performance monitoring
208///
209/// # Examples
210///
211/// ## Basic Usage
212///
213/// ```rust
214/// use ingest::{
215/// ingest, IngestConfig, RawIngestRecord,
216/// IngestSource, IngestMetadata, IngestPayload
217/// };
218/// use chrono::Utc;
219///
220/// let config = IngestConfig::default();
221/// let record = RawIngestRecord {
222/// id: "my-doc-1".into(),
223/// source: IngestSource::RawText,
224/// metadata: IngestMetadata {
225/// tenant_id: Some("my-tenant".into()),
226/// doc_id: None, // Will be derived
227/// received_at: Some(Utc::now()),
228/// original_source: None,
229/// attributes: None,
230/// },
231/// payload: Some(IngestPayload::Text(
232/// " Some text with extra whitespace. ".into()
233/// )),
234/// };
235///
236/// let canonical = ingest(record, &config).unwrap();
237/// assert_eq!(canonical.tenant_id, "my-tenant");
238/// // Note: doc_id is derived if not provided
239/// ```
240///
241/// ## Error Handling
242///
243/// ```rust
244/// use ingest::{ingest, IngestConfig, IngestError, IngestPayload, IngestSource};
245/// use ingest::{RawIngestRecord, IngestMetadata};
246///
247/// let config = IngestConfig::default();
248///
249/// // Invalid UTF-8 bytes
250/// let record = RawIngestRecord {
251/// id: "test".into(),
252/// source: IngestSource::RawText,
253/// metadata: IngestMetadata {
254/// tenant_id: Some("tenant".into()),
255/// doc_id: Some("doc".into()),
256/// received_at: None,
257/// original_source: None,
258/// attributes: None,
259/// },
260/// payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
261/// };
262///
263/// match ingest(record, &config) {
264/// Err(IngestError::InvalidUtf8(_)) => println!("Invalid UTF-8 detected"),
265/// _ => println!("Other result"),
266/// }
267/// ```
268///
269/// # Performance
270///
271/// - Small text payloads: ~10-20μs
272/// - Large text payloads: scales linearly with size
273/// - Binary payloads: minimal overhead (size check only)
274pub fn ingest(
275 raw: RawIngestRecord,
276 cfg: &IngestConfig,
277) -> Result<CanonicalIngestRecord, IngestError> {
278 let start = Instant::now();
279 let RawIngestRecord {
280 id,
281 source,
282 metadata,
283 payload,
284 } = raw;
285
286 let tenant_hint = metadata.tenant_id.clone();
287 let doc_hint = metadata.doc_id.clone();
288
289 let record_id = match metadata::sanitize_required_field("id", id, cfg.strip_control_chars) {
290 Ok(id) => id,
291 Err(err) => {
292 let elapsed_micros = start.elapsed().as_micros();
293 warn!(error = %err, elapsed_micros, "ingest_failure");
294 return Err(err);
295 }
296 };
297
298 let span = tracing::span!(
299 Level::INFO,
300 "ingest.ingest",
301 record_id = %record_id,
302 source = ?source
303 );
304 let _guard = span.enter();
305
306 match ingest_inner(record_id.clone(), source, metadata, payload, cfg) {
307 Ok(record) => {
308 let elapsed_micros = start.elapsed().as_micros();
309 info!(
310 tenant_id = %record.tenant_id,
311 doc_id = %record.doc_id,
312 payload_kind = %payload_kind(record.normalized_payload.as_ref()),
313 normalized_len = payload_length(record.normalized_payload.as_ref()),
314 elapsed_micros,
315 "ingest_success"
316 );
317 Ok(record)
318 }
319 Err(err) => {
320 let elapsed_micros = start.elapsed().as_micros();
321 warn!(
322 tenant_id = ?tenant_hint,
323 doc_id = ?doc_hint,
324 error = %err,
325 elapsed_micros,
326 "ingest_failure"
327 );
328 Err(err)
329 }
330 }
331}
332
333/// Core ingest logic: validates payload requirements, normalizes metadata and payload.
334///
335/// This internal function performs the actual ingest work. It is separated from the
336/// public `ingest()` function to facilitate testing and to keep the observability
337/// wrapper clean.
338///
339/// # Arguments
340///
341/// * `record_id` - Sanitized unique identifier for this ingest operation
342/// * `source` - Source type (RawText, File, etc.)
343/// * `metadata` - Raw metadata to be normalized
344/// * `payload` - Optional raw payload
345/// * `cfg` - Configuration for validation and normalization
346///
347/// # Returns
348///
349/// Normalized `CanonicalIngestRecord` on success, `IngestError` on failure
350fn ingest_inner(
351 record_id: String,
352 source: IngestSource,
353 metadata: IngestMetadata,
354 payload: Option<IngestPayload>,
355 cfg: &IngestConfig,
356) -> Result<CanonicalIngestRecord, IngestError> {
357 // Some sources require a payload, so we check for that first.
358 validate_payload_requirements(&source, &payload)?;
359
360 // Reject oversized raw payloads before normalization.
361 if let Some(limit) = cfg.max_payload_bytes {
362 if let Some(ref p) = payload {
363 let len = match p {
364 IngestPayload::Text(s) => s.len(),
365 IngestPayload::TextBytes(b) => b.len(),
366 IngestPayload::Binary(b) => b.len(),
367 };
368 if len > limit {
369 return Err(IngestError::PayloadTooLarge(format!(
370 "raw payload size {len} exceeds limit of {limit}"
371 )));
372 }
373 }
374 }
375
376 // Metadata is normalized and validated against the configured policies.
377 let normalized_metadata = normalize_metadata(metadata, cfg, &record_id)?;
378 // The payload is normalized based on its type (text or binary).
379 let normalized_payload = normalize_payload_option(&source, payload, cfg)?;
380
381 Ok(CanonicalIngestRecord {
382 id: record_id,
383 tenant_id: normalized_metadata.tenant_id,
384 doc_id: normalized_metadata.doc_id,
385 received_at: normalized_metadata.received_at,
386 original_source: normalized_metadata.original_source,
387 source,
388 normalized_payload,
389 attributes: normalized_metadata.attributes,
390 })
391}
392
/// Collapses every run of whitespace into a single ASCII space and trims both ends.
///
/// "Whitespace" means any character for which `char::is_whitespace()` is true. That
/// covers tabs, newlines and non-ASCII Unicode spaces, not just `' '`. All other
/// characters, including emoji and other non-ASCII text, are copied through unchanged.
///
/// # Arguments
///
/// * `s` - Text to normalize
///
/// # Returns
///
/// A newly allocated `String`. It is empty when `s` is empty or contains only whitespace.
///
/// # Examples
///
/// ```rust
/// use ingest::normalize_payload;
///
/// // Collapse multiple spaces
/// assert_eq!(normalize_payload(" Hello world "), "Hello world");
///
/// // Newlines and tabs count as whitespace too
/// assert_eq!(normalize_payload("Line1\n\n\t\tLine2"), "Line1 Line2");
///
/// // Unicode is preserved
/// assert_eq!(normalize_payload(" Hello 👋 world "), "Hello 👋 world");
///
/// // Whitespace-only input yields an empty string
/// assert_eq!(normalize_payload(" \n\t "), "");
/// ```
///
/// # Performance
///
/// - Single O(n) pass over the input
/// - The output buffer is reserved up front with the input's byte length, which is an
///   upper bound on the result, so it never reallocates
///
/// # Use Cases
///
/// - Preparing text for fingerprinting, so whitespace differences don't affect matching
/// - Normalizing user input for storage
/// - Cleaning scraped web content
pub fn normalize_payload(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    let mut words = s.split_whitespace();

    // `split_whitespace` never yields empty words, so separators only go between words.
    if let Some(first) = words.next() {
        out.push_str(first);
        for word in words {
            out.push(' ');
            out.push_str(word);
        }
    }

    out
}
452
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration, NaiveDate, Utc};

    use super::*;

    /// Fixed timestamp (2024-01-01T00:00:00Z) so tests never depend on the wall clock.
    fn fixed_timestamp() -> DateTime<Utc> {
        let Some(date) = NaiveDate::from_ymd_opt(2024, 1, 1) else {
            panic!("invalid date components");
        };
        let Some(date_time) = date.and_hms_opt(0, 0, 0) else {
            panic!("invalid time components");
        };
        DateTime::<Utc>::from_naive_utc_and_offset(date_time, Utc)
    }

    /// Metadata with tenant, doc id and timestamp populated; the baseline for most tests.
    fn base_metadata() -> IngestMetadata {
        IngestMetadata {
            tenant_id: Some("tenant1".into()),
            doc_id: Some("doc-123".into()),
            received_at: Some(fixed_timestamp()),
            original_source: None,
            attributes: None,
        }
    }

    #[test]
    fn test_normalize_payload() {
        let cases = [
            (
                " Hello\n\n world\t this is\n a test ",
                "Hello world this is a test",
            ),
            ("\n", ""),
            ("emoji \u{1f600} test ", "emoji \u{1f600} test"),
            // Non-ASCII Unicode whitespace (NBSP, ideographic space) is collapsed too.
            ("a\u{00a0}\u{3000}b", "a b"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_payload(input), expected);
        }
    }

    #[test]
    fn test_ingest_rawtext_success() {
        let record = RawIngestRecord {
            id: "ingest-1".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text(" Hello world \n ".into())),
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant1");
        assert_eq!(rec.doc_id, "doc-123");
        match rec.normalized_payload {
            Some(CanonicalPayload::Text(text)) => assert_eq!(text, "Hello world"),
            _ => panic!("expected text payload"),
        }
    }

    #[test]
    fn test_ingest_whitespace_only_text_rejected() {
        // A payload is present, but it normalizes to nothing.
        let record = RawIngestRecord {
            id: "ingest-2".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text(" ".into())),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::EmptyNormalizedText)));
    }

    #[test]
    fn test_ingest_file_binary_payload() {
        let record = RawIngestRecord {
            id: "ingest-3".into(),
            source: IngestSource::File {
                filename: "image.png".into(),
                content_type: Some("image/png".into()),
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![1, 2, 3, 4])),
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        match rec.normalized_payload {
            Some(CanonicalPayload::Binary(bytes)) => assert_eq!(bytes, vec![1, 2, 3, 4]),
            _ => panic!("expected binary payload"),
        }
    }

    #[test]
    fn test_metadata_preserved() {
        let record = RawIngestRecord {
            id: "ingest-4".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant-x".into()),
                doc_id: Some("doc-y".into()),
                received_at: Some(fixed_timestamp()),
                original_source: Some("source-42".into()),
                attributes: Some(serde_json::json!({"kind": "demo"})),
            },
            payload: None,
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant-x");
        assert_eq!(rec.doc_id, "doc-y");
        assert_eq!(rec.original_source.as_deref(), Some("source-42"));
        assert_eq!(rec.attributes, Some(serde_json::json!({"kind": "demo"})));
        assert!(rec.normalized_payload.is_none());
    }

    #[test]
    fn test_defaults_applied_when_metadata_missing() {
        let record = RawIngestRecord {
            id: "ingest-5".into(),
            source: IngestSource::RawText,
            metadata: IngestMetadata {
                tenant_id: None,
                doc_id: None,
                received_at: None,
                original_source: Some("\u{0007}source\n".into()),
                attributes: None,
            },
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let cfg = IngestConfig {
            default_tenant_id: "fallback".into(),
            ..Default::default()
        };

        let rec = ingest(record, &cfg).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "fallback");
        assert!(!rec.doc_id.is_empty());
        assert!(rec.original_source.unwrap().contains("source"));
    }

    #[test]
    fn test_doc_id_derivation_deterministic() {
        let metadata = IngestMetadata {
            tenant_id: None,
            doc_id: None,
            received_at: None,
            original_source: None,
            attributes: None,
        };

        let cfg = IngestConfig::default();
        let record_a = RawIngestRecord {
            id: "deterministic".into(),
            source: IngestSource::RawText,
            metadata: metadata.clone(),
            payload: Some(IngestPayload::Text("payload".into())),
        };
        let record_b = RawIngestRecord {
            id: "deterministic".into(),
            source: IngestSource::RawText,
            metadata,
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let rec_a = ingest(record_a, &cfg).expect("first ingest succeeds");
        let rec_b = ingest(record_b, &cfg).expect("second ingest succeeds");

        assert_eq!(rec_a.doc_id, rec_b.doc_id);
    }

    #[test]
    fn test_invalid_utf8_payload_rejected() {
        let record = RawIngestRecord {
            id: "ingest-utf8".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::InvalidUtf8(_))));
    }

    #[test]
    fn test_control_chars_removed_from_metadata() {
        let record = RawIngestRecord {
            id: "ingest-ctrl".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant\u{0003}".into()),
                doc_id: Some("doc\n\r".into()),
                received_at: None,
                original_source: Some(" source\u{0008} ".into()),
                attributes: None,
            },
            payload: None,
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant");
        assert_eq!(rec.doc_id, "doc");
        assert_eq!(rec.original_source.as_deref(), Some("source"));
    }

    #[test]
    fn required_tenant_id_enforced() {
        let record = RawIngestRecord {
            id: "ingest-required-tenant".into(),
            source: IngestSource::RawText,
            metadata: IngestMetadata {
                tenant_id: None,
                doc_id: Some("doc".into()),
                received_at: Some(fixed_timestamp()),
                original_source: None,
                attributes: None,
            },
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                required_fields: vec![RequiredField::TenantId],
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(matches!(res, Err(IngestError::InvalidMetadata(_))));
    }

    #[test]
    fn future_timestamp_rejected() {
        let future = Utc::now() + Duration::days(1);
        let record = RawIngestRecord {
            id: "ingest-future-ts".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant".into()),
                doc_id: Some("doc".into()),
                received_at: Some(future),
                original_source: None,
                attributes: None,
            },
            payload: None,
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                reject_future_timestamps: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(matches!(res, Err(IngestError::InvalidMetadata(msg)) if msg.contains("future")));
    }

    #[test]
    fn max_attribute_bytes_enforced() {
        let record = RawIngestRecord {
            id: "ingest-attrs".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant".into()),
                doc_id: Some("doc".into()),
                received_at: Some(fixed_timestamp()),
                original_source: None,
                attributes: Some(serde_json::json!({
                    "blob": "x".repeat(32)
                })),
            },
            payload: None,
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                max_attribute_bytes: Some(16),
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::InvalidMetadata(msg)) if msg.contains("attributes exceed"))
        );
    }

    #[test]
    fn test_ingest_empty_binary_payload() {
        let record = RawIngestRecord {
            id: "ingest-empty-binary".into(),
            source: IngestSource::File {
                filename: "empty.bin".into(),
                content_type: Some("application/octet-stream".into()),
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![])),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::EmptyBinaryPayload)));
    }

    #[test]
    fn max_payload_bytes_enforced_text() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text("x".repeat(17))),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_payload_bytes_enforced_bytes() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit-bytes".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::TextBytes(vec![b'x'; 17])),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_payload_bytes_enforced_binary() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit-binary".into(),
            source: IngestSource::File {
                filename: "large.bin".into(),
                content_type: None,
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![0; 17])),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_normalized_bytes_enforced() {
        let record = RawIngestRecord {
            id: "ingest-norm-limit".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text("a ".repeat(9))), // Raw: 18 bytes
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(20),
            max_normalized_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        // Normalizes to "a a a a a a a a a", which is 17 bytes long
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("normalized payload"))
        );
    }

    #[test]
    fn payload_size_limits_respected() {
        let record = RawIngestRecord {
            id: "ingest-limits-ok".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text(" data data ".into())), // Raw: 11, Normalized: 9
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(12),
            max_normalized_bytes: Some(10),
            ..Default::default()
        };

        let rec = ingest(record, &cfg).expect("payload within both limits should be accepted");
        assert_eq!(
            rec.normalized_payload,
            Some(CanonicalPayload::Text("data data".into()))
        );
    }
}