// azure_functions/bindings/event_hub_trigger.rs

1use crate::{
2    bindings::EventHubMessage,
3    event_hub::{PartitionContext, SystemProperties},
4    rpc::{typed_data::Data, TypedData},
5    util::convert_from,
6};
7use chrono::{DateTime, Utc};
8use serde_json::{from_str, Value};
9use std::collections::HashMap;
10
// Keys under which the trigger's fields are looked up in the invocation metadata map.

/// Metadata key of the partition context (read as JSON text).
const PARTITION_CONTEXT_KEY: &str = "PartitionContext";
/// Metadata key of the enqueued time of the event.
const ENQUEUED_TIME_KEY: &str = "EnqueuedTimeUtc";
/// Metadata key of the event's offset (read as a string).
const OFFSET_KEY: &str = "Offset";
/// Metadata key of the user properties (read as JSON text).
const PROPERTIES_KEY: &str = "Properties";
/// Metadata key of the event's sequence number.
const SEQUENCE_NUMBER_KEY: &str = "SequenceNumber";
/// Metadata key of the system properties (read as JSON text).
const SYSTEM_PROPERTIES_KEY: &str = "SystemProperties";
17
18/// Represents an Event Hub trigger binding.
19///
20/// The following binding attributes are supported:
21///
22/// | Name             | Description                                                                                                                                                                      |
23/// |------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
24/// | `name`           | The name of the parameter being bound.                                                                                                                                           |
25/// | `event_hub_name` | The name of the event hub. When the event hub name is also present in the connection string, that value overrides this property at runtime.                                      |
26/// | `connection`     | The name of an app setting that contains the connection string to the event hub's namespace. This connection string must have at least read permissions to activate the trigger. |
27/// | `consumer_group` | An optional property that sets the consumer group used to subscribe to events in the hub. If omitted, the `$Default` consumer group is used.                                     |
28///
29/// # Examples
30///
31/// ```rust
32/// use azure_functions::{
33///     bindings::EventHubTrigger,
34///     func,
35/// };
36/// use log::info;
37///
38/// #[func]
39/// #[binding(name = "trigger", connection = "my_connection")]
40/// pub fn log_event(trigger: EventHubTrigger) {
41///     log::info!("{:?}", trigger);
42/// }
43/// ```
44#[derive(Debug)]
45pub struct EventHubTrigger {
46    /// The Event Hub message that triggered the function.
47    pub message: EventHubMessage,
48    /// The partition context information.
49    pub partition_context: PartitionContext,
50    /// The enqueued time in UTC.
51    pub enqueued_time: DateTime<Utc>,
52    /// The offset of the data relative to the Event Hub partition stream.
53    pub offset: String,
54    /// The user properties of the event data.
55    pub properties: Value,
56    /// The logical sequence number of the event.
57    pub sequence_number: i64,
58    /// The system properties of the event data.
59    pub system_properties: SystemProperties,
60}
61
62impl EventHubTrigger {
63    #[doc(hidden)]
64    pub fn new(data: TypedData, mut metadata: HashMap<String, TypedData>) -> Self {
65        EventHubTrigger {
66            message: data.into(),
67            partition_context: from_str(
68                match &metadata
69                    .get(PARTITION_CONTEXT_KEY)
70                    .expect("expected partition context")
71                    .data
72                {
73                    Some(Data::Json(s)) => s,
74                    _ => panic!("expected JSON data for partition context"),
75                },
76            )
77            .expect("failed to deserialize partition context"),
78            enqueued_time: convert_from(
79                metadata
80                    .get(ENQUEUED_TIME_KEY)
81                    .expect("expected enqueued time"),
82            )
83            .expect("failed to convert enqueued time"),
84            offset: metadata
85                .remove(OFFSET_KEY)
86                .map(|offset| match offset.data {
87                    Some(Data::String(s)) => s,
88                    _ => panic!("expected a string for offset"),
89                })
90                .expect("expected offset"),
91            properties: from_str(
92                match &metadata
93                    .get(PROPERTIES_KEY)
94                    .expect("expected properties")
95                    .data
96                {
97                    Some(Data::Json(s)) => s,
98                    _ => panic!("expected JSON data for properties"),
99                },
100            )
101            .expect("failed to deserialize properties"),
102            sequence_number: convert_from(
103                metadata
104                    .get(SEQUENCE_NUMBER_KEY)
105                    .expect("expected sequence number"),
106            )
107            .expect("failed to convert sequence number"),
108            system_properties: from_str(
109                match &metadata
110                    .get(SYSTEM_PROPERTIES_KEY)
111                    .expect("expected system properties")
112                    .data
113                {
114                    Some(Data::Json(s)) => s,
115                    _ => panic!("expected JSON data for system properties"),
116                },
117            )
118            .expect("failed to deserialize system properties"),
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_hub::RuntimeInformation;
    use serde_json::json;
    use std::str::FromStr;

    /// Builds a trigger from a complete metadata map and checks the fields
    /// populated from the message payload and from each metadata entry.
    #[test]
    fn it_constructs() {
        const MESSAGE: &str = "hello world!";
        const ENQUEUED_TIME: &str = "2018-07-25T06:24:00+00:00";
        const RETRIEVAL_TIME: &str = "0001-01-01T00:00:00Z";
        const OFFSET: &str = "98765";
        const SEQUENCE_NUMBER: i64 = 12345;
        const PARTITION_ID: &str = "1";
        const OWNER: &str = "358d9b05-56fe-4549-bafb-e0e102b29b05";
        const EVENT_HUB_PATH: &str = "my_event_hub";
        const CONSUMER_GROUP: &str = "$Default";
        const USER_PROPERTY_NAME: &str = "property name";
        const USER_PROPERTY_VALUE: &str = "property value";
        const PARTITION_KEY: &str = "partition key";

        // Parse the timestamps once; the enqueued time is shared by two fixtures.
        let enqueued_time = DateTime::<Utc>::from_str(ENQUEUED_TIME).unwrap();
        let retrieval_time = DateTime::<Utc>::from_str(RETRIEVAL_TIME).unwrap();

        let partition_context = PartitionContext {
            consumer_group_name: CONSUMER_GROUP.to_string(),
            event_hub_path: EVENT_HUB_PATH.to_string(),
            partition_id: PARTITION_ID.to_string(),
            owner: OWNER.to_string(),
            runtime_information: RuntimeInformation {
                partition_id: PARTITION_ID.to_string(),
                last_sequence_number: SEQUENCE_NUMBER,
                last_enqueued_time: enqueued_time,
                last_enqueued_offset: Some(OFFSET.to_string()),
                retrieval_time,
            },
        };

        let user_properties = json!({ USER_PROPERTY_NAME: USER_PROPERTY_VALUE });

        let system_properties = SystemProperties {
            sequence_number: SEQUENCE_NUMBER,
            offset: OFFSET.to_string(),
            partition_key: Some(PARTITION_KEY.to_string()),
            enqueued_time,
        };

        // One row per metadata entry: the key and the payload stored under it.
        let metadata: HashMap<String, TypedData> = vec![
            (
                PARTITION_CONTEXT_KEY,
                Data::Json(serde_json::to_string(&partition_context).unwrap()),
            ),
            (ENQUEUED_TIME_KEY, Data::String(ENQUEUED_TIME.to_string())),
            (OFFSET_KEY, Data::String(OFFSET.to_string())),
            (PROPERTIES_KEY, Data::Json(user_properties.to_string())),
            (SEQUENCE_NUMBER_KEY, Data::Int(SEQUENCE_NUMBER)),
            (
                SYSTEM_PROPERTIES_KEY,
                Data::Json(serde_json::to_string(&system_properties).unwrap()),
            ),
        ]
        .into_iter()
        .map(|(key, payload)| {
            (
                key.to_string(),
                TypedData {
                    data: Some(payload),
                },
            )
        })
        .collect();

        let trigger = EventHubTrigger::new(
            TypedData {
                data: Some(Data::String(MESSAGE.to_string())),
            },
            metadata,
        );

        assert_eq!(trigger.message.as_str().unwrap(), MESSAGE);

        let context = &trigger.partition_context;
        assert_eq!(context.consumer_group_name, CONSUMER_GROUP);
        assert_eq!(context.event_hub_path, EVENT_HUB_PATH);
        assert_eq!(context.partition_id, PARTITION_ID);
        assert_eq!(context.owner, OWNER);

        let runtime = &context.runtime_information;
        assert_eq!(runtime.partition_id, PARTITION_ID);
        assert_eq!(runtime.last_sequence_number, SEQUENCE_NUMBER);
        assert_eq!(runtime.last_enqueued_time.to_rfc3339(), ENQUEUED_TIME);

        assert_eq!(trigger.enqueued_time.to_rfc3339(), ENQUEUED_TIME);
        assert_eq!(trigger.offset, OFFSET);
        assert_eq!(trigger.properties, user_properties);
        assert_eq!(trigger.sequence_number, SEQUENCE_NUMBER);

        assert_eq!(trigger.system_properties.sequence_number, SEQUENCE_NUMBER);
        assert_eq!(trigger.system_properties.offset, OFFSET);
        assert_eq!(
            trigger.system_properties.partition_key.unwrap(),
            PARTITION_KEY
        );
        assert_eq!(
            trigger.system_properties.enqueued_time.to_rfc3339(),
            ENQUEUED_TIME
        );
    }
}