azure_functions/bindings/
service_bus_trigger.rs1use crate::{
2 bindings::ServiceBusMessage,
3 rpc::{typed_data::Data, TypedData},
4 util::convert_from,
5};
6use chrono::{DateTime, Utc};
7use serde_json::{from_str, Map, Value};
8use std::collections::HashMap;
9
// Keys of the trigger metadata entries that `ServiceBusTrigger::new` reads.

/// Metadata key holding the delivery count.
const DELIVERY_COUNT_KEY: &str = "DeliveryCount";
/// Metadata key holding the dead letter source.
const DEAD_LETTER_SOURCE_KEY: &str = "DeadLetterSource";
/// Metadata key holding the expiration time.
const EXPIRATION_TIME_KEY: &str = "ExpiresAtUtc";
/// Metadata key holding the enqueued time.
const ENQUEUED_TIME_KEY: &str = "EnqueuedTimeUtc";
/// Metadata key holding the message id.
const MESSAGE_ID_KEY: &str = "MessageId";
/// Metadata key holding the content type.
const CONTENT_TYPE_KEY: &str = "ContentType";
/// Metadata key holding the reply-to value.
const REPLY_TO_KEY: &str = "ReplyTo";
/// Metadata key holding the sequence number.
const SEQUENCE_NUMBER_KEY: &str = "SequenceNumber";
/// Metadata key holding the "to" value.
const TO_KEY: &str = "To";
/// Metadata key holding the label.
const LABEL_KEY: &str = "Label";
/// Metadata key holding the correlation id.
const CORRELATION_ID_KEY: &str = "CorrelationId";
/// Metadata key holding the user properties (JSON data).
const USER_PROPERTIES_KEY: &str = "UserProperties";
22
23#[derive(Debug)]
69pub struct ServiceBusTrigger {
70 pub message: ServiceBusMessage,
72 pub delivery_count: i32,
74 pub dead_letter_source: Option<String>,
76 pub expiration_time: DateTime<Utc>,
78 pub enqueued_time: DateTime<Utc>,
80 pub message_id: String,
82 pub content_type: Option<String>,
84 pub reply_to: Option<String>,
86 pub sequence_number: i64,
88 pub to: Option<String>,
90 pub label: Option<String>,
92 pub correlation_id: Option<String>,
94 pub user_properties: Map<String, Value>,
96}
97
98impl ServiceBusTrigger {
99 #[doc(hidden)]
100 pub fn new(data: TypedData, mut metadata: HashMap<String, TypedData>) -> Self {
101 ServiceBusTrigger {
102 message: data.into(),
103 delivery_count: convert_from(
104 metadata
105 .get(DELIVERY_COUNT_KEY)
106 .expect("expected a delivery count"),
107 )
108 .expect("failed to convert delivery count"),
109 dead_letter_source: metadata.remove(DEAD_LETTER_SOURCE_KEY).map(|data| {
110 match data.data {
111 Some(Data::String(s)) => s,
112 _ => panic!("expected a string for dead letter source"),
113 }
114 }),
115 expiration_time: convert_from(
116 metadata
117 .get(EXPIRATION_TIME_KEY)
118 .expect("expected an expiration time"),
119 )
120 .expect("failed to convert expiration time"),
121 enqueued_time: convert_from(
122 metadata
123 .get(ENQUEUED_TIME_KEY)
124 .expect("expected an enqueued time"),
125 )
126 .expect("failed to convert enqueued time"),
127 message_id: metadata
128 .remove(MESSAGE_ID_KEY)
129 .map(|data| match data.data {
130 Some(Data::String(s)) => s,
131 _ => panic!("expected a string for message id"),
132 })
133 .expect("expected a message id"),
134 content_type: metadata
135 .remove(CONTENT_TYPE_KEY)
136 .map(|data| match data.data {
137 Some(Data::String(s)) => s,
138 _ => panic!("expected a string for content type"),
139 }),
140 reply_to: metadata.remove(REPLY_TO_KEY).map(|data| match data.data {
141 Some(Data::String(s)) => s,
142 _ => panic!("expected a string for reply to"),
143 }),
144 sequence_number: convert_from(
145 metadata
146 .get(SEQUENCE_NUMBER_KEY)
147 .expect("expected a sequence number"),
148 )
149 .expect("failed to convert sequence number"),
150 to: metadata.remove(TO_KEY).map(|data| match data.data {
151 Some(Data::String(s)) => s,
152 _ => panic!("expected a string for to"),
153 }),
154 label: metadata.remove(LABEL_KEY).map(|data| match data.data {
155 Some(Data::String(s)) => s,
156 _ => panic!("expected a string for label"),
157 }),
158 correlation_id: metadata
159 .remove(CORRELATION_ID_KEY)
160 .map(|data| match data.data {
161 Some(Data::String(s)) => s,
162 _ => panic!("expected a string for correlation id"),
163 }),
164 user_properties: from_str(
165 metadata
166 .get(USER_PROPERTIES_KEY)
167 .map(|data| match &data.data {
168 Some(Data::Json(s)) => s.as_str(),
169 _ => panic!("expected JSON data for user properties"),
170 })
171 .unwrap_or("{}"),
172 )
173 .expect("failed to convert user properties"),
174 }
175 }
176}
177
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `value` as string-typed trigger data.
    fn string_data(value: &str) -> TypedData {
        TypedData {
            data: Some(Data::String(value.to_string())),
        }
    }

    /// Wraps `value` as integer-typed trigger data.
    fn int_data(value: i64) -> TypedData {
        TypedData {
            data: Some(Data::Int(value)),
        }
    }

    /// Wraps `value` as JSON-typed trigger data.
    fn json_data(value: &str) -> TypedData {
        TypedData {
            data: Some(Data::Json(value.to_string())),
        }
    }

    /// Populates every metadata entry and checks each field of the resulting trigger.
    #[test]
    fn it_constructs() {
        const DELIVERY_COUNT: i32 = 42;
        const DEAD_LETTER_SOURCE: &str = "dead letter source";
        const MESSAGE_ID: &str = "message id";
        const CONTENT_TYPE: &str = "content type";
        const REPLY_TO: &str = "reply to";
        const SEQUENCE_NUMBER: i64 = 12345678;
        const TO: &str = "to";
        const LABEL: &str = "label";
        const CORRELATION_ID: &str = "correlation id";
        const USER_PROPERTIES: &str = r#"{ "hello": "world" }"#;
        const MESSAGE: &str = "\"hello world\"";
        let now = Utc::now();

        let data = json_data(MESSAGE);

        let metadata: HashMap<String, TypedData> = vec![
            // `i64::from` is a lossless widening, unlike an `as` cast.
            (DELIVERY_COUNT_KEY, int_data(i64::from(DELIVERY_COUNT))),
            (DEAD_LETTER_SOURCE_KEY, string_data(DEAD_LETTER_SOURCE)),
            (EXPIRATION_TIME_KEY, string_data(&now.to_rfc3339())),
            (ENQUEUED_TIME_KEY, string_data(&now.to_rfc3339())),
            (MESSAGE_ID_KEY, string_data(MESSAGE_ID)),
            (CONTENT_TYPE_KEY, string_data(CONTENT_TYPE)),
            (REPLY_TO_KEY, string_data(REPLY_TO)),
            (SEQUENCE_NUMBER_KEY, int_data(SEQUENCE_NUMBER)),
            (TO_KEY, string_data(TO)),
            (LABEL_KEY, string_data(LABEL)),
            (CORRELATION_ID_KEY, string_data(CORRELATION_ID)),
            (USER_PROPERTIES_KEY, json_data(USER_PROPERTIES)),
        ]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value))
        .collect();

        let trigger = ServiceBusTrigger::new(data, metadata);

        assert_eq!(trigger.delivery_count, DELIVERY_COUNT);
        assert_eq!(trigger.dead_letter_source.unwrap(), DEAD_LETTER_SOURCE);
        assert_eq!(trigger.expiration_time.to_rfc3339(), now.to_rfc3339());
        assert_eq!(trigger.enqueued_time.to_rfc3339(), now.to_rfc3339());
        assert_eq!(trigger.message_id, MESSAGE_ID);
        assert_eq!(trigger.content_type.unwrap(), CONTENT_TYPE);
        assert_eq!(trigger.reply_to.unwrap(), REPLY_TO);
        assert_eq!(trigger.sequence_number, SEQUENCE_NUMBER);
        assert_eq!(trigger.to.unwrap(), TO);
        assert_eq!(trigger.label.unwrap(), LABEL);
        assert_eq!(trigger.correlation_id.unwrap(), CORRELATION_ID);
        assert_eq!(trigger.user_properties.len(), 1);
        assert_eq!(trigger.user_properties["hello"].as_str().unwrap(), "world");
        assert_eq!(trigger.message.as_str().unwrap(), MESSAGE);
    }
}