Skip to main content

ingest/
lib.rs

1//! UCFP Ingest Layer - Content Ingestion and Validation
2//!
3//! This crate provides the entry point to the Universal Content Fingerprinting (UCFP) pipeline,
4//! transforming raw content and metadata into clean, deterministic records suitable for
5//! downstream processing.
6//!
7//! # Overview
8//!
9//! The ingest crate is responsible for:
10//! - **Validation**: Enforcing metadata policies, size limits, and business rules
11//! - **Normalization**: Collapsing whitespace, stripping control characters, sanitizing inputs
12//! - **ID Generation**: Deriving stable document IDs using UUIDv5 when not explicitly provided
13//! - **Multi-modal Support**: Handling text, binary, and structured payloads uniformly
14//! - **Observability**: Structured logging via `tracing` for production debugging
15//!
16//! # Pipeline Position
17//!
18//! ```text
19//! Raw Content ──▶ Ingest ──▶ Canonical ──▶ Perceptual/Semantic ──▶ Index ──▶ Match
20//!                    ↑
21//!                 (this crate)
22//! ```
23//!
24//! # Quick Start
25//!
26//! ```rust
27//! use ingest::{
28//!     ingest, IngestConfig, RawIngestRecord,
29//!     IngestSource, IngestMetadata, IngestPayload
30//! };
31//! use chrono::Utc;
32//!
33//! // Configure (use defaults for quick start)
34//! let config = IngestConfig::default();
35//!
36//! // Create a raw record
37//! let record = RawIngestRecord {
38//!     id: "doc-001".to_string(),
39//!     source: IngestSource::RawText,
40//!     metadata: IngestMetadata {
41//!         tenant_id: Some("acme-corp".to_string()),
42//!         doc_id: Some("report-q4-2024".to_string()),
43//!         received_at: Some(Utc::now()),
44//!         original_source: None,
45//!         attributes: None,
46//!     },
47//!     payload: Some(IngestPayload::Text(
48//!         "  Quarterly report: revenue up 15% YoY.   ".to_string()
49//!     )),
50//! };
51//!
52//! // Ingest and get canonical record
53//! let canonical = ingest(record, &config).unwrap();
54//!
55//! assert_eq!(canonical.tenant_id, "acme-corp");
56//! // Whitespace normalized: "Quarterly report: revenue up 15% YoY."
57//! ```
58//!
59//! # Core Design Principles
60//!
//! 1. **Fail Fast**: Payload requirements and the raw size limit are checked before any
//!    metadata or payload normalization runs
//! 2. **Deterministic**: Same input always produces same output (critical for fingerprinting)
//! 3. **Observable**: Every `ingest` call emits a structured `tracing` success or failure event
//! 4. **Safe**: Control characters stripped (when `strip_control_chars` is enabled), sizes
//!    bounded, UTF-8 validated
65//!
66//! # Architecture
67//!
68//! The ingest pipeline follows a strict data flow:
69//!
//! 1. **Record ID Sanitization**: Sanitize the required record `id`
//! 2. **Payload Requirements Check**: Verify source mandates are met
//! 3. **Raw Size Validation**: Enforce `max_payload_bytes` limit
//! 4. **Metadata Normalization**: Apply defaults, validate policies, sanitize
//! 5. **Payload Normalization**: Decode UTF-8, collapse whitespace, preserve binary
//! 6. **Normalized Size Validation**: Enforce `max_normalized_bytes` limit
//! 7. **Canonical Record Construction**: Build deterministic output
76//!
77//! # Module Structure
78//!
79//! - `config`: Configuration types (`IngestConfig`, `MetadataPolicy`)
80//! - `error`: Error types (`IngestError`)
81//! - `types`: Data model (`RawIngestRecord`, `CanonicalIngestRecord`, etc.)
82//! - `metadata`: Metadata normalization and validation logic
83//! - `payload`: Payload validation and transformation utilities
84//!
85//! # Error Handling
86//!
87//! All errors are typed via [`IngestError`] for precise handling:
88//!
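//! ```rust
//! use ingest::{ingest, IngestConfig, IngestError};
//! # use ingest::{CanonicalIngestRecord, IngestMetadata, IngestPayload, IngestSource, RawIngestRecord};
//! # fn process(_record: CanonicalIngestRecord) {}
//! # let config = IngestConfig::default();
//! # let record = RawIngestRecord {
//! #     id: "doc-001".to_string(),
//! #     source: IngestSource::RawText,
//! #     metadata: IngestMetadata {
//! #         tenant_id: Some("acme-corp".to_string()),
//! #         doc_id: None,
//! #         received_at: None,
//! #         original_source: None,
//! #         attributes: None,
//! #     },
//! #     payload: Some(IngestPayload::Text("hello".to_string())),
//! # };
//!
//! match ingest(record, &config) {
//!     Ok(canonical) => process(canonical),
//!     Err(IngestError::PayloadTooLarge(msg)) => {
//!         eprintln!("Content too large: {}", msg);
//!     }
//!     Err(IngestError::InvalidUtf8(msg)) => {
//!         eprintln!("Invalid encoding: {}", msg);
//!     }
//!     Err(e) => {
//!         eprintln!("Ingest failed: {}", e);
//!     }
//! }
//! ```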
105//!
106//! # Configuration
107//!
108//! For production use, configure size limits and policies:
109//!
110//! ```rust
111//! use ingest::{IngestConfig, MetadataPolicy, RequiredField};
112//! use uuid::Uuid;
113//!
114//! let config = IngestConfig {
115//!     version: 1,
116//!     default_tenant_id: "default".to_string(),
117//!     doc_id_namespace: Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"myapp.example.com"),
118//!     strip_control_chars: true,
119//!     metadata_policy: MetadataPolicy {
120//!         required_fields: vec![
121//!             RequiredField::TenantId,
122//!             RequiredField::DocId,
123//!         ],
124//!         max_attribute_bytes: Some(1024 * 1024), // 1 MB
125//!         reject_future_timestamps: true,
126//!     },
127//!     max_payload_bytes: Some(50 * 1024 * 1024),      // 50 MB raw
128//!     max_normalized_bytes: Some(10 * 1024 * 1024),   // 10 MB normalized
129//! };
130//!
131//! // Validate at startup
132//! config.validate().expect("Invalid configuration");
133//! ```
134//!
135//! # Performance
136//!
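//! - **Base overhead**: roughly 5-20μs for small text payloads (indicative figures only;
//!   benchmark on your own hardware and workload)
//! - **Text normalization**: O(n) where n = text length
//! - **Memory**: Allocates new String during normalization
//! - **Thread safety**: `ingest()` takes the record by value and the config by shared
//!   reference, so independent records can be processed in parallel. It is not strictly
//!   pure: every call emits `tracing` events.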
141//!
142//! # Examples
143//!
144//! See the `examples/` directory for complete working examples:
145//! - `ingest_demo.rs`: Basic text ingestion
146//! - `batch_ingest.rs`: Processing multiple records
147//! - `size_limit_demo.rs`: Size limit enforcement demonstration
148//!
149//! # See Also
150//!
151//! - [Crate documentation](doc/ingest.md) for comprehensive guides
152//! - `config` module for configuration details
153//! - `types` module for data structure definitions
154
155use std::time::Instant;
156
157use tracing::{info, warn, Level};
158
159mod config;
160mod error;
161mod metadata;
162mod payload;
163mod types;
164
165use crate::metadata::normalize_metadata;
166
167pub use crate::config::{ConfigError, IngestConfig, MetadataPolicy, RequiredField};
168pub use crate::error::IngestError;
169pub use crate::payload::{
170    normalize_payload_option, payload_kind, payload_length, validate_payload_requirements,
171};
172pub use crate::types::{
173    CanonicalIngestRecord, CanonicalPayload, IngestMetadata, IngestPayload, IngestSource,
174    RawIngestRecord,
175};
176
177/// Ingests a raw record and produces a canonical, normalized record.
178///
179/// This is the primary entry point for the ingest pipeline. It validates the raw record,
180/// normalizes metadata and payload, and returns a deterministic `CanonicalIngestRecord`
181/// suitable for downstream processing.
182///
183/// # Arguments
184///
185/// * `raw` - The raw ingest record containing metadata and optional payload
186/// * `cfg` - Runtime configuration controlling validation and normalization behavior
187///
188/// # Returns
189///
190/// * `Ok(CanonicalIngestRecord)` - Successfully ingested and normalized record
191/// * `Err(IngestError)` - Validation or normalization failure with specific error type
192///
193/// # Errors
194///
195/// This function can return various [`IngestError`] variants:
196///
197/// * `MissingPayload` - Source requires a payload but none was provided
198/// * `EmptyBinaryPayload` - Binary payload has zero bytes
199/// * `InvalidMetadata(String)` - Metadata policy violation (required field missing, future timestamp, etc.)
200/// * `InvalidUtf8(String)` - `TextBytes` payload contains invalid UTF-8 sequences
201/// * `EmptyNormalizedText` - Text payload is empty after whitespace normalization
202/// * `PayloadTooLarge(String)` - Payload exceeds configured size limits
203///
204/// # Side Effects
205///
206/// * Emits structured tracing spans for observability
207/// * Records timing metrics for performance monitoring
208///
209/// # Examples
210///
211/// ## Basic Usage
212///
213/// ```rust
214/// use ingest::{
215///     ingest, IngestConfig, RawIngestRecord,
216///     IngestSource, IngestMetadata, IngestPayload
217/// };
218/// use chrono::Utc;
219///
220/// let config = IngestConfig::default();
221/// let record = RawIngestRecord {
222///     id: "my-doc-1".into(),
223///     source: IngestSource::RawText,
224///     metadata: IngestMetadata {
225///         tenant_id: Some("my-tenant".into()),
226///         doc_id: None, // Will be derived
227///         received_at: Some(Utc::now()),
228///         original_source: None,
229///         attributes: None,
230///     },
231///     payload: Some(IngestPayload::Text(
232///         "  Some text with   extra whitespace.  ".into()
233///     )),
234/// };
235///
236/// let canonical = ingest(record, &config).unwrap();
237/// assert_eq!(canonical.tenant_id, "my-tenant");
238/// // Note: doc_id is derived if not provided
239/// ```
240///
241/// ## Error Handling
242///
243/// ```rust
244/// use ingest::{ingest, IngestConfig, IngestError, IngestPayload, IngestSource};
245/// use ingest::{RawIngestRecord, IngestMetadata};
246///
247/// let config = IngestConfig::default();
248///
249/// // Invalid UTF-8 bytes
250/// let record = RawIngestRecord {
251///     id: "test".into(),
252///     source: IngestSource::RawText,
253///     metadata: IngestMetadata {
254///         tenant_id: Some("tenant".into()),
255///         doc_id: Some("doc".into()),
256///         received_at: None,
257///         original_source: None,
258///         attributes: None,
259///     },
260///     payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
261/// };
262///
263/// match ingest(record, &config) {
264///     Err(IngestError::InvalidUtf8(_)) => println!("Invalid UTF-8 detected"),
265///     _ => println!("Other result"),
266/// }
267/// ```
268///
269/// # Performance
270///
271/// - Small text payloads: ~10-20μs
272/// - Large text payloads: scales linearly with size
273/// - Binary payloads: minimal overhead (size check only)
274pub fn ingest(
275    raw: RawIngestRecord,
276    cfg: &IngestConfig,
277) -> Result<CanonicalIngestRecord, IngestError> {
278    let start = Instant::now();
279    let RawIngestRecord {
280        id,
281        source,
282        metadata,
283        payload,
284    } = raw;
285
286    let tenant_hint = metadata.tenant_id.clone();
287    let doc_hint = metadata.doc_id.clone();
288
289    let record_id = match metadata::sanitize_required_field("id", id, cfg.strip_control_chars) {
290        Ok(id) => id,
291        Err(err) => {
292            let elapsed_micros = start.elapsed().as_micros();
293            warn!(error = %err, elapsed_micros, "ingest_failure");
294            return Err(err);
295        }
296    };
297
298    let span = tracing::span!(
299        Level::INFO,
300        "ingest.ingest",
301        record_id = %record_id,
302        source = ?source
303    );
304    let _guard = span.enter();
305
306    match ingest_inner(record_id.clone(), source, metadata, payload, cfg) {
307        Ok(record) => {
308            let elapsed_micros = start.elapsed().as_micros();
309            info!(
310                tenant_id = %record.tenant_id,
311                doc_id = %record.doc_id,
312                payload_kind = %payload_kind(record.normalized_payload.as_ref()),
313                normalized_len = payload_length(record.normalized_payload.as_ref()),
314                elapsed_micros,
315                "ingest_success"
316            );
317            Ok(record)
318        }
319        Err(err) => {
320            let elapsed_micros = start.elapsed().as_micros();
321            warn!(
322                tenant_id = ?tenant_hint,
323                doc_id = ?doc_hint,
324                error = %err,
325                elapsed_micros,
326                "ingest_failure"
327            );
328            Err(err)
329        }
330    }
331}
332
333/// Core ingest logic: validates payload requirements, normalizes metadata and payload.
334///
335/// This internal function performs the actual ingest work. It is separated from the
336/// public `ingest()` function to facilitate testing and to keep the observability
337/// wrapper clean.
338///
339/// # Arguments
340///
341/// * `record_id` - Sanitized unique identifier for this ingest operation
342/// * `source` - Source type (RawText, File, etc.)
343/// * `metadata` - Raw metadata to be normalized
344/// * `payload` - Optional raw payload
345/// * `cfg` - Configuration for validation and normalization
346///
347/// # Returns
348///
349/// Normalized `CanonicalIngestRecord` on success, `IngestError` on failure
350fn ingest_inner(
351    record_id: String,
352    source: IngestSource,
353    metadata: IngestMetadata,
354    payload: Option<IngestPayload>,
355    cfg: &IngestConfig,
356) -> Result<CanonicalIngestRecord, IngestError> {
357    // Some sources require a payload, so we check for that first.
358    validate_payload_requirements(&source, &payload)?;
359
360    // Reject oversized raw payloads before normalization.
361    if let Some(limit) = cfg.max_payload_bytes {
362        if let Some(ref p) = payload {
363            let len = match p {
364                IngestPayload::Text(s) => s.len(),
365                IngestPayload::TextBytes(b) => b.len(),
366                IngestPayload::Binary(b) => b.len(),
367            };
368            if len > limit {
369                return Err(IngestError::PayloadTooLarge(format!(
370                    "raw payload size {len} exceeds limit of {limit}"
371                )));
372            }
373        }
374    }
375
376    // Metadata is normalized and validated against the configured policies.
377    let normalized_metadata = normalize_metadata(metadata, cfg, &record_id)?;
378    // The payload is normalized based on its type (text or binary).
379    let normalized_payload = normalize_payload_option(&source, payload, cfg)?;
380
381    Ok(CanonicalIngestRecord {
382        id: record_id,
383        tenant_id: normalized_metadata.tenant_id,
384        doc_id: normalized_metadata.doc_id,
385        received_at: normalized_metadata.received_at,
386        original_source: normalized_metadata.original_source,
387        source,
388        normalized_payload,
389        attributes: normalized_metadata.attributes,
390    })
391}
392
/// Collapses every run of whitespace into a single ASCII space and trims both ends.
///
/// Words are located with [`str::split_whitespace`]. "Whitespace" therefore means any
/// character for which `char::is_whitespace()` returns true: spaces, tabs, newlines,
/// and non-ASCII whitespace such as U+00A0. All other characters, including emoji and
/// other non-ASCII text, are copied through unchanged.
///
/// # Arguments
///
/// * `s` - The text to normalize
///
/// # Returns
///
/// A newly allocated `String`. It is empty when `s` is empty or contains only whitespace.
///
/// # Examples
///
/// ```rust
/// use ingest::normalize_payload;
///
/// // Collapse multiple spaces
/// let result = normalize_payload("  Hello   world  ");
/// assert_eq!(result, "Hello world");
///
/// // Handle newlines and tabs
/// let result = normalize_payload("Line1\n\n\t\tLine2");
/// assert_eq!(result, "Line1 Line2");
///
/// // Preserve Unicode
/// let result = normalize_payload("  Hello 👋  world  ");
/// assert_eq!(result, "Hello 👋 world");
///
/// // Empty result for whitespace-only input
/// let result = normalize_payload("   \n\t   ");
/// assert_eq!(result, "");
/// ```
///
/// # Performance
///
/// - Single pass over the input: O(n) time and O(n) space
/// - The output buffer reserves `s.len()` bytes up front. That is always enough,
///   because each whitespace run (at least one byte) becomes at most one 1-byte space,
///   so the buffer never has to grow.
///
/// # Use Cases
///
/// - Preparing text for fingerprinting, so whitespace differences don't affect matching
/// - Normalizing user input for storage
/// - Cleaning scraped web content
pub fn normalize_payload(s: &str) -> String {
    let mut words = s.split_whitespace();
    let mut out = String::with_capacity(s.len());

    // Emit the first word bare, then prefix every later word with one separator.
    if let Some(first) = words.next() {
        out.push_str(first);
        for word in words {
            out.push(' ');
            out.push_str(word);
        }
    }

    out
}
452
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration, NaiveDate, Utc};

    use super::*;

    /// Deterministic timestamp (2024-01-01T00:00:00Z) so tests never depend on the clock.
    fn fixed_timestamp() -> DateTime<Utc> {
        let date_time = NaiveDate::from_ymd_opt(2024, 1, 1)
            .and_then(|date| date.and_hms_opt(0, 0, 0))
            .expect("2024-01-01T00:00:00 is a valid date-time");
        DateTime::<Utc>::from_naive_utc_and_offset(date_time, Utc)
    }

    /// Fully populated metadata used by tests that don't exercise metadata handling.
    fn base_metadata() -> IngestMetadata {
        IngestMetadata {
            tenant_id: Some("tenant1".into()),
            doc_id: Some("doc-123".into()),
            received_at: Some(fixed_timestamp()),
            original_source: None,
            attributes: None,
        }
    }

    #[test]
    fn test_normalize_payload() {
        let cases = [
            (
                "  Hello\n\n   world\t this  is\n a test  ",
                "Hello world this is a test",
            ),
            ("\n", ""),
            ("", ""),
            ("emoji \u{1f600} test ", "emoji \u{1f600} test"),
            // Non-ASCII Unicode whitespace (NO-BREAK SPACE, EM SPACE) collapses too.
            ("a\u{00a0}\u{2003}b", "a b"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_payload(input), expected);
        }
    }

    #[test]
    fn test_ingest_rawtext_success() {
        let record = RawIngestRecord {
            id: "ingest-1".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text(" Hello   world \n ".into())),
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant1");
        assert_eq!(rec.doc_id, "doc-123");
        match rec.normalized_payload {
            Some(CanonicalPayload::Text(text)) => assert_eq!(text, "Hello world"),
            _ => panic!("expected text payload"),
        }
    }

    // A payload that is present but whitespace-only normalizes to nothing and is rejected.
    #[test]
    fn test_ingest_whitespace_only_text_rejected() {
        let record = RawIngestRecord {
            id: "ingest-2".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text("   ".into())),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::EmptyNormalizedText)));
    }

    #[test]
    fn test_ingest_file_binary_payload() {
        let record = RawIngestRecord {
            id: "ingest-3".into(),
            source: IngestSource::File {
                filename: "image.png".into(),
                content_type: Some("image/png".into()),
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![1, 2, 3, 4])),
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        match rec.normalized_payload {
            Some(CanonicalPayload::Binary(bytes)) => assert_eq!(bytes, vec![1, 2, 3, 4]),
            _ => panic!("expected binary payload"),
        }
    }

    #[test]
    fn test_metadata_preserved() {
        let record = RawIngestRecord {
            id: "ingest-4".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant-x".into()),
                doc_id: Some("doc-y".into()),
                received_at: Some(fixed_timestamp()),
                original_source: Some("source-42".into()),
                attributes: Some(serde_json::json!({"kind": "demo"})),
            },
            payload: None,
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant-x");
        assert_eq!(rec.doc_id, "doc-y");
        assert_eq!(rec.original_source.as_deref(), Some("source-42"));
        assert_eq!(rec.attributes, Some(serde_json::json!({"kind": "demo"})));
        assert!(rec.normalized_payload.is_none());
    }

    #[test]
    fn test_defaults_applied_when_metadata_missing() {
        let record = RawIngestRecord {
            id: "ingest-5".into(),
            source: IngestSource::RawText,
            metadata: IngestMetadata {
                tenant_id: None,
                doc_id: None,
                received_at: None,
                original_source: Some("\u{0007}source\n".into()),
                attributes: None,
            },
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let cfg = IngestConfig {
            default_tenant_id: "fallback".into(),
            ..Default::default()
        };

        let rec = ingest(record, &cfg).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "fallback");
        assert!(!rec.doc_id.is_empty());
        let original_source = rec
            .original_source
            .expect("original_source should be preserved");
        assert!(original_source.contains("source"));
    }

    #[test]
    fn test_doc_id_derivation_deterministic() {
        let metadata = IngestMetadata {
            tenant_id: None,
            doc_id: None,
            received_at: None,
            original_source: None,
            attributes: None,
        };

        let cfg = IngestConfig::default();
        let record_a = RawIngestRecord {
            id: "deterministic".into(),
            source: IngestSource::RawText,
            metadata: metadata.clone(),
            payload: Some(IngestPayload::Text("payload".into())),
        };
        let record_b = RawIngestRecord {
            id: "deterministic".into(),
            source: IngestSource::RawText,
            metadata,
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let rec_a = ingest(record_a, &cfg).expect("first ingest succeeds");
        let rec_b = ingest(record_b, &cfg).expect("second ingest succeeds");

        assert_eq!(rec_a.doc_id, rec_b.doc_id);
    }

    #[test]
    fn test_invalid_utf8_payload_rejected() {
        let record = RawIngestRecord {
            id: "ingest-utf8".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::InvalidUtf8(_))));
    }

    #[test]
    fn test_control_chars_removed_from_metadata() {
        let record = RawIngestRecord {
            id: "ingest-ctrl".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant\u{0003}".into()),
                doc_id: Some("doc\n\r".into()),
                received_at: None,
                original_source: Some(" source\u{0008} ".into()),
                attributes: None,
            },
            payload: None,
        };

        let rec = ingest(record, &IngestConfig::default()).expect("ingest should succeed");
        assert_eq!(rec.tenant_id, "tenant");
        assert_eq!(rec.doc_id, "doc");
        assert_eq!(rec.original_source.as_deref(), Some("source"));
    }

    #[test]
    fn required_tenant_id_enforced() {
        let record = RawIngestRecord {
            id: "ingest-required-tenant".into(),
            source: IngestSource::RawText,
            metadata: IngestMetadata {
                tenant_id: None,
                doc_id: Some("doc".into()),
                received_at: Some(fixed_timestamp()),
                original_source: None,
                attributes: None,
            },
            payload: Some(IngestPayload::Text("payload".into())),
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                required_fields: vec![RequiredField::TenantId],
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(matches!(res, Err(IngestError::InvalidMetadata(_))));
    }

    #[test]
    fn future_timestamp_rejected() {
        let future = Utc::now() + Duration::days(1);
        let record = RawIngestRecord {
            id: "ingest-future-ts".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant".into()),
                doc_id: Some("doc".into()),
                received_at: Some(future),
                original_source: None,
                attributes: None,
            },
            payload: None,
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                reject_future_timestamps: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(matches!(res, Err(IngestError::InvalidMetadata(msg)) if msg.contains("future")));
    }

    #[test]
    fn max_attribute_bytes_enforced() {
        let record = RawIngestRecord {
            id: "ingest-attrs".into(),
            source: IngestSource::Api,
            metadata: IngestMetadata {
                tenant_id: Some("tenant".into()),
                doc_id: Some("doc".into()),
                received_at: Some(fixed_timestamp()),
                original_source: None,
                attributes: Some(serde_json::json!({
                    "blob": "x".repeat(32)
                })),
            },
            payload: None,
        };

        let cfg = IngestConfig {
            metadata_policy: MetadataPolicy {
                max_attribute_bytes: Some(16),
                ..Default::default()
            },
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::InvalidMetadata(msg)) if msg.contains("attributes exceed"))
        );
    }

    #[test]
    fn test_ingest_empty_binary_payload() {
        let record = RawIngestRecord {
            id: "ingest-empty-binary".into(),
            source: IngestSource::File {
                filename: "empty.bin".into(),
                content_type: Some("application/octet-stream".into()),
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![])),
        };

        let res = ingest(record, &IngestConfig::default());
        assert!(matches!(res, Err(IngestError::EmptyBinaryPayload)));
    }

    #[test]
    fn max_payload_bytes_enforced_text() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text("x".repeat(17))),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_payload_bytes_enforced_bytes() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit-bytes".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::TextBytes(vec![b'x'; 17])),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_payload_bytes_enforced_binary() {
        let record = RawIngestRecord {
            id: "ingest-payload-limit-binary".into(),
            source: IngestSource::File {
                filename: "large.bin".into(),
                content_type: None,
            },
            metadata: base_metadata(),
            payload: Some(IngestPayload::Binary(vec![0; 17])),
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("raw payload"))
        );
    }

    #[test]
    fn max_normalized_bytes_enforced() {
        let record = RawIngestRecord {
            id: "ingest-norm-limit".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text("a ".repeat(9))), // Raw: 18 bytes
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(20),
            max_normalized_bytes: Some(16),
            ..Default::default()
        };

        let res = ingest(record, &cfg);
        // Normalizes to "a a a a a a a a a", which is 17 bytes long
        assert!(
            matches!(res, Err(IngestError::PayloadTooLarge(msg)) if msg.contains("normalized payload"))
        );
    }

    #[test]
    fn payload_size_limits_respected() {
        let record = RawIngestRecord {
            id: "ingest-limits-ok".into(),
            source: IngestSource::RawText,
            metadata: base_metadata(),
            payload: Some(IngestPayload::Text(" data data ".into())), // Raw: 11, Normalized: 9
        };

        let cfg = IngestConfig {
            max_payload_bytes: Some(12),
            max_normalized_bytes: Some(10),
            ..Default::default()
        };

        let rec = ingest(record, &cfg).expect("payload within both limits should be accepted");
        assert_eq!(
            rec.normalized_payload,
            Some(CanonicalPayload::Text("data data".into()))
        );
    }
}