pub fn ingest(
raw: RawIngestRecord,
cfg: &IngestConfig,
) -> Result<CanonicalIngestRecord, IngestError>
Expand description
Ingests a raw record and produces a canonical, normalized record.
This is the primary entry point for the ingest pipeline. It validates the raw record,
normalizes metadata and payload, and returns a deterministic CanonicalIngestRecord
suitable for downstream processing.
§Arguments
- `raw` - The raw ingest record containing metadata and optional payload
- `cfg` - Runtime configuration controlling validation and normalization behavior
§Returns
- `Ok(CanonicalIngestRecord)` - Successfully ingested and normalized record
- `Err(IngestError)` - Validation or normalization failure with specific error type
§Errors
This function can return various IngestError variants:
- `MissingPayload` - Source requires a payload but none was provided
- `EmptyBinaryPayload` - Binary payload has zero bytes
- `InvalidMetadata(String)` - Metadata policy violation (required field missing, future timestamp, etc.)
- `InvalidUtf8(String)` - `TextBytes` payload contains invalid UTF-8 sequences
- `EmptyNormalizedText` - Text payload is empty after whitespace normalization
- `PayloadTooLarge(String)` - Payload exceeds configured size limits
§Side Effects
- Emits structured tracing spans for observability
- Records timing metrics for performance monitoring
§Examples
§Basic Usage
use ingest::{
ingest, IngestConfig, RawIngestRecord,
IngestSource, IngestMetadata, IngestPayload
};
use chrono::Utc;
let config = IngestConfig::default();
let record = RawIngestRecord {
id: "my-doc-1".into(),
source: IngestSource::RawText,
metadata: IngestMetadata {
tenant_id: Some("my-tenant".into()),
doc_id: None, // Will be derived
received_at: Some(Utc::now()),
original_source: None,
attributes: None,
},
payload: Some(IngestPayload::Text(
" Some text with extra whitespace. ".into()
)),
};
let canonical = ingest(record, &config).unwrap();
assert_eq!(canonical.tenant_id, "my-tenant");
// Note: doc_id is derived if not provided
§Error Handling
use ingest::{ingest, IngestConfig, IngestError, IngestPayload, IngestSource};
use ingest::{RawIngestRecord, IngestMetadata};
let config = IngestConfig::default();
// Invalid UTF-8 bytes
let record = RawIngestRecord {
id: "test".into(),
source: IngestSource::RawText,
metadata: IngestMetadata {
tenant_id: Some("tenant".into()),
doc_id: Some("doc".into()),
received_at: None,
original_source: None,
attributes: None,
},
payload: Some(IngestPayload::TextBytes(vec![0xff, 0xfe])),
};
match ingest(record, &config) {
Err(IngestError::InvalidUtf8(_)) => println!("Invalid UTF-8 detected"),
_ => println!("Other result"),
}
§Performance
- Small text payloads: ~10-20μs
- Large text payloads: scales linearly with size
- Binary payloads: minimal overhead (size check only)
Examples found in repository?
examples/ingest_demo.rs (line 33)
17fn main() {
18 let record = RawIngestRecord {
19 id: "ingest-demo".into(),
20 source: IngestSource::RawText,
21 metadata: IngestMetadata {
22 tenant_id: Some("tenant1".into()),
23 doc_id: Some("doc1".into()),
24 received_at: Some(fixed_timestamp()),
25 original_source: None,
26 attributes: None,
27 },
28 payload: Some(IngestPayload::Text(
29 " Hello world\nThis is\tUC-FP ".into(),
30 )),
31 };
32
33 match ingest(record, &IngestConfig::default()) {
34 Ok(rec) => {
35 let CanonicalIngestRecord { .. } = rec;
36 println!("{rec:#?}");
37 }
38 Err(err) => eprintln!("ingest failed: {err}"),
39 }
40}
More examples
examples/batch_ingest.rs (line 63)
17fn main() {
18 let fixtures: [RawIngestRecord; 3] = [
19 RawIngestRecord {
20 id: "text-001".into(),
21 source: IngestSource::RawText,
22 metadata: IngestMetadata {
23 tenant_id: Some("tenant-a".into()),
24 doc_id: Some("doc-1".into()),
25 received_at: Some(timestamp(0)),
26 original_source: Some("https://example.com/a".into()),
27 attributes: None,
28 },
29 payload: Some(IngestPayload::Text(" First text\npayload ".into())),
30 },
31 RawIngestRecord {
32 id: "text-002".into(),
33 source: IngestSource::Url("https://example.com/b".into()),
34 metadata: IngestMetadata {
35 tenant_id: Some("tenant-a".into()),
36 doc_id: Some("doc-2".into()),
37 received_at: Some(timestamp(1)),
38 original_source: None,
39 attributes: Some(serde_json::json!({"topic": "news"})),
40 },
41 payload: Some(IngestPayload::Text("Second payload\twith spacing".into())),
42 },
43 RawIngestRecord {
44 id: "bin-001".into(),
45 source: IngestSource::File {
46 filename: "image.png".into(),
47 content_type: Some("image/png".into()),
48 },
49 metadata: IngestMetadata {
50 tenant_id: Some("tenant-b".into()),
51 doc_id: Some("image-1".into()),
52 received_at: Some(timestamp(2)),
53 original_source: None,
54 attributes: Some(serde_json::json!({"kind": "thumbnail"})),
55 },
56 payload: Some(IngestPayload::Binary(vec![0, 1, 2, 3])),
57 },
58 ];
59
60 let cfg = IngestConfig::default();
61
62 for record in fixtures {
63 match ingest(record, &cfg) {
64 Ok(CanonicalIngestRecord {
65 normalized_payload, ..
66 }) => match normalized_payload {
67 Some(CanonicalPayload::Text(text)) => {
68 println!("text payload -> \"{text}\"");
69 }
70 Some(CanonicalPayload::Binary(bytes)) => {
71 println!("binary payload -> {} bytes", bytes.len());
72 }
73 None => println!("no payload provided"),
74 Some(_) => println!("unsupported payload variant"),
75 },
76 Err(err) => eprintln!("ingest failed: {err}"),
77 }
78 }
79}
examples/size_limit_demo.rs (line 27)
3fn main() {
4 println!("--- Demonstrating Payload Size Limit Policies ---");
5
6 let cfg = IngestConfig {
7 max_payload_bytes: Some(32),
8 max_normalized_bytes: Some(20),
9 ..Default::default()
10 };
11
12 // --- Case 1: Payload within all limits ---
13 println!("\n1. Ingesting a payload that is within all size limits...");
14 let record1 = RawIngestRecord {
15 id: "demo-1-success".into(),
16 source: IngestSource::RawText,
17 metadata: IngestMetadata {
18 tenant_id: None,
19 doc_id: None,
20 received_at: None,
21 original_source: None,
22 attributes: None,
23 },
24 payload: Some(IngestPayload::Text(" valid payload data ".into())), // Raw: 22 bytes, Norm: 18
25 };
26
27 match ingest(record1, &cfg) {
28 Ok(rec) => println!(
29 " -> Success! Normalized payload: {:?}",
30 rec.normalized_payload.unwrap()
31 ),
32 Err(err) => eprintln!(" -> Unexpected Error: {err}"),
33 }
34
35 // --- Case 2: Raw payload exceeds max_payload_bytes ---
36 println!("\n2. Ingesting a payload that exceeds the raw size limit...");
37 let record2 = RawIngestRecord {
38 id: "demo-2-raw-limit".into(),
39 source: IngestSource::RawText,
40 metadata: IngestMetadata {
41 tenant_id: None,
42 doc_id: None,
43 received_at: None,
44 original_source: None,
45 attributes: None,
46 },
47 payload: Some(IngestPayload::Text(
48 "this raw payload is definitely way too long".into(),
49 )), // 43 bytes
50 };
51
52 match ingest(record2, &cfg) {
53 Ok(_) => eprintln!(" -> Unexpected Success!"),
54 Err(err) => println!(" -> Success! Caught expected error: {err}"),
55 }
56
57 // --- Case 3: Normalized payload exceeds max_normalized_bytes ---
58 println!("\n3. Ingesting a payload that exceeds the normalized size limit...");
59 let record3 = RawIngestRecord {
60 id: "demo-3-norm-limit".into(),
61 source: IngestSource::RawText,
62 metadata: IngestMetadata {
63 tenant_id: None,
64 doc_id: None,
65 received_at: None,
66 original_source: None,
67 attributes: None,
68 },
69 payload: Some(IngestPayload::Text("short raw, but long normalized".into())), // Raw: 30 bytes, Norm: 29
70 };
71
72 match ingest(record3, &cfg) {
73 Ok(_) => eprintln!(" -> Unexpected Success!"),
74 Err(err) => println!(" -> Success! Caught expected error: {err}"),
75 }
76}