Skip to main content

ingest

Function ingest 

Source
pub fn ingest(
    raw: RawIngestRecord,
    cfg: &IngestConfig,
) -> Result<CanonicalIngestRecord, IngestError>
Expand description

Ingests a raw record and produces a canonical, normalized record.

This is the primary entry point for the ingest pipeline. It validates the raw record, normalizes metadata and payload, and returns a deterministic CanonicalIngestRecord suitable for downstream processing.

§Arguments

  • raw - The raw ingest record containing metadata and an optional payload
  • cfg - Runtime configuration controlling validation and normalization behavior

§Returns

  • Ok(CanonicalIngestRecord) - Successfully ingested and normalized record
  • Err(IngestError) - Validation or normalization failed; the variant identifies the specific cause

§Errors

This function can return various IngestError variants:

  • MissingPayload - Source requires a payload but none was provided
  • EmptyBinaryPayload - Binary payload has zero bytes
  • InvalidMetadata(String) - Metadata policy violation (required field missing, future timestamp, etc.)
  • InvalidUtf8(String) - TextBytes payload contains invalid UTF-8 sequences
  • EmptyNormalizedText - Text payload is empty after whitespace normalization
  • PayloadTooLarge(String) - Payload exceeds configured size limits

§Side Effects

  • Emits structured tracing spans for observability
  • Records timing metrics for performance monitoring

§Examples

§Basic Usage

use ingest::{
    ingest, IngestConfig, RawIngestRecord,
    IngestSource, IngestMetadata, IngestPayload
};
use chrono::Utc;

let config = IngestConfig::default();
let record = RawIngestRecord {
    id: "my-doc-1".into(),
    source: IngestSource::RawText,
    metadata: IngestMetadata {
        tenant_id: Some("my-tenant".into()),
        doc_id: None, // Will be derived
        received_at: Some(Utc::now()),
        original_source: None,
        attributes: None,
    },
    payload: Some(IngestPayload::Text(
        "  Some text with   extra whitespace.  ".into()
    )),
};

let canonical = ingest(record, &config).unwrap();
assert_eq!(canonical.tenant_id, "my-tenant");
// Note: doc_id is derived if not provided

§Error Handling

use ingest::{ingest, IngestConfig, IngestError, IngestPayload, IngestSource};
use ingest::{RawIngestRecord, IngestMetadata};

let config = IngestConfig::default();

// Invalid UTF-8 bytes
let record = RawIngestRecord {
    id: "test".into(),
    source: IngestSource::RawText,
    metadata: IngestMetadata {
        tenant_id: Some("tenant".into()),
        doc_id: Some("doc".into()),
        received_at: None,
        original_source: None,
        attributes: None,
    },
    payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
};

match ingest(record, &config) {
    Err(IngestError::InvalidUtf8(_)) => println!("Invalid UTF-8 detected"),
    _ => println!("Other result"),
}

§Performance

  • Small text payloads: ~10-20μs
  • Large text payloads: scales linearly with size
  • Binary payloads: minimal overhead (size check only)
