Skip to main content

batch_ingest/
batch_ingest.rs

1use chrono::{DateTime, NaiveDate, Utc};
2use ingest::{
3    ingest, CanonicalIngestRecord, CanonicalPayload, IngestConfig, IngestMetadata, IngestPayload,
4    IngestSource, RawIngestRecord,
5};
6
7fn timestamp(hour: u32) -> DateTime<Utc> {
8    let Some(date) = NaiveDate::from_ymd_opt(2024, 1, 1) else {
9        panic!("invalid date components");
10    };
11    let Some(dt) = date.and_hms_opt(hour, 0, 0) else {
12        panic!("invalid time components");
13    };
14    DateTime::<Utc>::from_naive_utc_and_offset(dt, Utc)
15}
16
17fn main() {
18    let fixtures: [RawIngestRecord; 3] = [
19        RawIngestRecord {
20            id: "text-001".into(),
21            source: IngestSource::RawText,
22            metadata: IngestMetadata {
23                tenant_id: Some("tenant-a".into()),
24                doc_id: Some("doc-1".into()),
25                received_at: Some(timestamp(0)),
26                original_source: Some("https://example.com/a".into()),
27                attributes: None,
28            },
29            payload: Some(IngestPayload::Text("  First   text\npayload ".into())),
30        },
31        RawIngestRecord {
32            id: "text-002".into(),
33            source: IngestSource::Url("https://example.com/b".into()),
34            metadata: IngestMetadata {
35                tenant_id: Some("tenant-a".into()),
36                doc_id: Some("doc-2".into()),
37                received_at: Some(timestamp(1)),
38                original_source: None,
39                attributes: Some(serde_json::json!({"topic": "news"})),
40            },
41            payload: Some(IngestPayload::Text("Second payload\twith spacing".into())),
42        },
43        RawIngestRecord {
44            id: "bin-001".into(),
45            source: IngestSource::File {
46                filename: "image.png".into(),
47                content_type: Some("image/png".into()),
48            },
49            metadata: IngestMetadata {
50                tenant_id: Some("tenant-b".into()),
51                doc_id: Some("image-1".into()),
52                received_at: Some(timestamp(2)),
53                original_source: None,
54                attributes: Some(serde_json::json!({"kind": "thumbnail"})),
55            },
56            payload: Some(IngestPayload::Binary(vec![0, 1, 2, 3])),
57        },
58    ];
59
60    let cfg = IngestConfig::default();
61
62    for record in fixtures {
63        match ingest(record, &cfg) {
64            Ok(CanonicalIngestRecord {
65                normalized_payload, ..
66            }) => match normalized_payload {
67                Some(CanonicalPayload::Text(text)) => {
68                    println!("text payload -> \"{text}\"");
69                }
70                Some(CanonicalPayload::Binary(bytes)) => {
71                    println!("binary payload -> {} bytes", bytes.len());
72                }
73                None => println!("no payload provided"),
74                Some(_) => println!("unsupported payload variant"),
75            },
76            Err(err) => eprintln!("ingest failed: {err}"),
77        }
78    }
79}