Skip to main content

courier/
envelope.rs

1use std::collections::HashMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
/// The single wire type flowing between all pipeline nodes.
///
/// Generics stop at the node boundary: every `Source`, `Transform`, and `Sink`
/// works on `Envelope` regardless of the underlying schema. Strongly-typed
/// payloads are opt-in via transforms that deserialize, map, and re-serialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope {
    /// Metadata carried alongside the payload: key, source id, timestamp, headers.
    pub meta: Meta,
    /// The message body, held as an untyped JSON value.
    pub payload: Value,
}
17
/// Per-envelope metadata.
///
/// `key` is populated by the source when available (e.g., Kafka record key)
/// or by a transform like `SetKeyTransform`. Sinks that require a key (like
/// `KafkaSink`) fall back to `source_id` when `key` is `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Meta {
    /// Optional record key; `None` until a source or transform sets it.
    pub key: Option<String>,
    /// Identifier supplied to `Envelope::new`; also the fallback for a missing `key`.
    pub source_id: String,
    /// Creation time in milliseconds since the Unix epoch, as stamped by `Envelope::new`.
    pub timestamp_ms: u64,
    /// Free-form string headers; empty on a freshly constructed envelope.
    pub headers: HashMap<String, String>,
}
30
31impl Envelope {
32    pub fn new(source_id: impl Into<String>, payload: Value) -> Self {
33        Self {
34            meta: Meta {
35                source_id: source_id.into(),
36                timestamp_ms: now_ms(),
37                ..Meta::default()
38            },
39            payload,
40        }
41    }
42}
43
/// Returns the current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, so that
/// building an envelope never fails because of a misconfigured clock.
///
/// The value is assembled from whole seconds plus the sub-second millisecond
/// part using saturating arithmetic, so it clamps at `u64::MAX` instead of
/// silently wrapping the way a `u128 as u64` cast would.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(elapsed.subsec_millis())),
        // Clock is set before 1970-01-01: fall back to the epoch itself.
        Err(_) => 0,
    }
}
50
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A fresh envelope keeps the given source id and payload, gets a non-zero
    /// timestamp, and leaves the optional metadata at its defaults.
    #[test]
    fn new_sets_source_id_and_defaults() {
        let envelope = Envelope::new("src", json!({ "a": 1 }));
        let meta = &envelope.meta;

        assert_eq!(meta.source_id, "src");
        assert!(meta.key.is_none());
        assert!(meta.headers.is_empty());
        assert!(meta.timestamp_ms > 0);
        assert_eq!(envelope.payload, json!({ "a": 1 }));
    }
}