// danube_connect_core/message/context.rs

use danube_client::SchemaInfo;
use std::collections::HashMap;

/// Routing metadata shared by source and sink records.
///
/// This is a borrowed view: every field references data that lives for `'a`
/// and is owned elsewhere. All fields are `Copy`, so the context itself is
/// `Copy` and can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct RoutingContext<'a> {
    topic: &'a str,
    key: Option<&'a str>,
    partition: Option<&'a str>,
    attributes: &'a HashMap<String, String>,
}

impl<'a> RoutingContext<'a> {
    /// Create a routing context from its raw topic, key, partition, and attributes.
    ///
    /// The values are stored as given; no validation or copying is performed.
    pub fn new(
        topic: &'a str,
        key: Option<&'a str>,
        partition: Option<&'a str>,
        attributes: &'a HashMap<String, String>,
    ) -> Self {
        Self {
            topic,
            key,
            partition,
            attributes,
        }
    }

    /// Return the logical Danube topic associated with the record.
    ///
    /// The returned slice borrows for `'a`, not for the lifetime of `self`,
    /// so it may outlive this context value.
    pub fn topic(&self) -> &'a str {
        self.topic
    }

    /// Return the routing key used for partitioned publishing, if present.
    pub fn key(&self) -> Option<&'a str> {
        self.key
    }

    /// Return the partition identifier for consumed records, if known.
    pub fn partition(&self) -> Option<&'a str> {
        self.partition
    }

    /// Return the user-defined record attributes.
    ///
    /// The map is returned by reference (borrowed for `'a`); it is never cloned.
    pub fn attributes(&self) -> &'a HashMap<String, String> {
        self.attributes
    }
}

50/// Full record context combining routing data with source metadata.
51#[derive(Debug, Clone, Copy)]
52pub struct RecordContext<'a> {
53    routing: RoutingContext<'a>,
54    publish_time: Option<u64>,
55    producer_name: Option<&'a str>,
56    schema: Option<&'a SchemaInfo>,
57}
58
59impl<'a> RecordContext<'a> {
60    /// Create a record context from routing metadata and optional Danube metadata.
61    pub fn new(
62        routing: RoutingContext<'a>,
63        publish_time: Option<u64>,
64        producer_name: Option<&'a str>,
65        schema: Option<&'a SchemaInfo>,
66    ) -> Self {
67        Self {
68            routing,
69            publish_time,
70            producer_name,
71            schema,
72        }
73    }
74
75    /// Return the routing portion of this record context.
76    pub fn routing(&self) -> RoutingContext<'a> {
77        self.routing
78    }
79
80    /// Return the logical topic associated with the record.
81    pub fn topic(&self) -> &'a str {
82        self.routing.topic()
83    }
84
85    /// Return the routing key used for partitioned publishing, if present.
86    pub fn key(&self) -> Option<&'a str> {
87        self.routing.key()
88    }
89
90    /// Return the consumed partition identifier, if known.
91    pub fn partition(&self) -> Option<&'a str> {
92        self.routing.partition()
93    }
94
95    /// Return the user-defined attributes carried by the record.
96    pub fn attributes(&self) -> &'a HashMap<String, String> {
97        self.routing.attributes()
98    }
99
100    /// Return the Danube publish timestamp in microseconds since epoch, if available.
101    pub fn publish_time(&self) -> Option<u64> {
102        self.publish_time
103    }
104
105    /// Return the producer name that originally published the record, if available.
106    pub fn producer_name(&self) -> Option<&'a str> {
107        self.producer_name
108    }
109
110    /// Return schema metadata associated with the record, if available.
111    pub fn schema(&self) -> Option<&'a SchemaInfo> {
112        self.schema
113    }
114}