azure_functions/bindings/
event_hub_trigger.rs1use crate::{
2 bindings::EventHubMessage,
3 event_hub::{PartitionContext, SystemProperties},
4 rpc::{typed_data::Data, TypedData},
5 util::convert_from,
6};
7use chrono::{DateTime, Utc};
8use serde_json::{from_str, Value};
9use std::collections::HashMap;
10
// Names of the binding metadata entries that `EventHubTrigger::new` reads.

// Entry holding the partition context as JSON text.
const PARTITION_CONTEXT_KEY: &str = "PartitionContext";
// Entry converted into the enqueued time.
const ENQUEUED_TIME_KEY: &str = "EnqueuedTimeUtc";
// Entry holding the offset as a string.
const OFFSET_KEY: &str = "Offset";
// Entry holding the properties as JSON text.
const PROPERTIES_KEY: &str = "Properties";
// Entry converted into the sequence number.
const SEQUENCE_NUMBER_KEY: &str = "SequenceNumber";
// Entry holding the system properties as JSON text.
const SYSTEM_PROPERTIES_KEY: &str = "SystemProperties";
17
18#[derive(Debug)]
45pub struct EventHubTrigger {
46 pub message: EventHubMessage,
48 pub partition_context: PartitionContext,
50 pub enqueued_time: DateTime<Utc>,
52 pub offset: String,
54 pub properties: Value,
56 pub sequence_number: i64,
58 pub system_properties: SystemProperties,
60}
61
62impl EventHubTrigger {
63 #[doc(hidden)]
64 pub fn new(data: TypedData, mut metadata: HashMap<String, TypedData>) -> Self {
65 EventHubTrigger {
66 message: data.into(),
67 partition_context: from_str(
68 match &metadata
69 .get(PARTITION_CONTEXT_KEY)
70 .expect("expected partition context")
71 .data
72 {
73 Some(Data::Json(s)) => s,
74 _ => panic!("expected JSON data for partition context"),
75 },
76 )
77 .expect("failed to deserialize partition context"),
78 enqueued_time: convert_from(
79 metadata
80 .get(ENQUEUED_TIME_KEY)
81 .expect("expected enqueued time"),
82 )
83 .expect("failed to convert enqueued time"),
84 offset: metadata
85 .remove(OFFSET_KEY)
86 .map(|offset| match offset.data {
87 Some(Data::String(s)) => s,
88 _ => panic!("expected a string for offset"),
89 })
90 .expect("expected offset"),
91 properties: from_str(
92 match &metadata
93 .get(PROPERTIES_KEY)
94 .expect("expected properties")
95 .data
96 {
97 Some(Data::Json(s)) => s,
98 _ => panic!("expected JSON data for properties"),
99 },
100 )
101 .expect("failed to deserialize properties"),
102 sequence_number: convert_from(
103 metadata
104 .get(SEQUENCE_NUMBER_KEY)
105 .expect("expected sequence number"),
106 )
107 .expect("failed to convert sequence number"),
108 system_properties: from_str(
109 match &metadata
110 .get(SYSTEM_PROPERTIES_KEY)
111 .expect("expected system properties")
112 .data
113 {
114 Some(Data::Json(s)) => s,
115 _ => panic!("expected JSON data for system properties"),
116 },
117 )
118 .expect("failed to deserialize system properties"),
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_hub::RuntimeInformation;
    use serde_json::json;
    use std::str::FromStr;

    /// Builds a trigger from hand-assembled data and metadata, then checks that each field
    /// carries the value that was placed in the corresponding metadata entry.
    #[test]
    fn it_constructs() {
        const MESSAGE: &str = "hello world!";
        const ENQUEUED_TIME: &str = "2018-07-25T06:24:00+00:00";
        const RETRIEVAL_TIME: &str = "0001-01-01T00:00:00Z";
        const OFFSET: &str = "98765";
        const SEQUENCE_NUMBER: i64 = 12345;
        const PARTITION_ID: &str = "1";
        const OWNER: &str = "358d9b05-56fe-4549-bafb-e0e102b29b05";
        const EVENT_HUB_PATH: &str = "my_event_hub";
        const CONSUMER_GROUP: &str = "$Default";
        const USER_PROPERTY_NAME: &str = "property name";
        const USER_PROPERTY_VALUE: &str = "property value";
        const PARTITION_KEY: &str = "partition key";

        let parse_utc = |text: &str| DateTime::<Utc>::from_str(text).unwrap();

        // Values that get serialized into the JSON metadata entries.
        let context = PartitionContext {
            consumer_group_name: CONSUMER_GROUP.to_string(),
            event_hub_path: EVENT_HUB_PATH.to_string(),
            partition_id: PARTITION_ID.to_string(),
            owner: OWNER.to_string(),
            runtime_information: RuntimeInformation {
                partition_id: PARTITION_ID.to_string(),
                last_sequence_number: SEQUENCE_NUMBER,
                last_enqueued_time: parse_utc(ENQUEUED_TIME),
                last_enqueued_offset: Some(OFFSET.to_string()),
                retrieval_time: parse_utc(RETRIEVAL_TIME),
            },
        };
        let properties = json!({ USER_PROPERTY_NAME: USER_PROPERTY_VALUE });
        let system_properties = SystemProperties {
            sequence_number: SEQUENCE_NUMBER,
            offset: OFFSET.to_string(),
            partition_key: Some(PARTITION_KEY.to_string()),
            enqueued_time: parse_utc(ENQUEUED_TIME),
        };

        // One (key, payload) pair per metadata entry the constructor reads.
        let entries = vec![
            (
                PARTITION_CONTEXT_KEY,
                Data::Json(serde_json::to_string(&context).unwrap()),
            ),
            (ENQUEUED_TIME_KEY, Data::String(ENQUEUED_TIME.to_string())),
            (OFFSET_KEY, Data::String(OFFSET.to_string())),
            (PROPERTIES_KEY, Data::Json(properties.to_string())),
            (SEQUENCE_NUMBER_KEY, Data::Int(SEQUENCE_NUMBER)),
            (
                SYSTEM_PROPERTIES_KEY,
                Data::Json(serde_json::to_string(&system_properties).unwrap()),
            ),
        ];
        let metadata: HashMap<String, TypedData> = entries
            .into_iter()
            .map(|(key, payload)| {
                (
                    key.to_string(),
                    TypedData {
                        data: Some(payload),
                    },
                )
            })
            .collect();

        let data = TypedData {
            data: Some(Data::String(MESSAGE.to_string())),
        };

        let trigger = EventHubTrigger::new(data, metadata);

        // Message payload.
        assert_eq!(trigger.message.as_str().unwrap(), MESSAGE);

        // Partition context and its runtime information.
        let partition = &trigger.partition_context;
        assert_eq!(partition.consumer_group_name, CONSUMER_GROUP);
        assert_eq!(partition.event_hub_path, EVENT_HUB_PATH);
        assert_eq!(partition.partition_id, PARTITION_ID);
        assert_eq!(partition.owner, OWNER);
        let runtime = &partition.runtime_information;
        assert_eq!(runtime.partition_id, PARTITION_ID);
        assert_eq!(runtime.last_sequence_number, SEQUENCE_NUMBER);
        assert_eq!(runtime.last_enqueued_time.to_rfc3339(), ENQUEUED_TIME);

        // Scalar metadata entries and user properties.
        assert_eq!(trigger.enqueued_time.to_rfc3339(), ENQUEUED_TIME);
        assert_eq!(trigger.offset, OFFSET);
        assert_eq!(
            trigger.properties,
            json!({ USER_PROPERTY_NAME: USER_PROPERTY_VALUE })
        );
        assert_eq!(trigger.sequence_number, SEQUENCE_NUMBER);

        // System properties.
        assert_eq!(trigger.system_properties.sequence_number, SEQUENCE_NUMBER);
        assert_eq!(trigger.system_properties.offset, OFFSET);
        assert_eq!(
            trigger.system_properties.partition_key.unwrap(),
            PARTITION_KEY
        );
        assert_eq!(
            trigger.system_properties.enqueued_time.to_rfc3339(),
            ENQUEUED_TIME
        );
    }
}