Examples found in repository?
examples/ingest_demo.rs (line 33)
17fn main() {
18    let record = RawIngestRecord {
19        id: "ingest-demo".into(),
20        source: IngestSource::RawText,
21        metadata: IngestMetadata {
22            tenant_id: Some("tenant1".into()),
23            doc_id: Some("doc1".into()),
24            received_at: Some(fixed_timestamp()),
25            original_source: None,
26            attributes: None,
27        },
28        payload: Some(IngestPayload::Text(
29            "  Hello   world\nThis  is\tUC-FP  ".into(),
30        )),
31    };
32
33    match ingest(record, &IngestConfig::default()) {
34        Ok(rec) => {
35            let CanonicalIngestRecord { .. } = rec;
36            println!("{rec:#?}");
37        }
38        Err(err) => eprintln!("ingest failed: {err}"),
39    }
40}
More examples
Hide additional examples
examples/batch_ingest.rs (line 63)
17fn main() {
18    let fixtures: [RawIngestRecord; 3] = [
19        RawIngestRecord {
20            id: "text-001".into(),
21            source: IngestSource::RawText,
22            metadata: IngestMetadata {
23                tenant_id: Some("tenant-a".into()),
24                doc_id: Some("doc-1".into()),
25                received_at: Some(timestamp(0)),
26                original_source: Some("https://example.com/a".into()),
27                attributes: None,
28            },
29            payload: Some(IngestPayload::Text("  First   text\npayload ".into())),
30        },
31        RawIngestRecord {
32            id: "text-002".into(),
33            source: IngestSource::Url("https://example.com/b".into()),
34            metadata: IngestMetadata {
35                tenant_id: Some("tenant-a".into()),
36                doc_id: Some("doc-2".into()),
37                received_at: Some(timestamp(1)),
38                original_source: None,
39                attributes: Some(serde_json::json!({"topic": "news"})),
40            },
41            payload: Some(IngestPayload::Text("Second payload\twith spacing".into())),
42        },
43        RawIngestRecord {
44            id: "bin-001".into(),
45            source: IngestSource::File {
46                filename: "image.png".into(),
47                content_type: Some("image/png".into()),
48            },
49            metadata: IngestMetadata {
50                tenant_id: Some("tenant-b".into()),
51                doc_id: Some("image-1".into()),
52                received_at: Some(timestamp(2)),
53                original_source: None,
54                attributes: Some(serde_json::json!({"kind": "thumbnail"})),
55            },
56            payload: Some(IngestPayload::Binary(vec![0, 1, 2, 3])),
57        },
58    ];
59
60    let cfg = IngestConfig::default();
61
62    for record in fixtures {
63        match ingest(record, &cfg) {
64            Ok(CanonicalIngestRecord {
65                normalized_payload, ..
66            }) => match normalized_payload {
67                Some(CanonicalPayload::Text(text)) => {
68                    println!("text payload -> \"{text}\"");
69                }
70                Some(CanonicalPayload::Binary(bytes)) => {
71                    println!("binary payload -> {} bytes", bytes.len());
72                }
73                None => println!("no payload provided"),
74                Some(_) => println!("unsupported payload variant"),
75            },
76            Err(err) => eprintln!("ingest failed: {err}"),
77        }
78    }
79}
examples/size_limit_demo.rs (line 27)
3fn main() {
4    println!("--- Demonstrating Payload Size Limit Policies ---");
5
6    let cfg = IngestConfig {
7        max_payload_bytes: Some(32),
8        max_normalized_bytes: Some(20),
9        ..Default::default()
10    };
11
12    // --- Case 1: Payload within all limits ---
13    println!("\n1. Ingesting a payload that is within all size limits...");
14    let record1 = RawIngestRecord {
15        id: "demo-1-success".into(),
16        source: IngestSource::RawText,
17        metadata: IngestMetadata {
18            tenant_id: None,
19            doc_id: None,
20            received_at: None,
21            original_source: None,
22            attributes: None,
23        },
24        payload: Some(IngestPayload::Text("  valid payload data  ".into())), // Raw: 22 bytes, Norm: 18
25    };
26
27    match ingest(record1, &cfg) {
28        Ok(rec) => println!(
29            " -> Success! Normalized payload: {:?}",
30            rec.normalized_payload.unwrap()
31        ),
32        Err(err) => eprintln!(" -> Unexpected Error: {err}"),
33    }
34
35    // --- Case 2: Raw payload exceeds max_payload_bytes ---
36    println!("\n2. Ingesting a payload that exceeds the raw size limit...");
37    let record2 = RawIngestRecord {
38        id: "demo-2-raw-limit".into(),
39        source: IngestSource::RawText,
40        metadata: IngestMetadata {
41            tenant_id: None,
42            doc_id: None,
43            received_at: None,
44            original_source: None,
45            attributes: None,
46        },
47        payload: Some(IngestPayload::Text(
48            "this raw payload is definitely way too long".into(),
49        )), // 43 bytes
50    };
51
52    match ingest(record2, &cfg) {
53        Ok(_) => eprintln!(" -> Unexpected Success!"),
54        Err(err) => println!(" -> Success! Caught expected error: {err}"),
55    }
56
57    // --- Case 3: Normalized payload exceeds max_normalized_bytes ---
58    println!("\n3. Ingesting a payload that exceeds the normalized size limit...");
59    let record3 = RawIngestRecord {
60        id: "demo-3-norm-limit".into(),
61        source: IngestSource::RawText,
62        metadata: IngestMetadata {
63            tenant_id: None,
64            doc_id: None,
65            received_at: None,
66            original_source: None,
67            attributes: None,
68        },
69        payload: Some(IngestPayload::Text("short raw, but long normalized".into())), // Raw: 30 bytes, Norm: 30 (no extra whitespace to collapse)
70    };
71
72    match ingest(record3, &cfg) {
73        Ok(_) => eprintln!(" -> Unexpected Success!"),
74        Err(err) => println!(" -> Success! Caught expected error: {err}"),
75    }
76}