azure_functions/bindings/
service_bus_trigger.rs

1use crate::{
2    bindings::ServiceBusMessage,
3    rpc::{typed_data::Data, TypedData},
4    util::convert_from,
5};
6use chrono::{DateTime, Utc};
7use serde_json::{from_str, Map, Value};
8use std::collections::HashMap;
9
10const DELIVERY_COUNT_KEY: &str = "DeliveryCount";
11const DEAD_LETTER_SOURCE_KEY: &str = "DeadLetterSource";
12const EXPIRATION_TIME_KEY: &str = "ExpiresAtUtc";
13const ENQUEUED_TIME_KEY: &str = "EnqueuedTimeUtc";
14const MESSAGE_ID_KEY: &str = "MessageId";
15const CONTENT_TYPE_KEY: &str = "ContentType";
16const REPLY_TO_KEY: &str = "ReplyTo";
17const SEQUENCE_NUMBER_KEY: &str = "SequenceNumber";
18const TO_KEY: &str = "To";
19const LABEL_KEY: &str = "Label";
20const CORRELATION_ID_KEY: &str = "CorrelationId";
21const USER_PROPERTIES_KEY: &str = "UserProperties";
22
23/// Represents a service bus trigger binding.
24///
25/// The following binding attributes are supported:
26///
27/// | Name                | Description                                                                                                                               |
28/// |---------------------|-------------------------------------------------------------------------------------------------------------------------------------------|
29/// | `name`              | The name of the parameter being bound.                                                                                                    |
30/// | `queue_name`        | The name of the queue to monitor. Use only if monitoring a queue, not for a topic.                                                        |
31/// | `topic_name`        | The name of the topic to monitor. Use only if monitoring a topic, not for a queue.                                                        |
32/// | `subscription_name` | The name of the subscription to monitor. Use only if monitoring a topic, not for a queue.                                                 |
33/// | `connection`        | The name of an app setting that contains the Service Bus connection string to use for this binding. Defaults to `AzureWebJobsServiceBus`. |
34///
35/// # Examples
36///
37/// An example that logs a message when a message is posted to a queue:
38///
39/// ```rust
40/// use azure_functions::{
41///     bindings::ServiceBusTrigger,
42///     func,
43/// };
44///
45/// #[func]
46/// #[binding(name = "trigger", queue_name = "example", connection = "connection")]
47/// pub fn log_message(trigger: ServiceBusTrigger) {
48///     log::info!("{}", trigger.message.as_str().unwrap());
49/// }
50/// ```
51///
52/// An example that logs a message when a message is posted to a topic and subscription:
53///
54/// ```rust
55/// use azure_functions::{bindings::ServiceBusTrigger, func};
56///
57/// #[func]
58/// #[binding(
59///     name = "trigger",
60///     topic_name = "mytopic",
61///     subscription_name = "mysubscription",
62///     connection = "connection"
63/// )]
64/// pub fn log_topic_message(trigger: ServiceBusTrigger) {
65///     log::info!("{}", trigger.message.as_str().unwrap());
66/// }
67/// ```
68#[derive(Debug)]
69pub struct ServiceBusTrigger {
70    /// The message that triggered the function.
71    pub message: ServiceBusMessage,
72    /// The number of deliveries.
73    pub delivery_count: i32,
74    /// The dead letter source.
75    pub dead_letter_source: Option<String>,
76    /// The time that the message expires.
77    pub expiration_time: DateTime<Utc>,
78    /// The time that the message was enqueued.
79    pub enqueued_time: DateTime<Utc>,
80    /// The user-defined value that Service Bus can use to identify duplicate messages, if enabled.
81    pub message_id: String,
82    /// The content type identifier utilized by the sender and receiver for application specific logic.
83    pub content_type: Option<String>,
84    /// The reply to queue address.
85    pub reply_to: Option<String>,
86    /// The unique number assigned to a message by the Service Bus.
87    pub sequence_number: i64,
88    /// The send to address.
89    pub to: Option<String>,
90    /// The application specific label.
91    pub label: Option<String>,
92    /// The correlation ID.
93    pub correlation_id: Option<String>,
94    /// The application specific message properties.
95    pub user_properties: Map<String, Value>,
96}
97
98impl ServiceBusTrigger {
99    #[doc(hidden)]
100    pub fn new(data: TypedData, mut metadata: HashMap<String, TypedData>) -> Self {
101        ServiceBusTrigger {
102            message: data.into(),
103            delivery_count: convert_from(
104                metadata
105                    .get(DELIVERY_COUNT_KEY)
106                    .expect("expected a delivery count"),
107            )
108            .expect("failed to convert delivery count"),
109            dead_letter_source: metadata.remove(DEAD_LETTER_SOURCE_KEY).map(|data| {
110                match data.data {
111                    Some(Data::String(s)) => s,
112                    _ => panic!("expected a string for dead letter source"),
113                }
114            }),
115            expiration_time: convert_from(
116                metadata
117                    .get(EXPIRATION_TIME_KEY)
118                    .expect("expected an expiration time"),
119            )
120            .expect("failed to convert expiration time"),
121            enqueued_time: convert_from(
122                metadata
123                    .get(ENQUEUED_TIME_KEY)
124                    .expect("expected an enqueued time"),
125            )
126            .expect("failed to convert enqueued time"),
127            message_id: metadata
128                .remove(MESSAGE_ID_KEY)
129                .map(|data| match data.data {
130                    Some(Data::String(s)) => s,
131                    _ => panic!("expected a string for message id"),
132                })
133                .expect("expected a message id"),
134            content_type: metadata
135                .remove(CONTENT_TYPE_KEY)
136                .map(|data| match data.data {
137                    Some(Data::String(s)) => s,
138                    _ => panic!("expected a string for content type"),
139                }),
140            reply_to: metadata.remove(REPLY_TO_KEY).map(|data| match data.data {
141                Some(Data::String(s)) => s,
142                _ => panic!("expected a string for reply to"),
143            }),
144            sequence_number: convert_from(
145                metadata
146                    .get(SEQUENCE_NUMBER_KEY)
147                    .expect("expected a sequence number"),
148            )
149            .expect("failed to convert sequence number"),
150            to: metadata.remove(TO_KEY).map(|data| match data.data {
151                Some(Data::String(s)) => s,
152                _ => panic!("expected a string for to"),
153            }),
154            label: metadata.remove(LABEL_KEY).map(|data| match data.data {
155                Some(Data::String(s)) => s,
156                _ => panic!("expected a string for label"),
157            }),
158            correlation_id: metadata
159                .remove(CORRELATION_ID_KEY)
160                .map(|data| match data.data {
161                    Some(Data::String(s)) => s,
162                    _ => panic!("expected a string for correlation id"),
163                }),
164            user_properties: from_str(
165                metadata
166                    .get(USER_PROPERTIES_KEY)
167                    .map(|data| match &data.data {
168                        Some(Data::Json(s)) => s.as_str(),
169                        _ => panic!("expected JSON data for user properties"),
170                    })
171                    .unwrap_or("{}"),
172            )
173            .expect("failed to convert user properties"),
174        }
175    }
176}
177
178#[cfg(test)]
179mod tests {
180    use super::*;
181
182    #[test]
183    fn it_constructs() {
184        const DELIVERY_COUNT: i32 = 42;
185        const DEAD_LETTER_SOURCE: &str = "dead letter source";
186        const MESSAGE_ID: &str = "message id";
187        const CONTENT_TYPE: &str = "content type";
188        const REPLY_TO: &str = "reply to";
189        const SEQUENCE_NUMBER: i64 = 12345678;
190        const TO: &str = "to";
191        const LABEL: &str = "label";
192        const CORRELATION_ID: &str = "correlation id";
193        const USER_PROPERTIES: &str = r#"{ "hello": "world" }"#;
194        const MESSAGE: &'static str = "\"hello world\"";
195        let now = Utc::now();
196
197        let data = TypedData {
198            data: Some(Data::Json(MESSAGE.to_string())),
199        };
200
201        let mut metadata = HashMap::new();
202
203        metadata.insert(
204            DELIVERY_COUNT_KEY.to_string(),
205            TypedData {
206                data: Some(Data::Int(DELIVERY_COUNT as i64)),
207            },
208        );
209
210        metadata.insert(
211            DEAD_LETTER_SOURCE_KEY.to_string(),
212            TypedData {
213                data: Some(Data::String(DEAD_LETTER_SOURCE.to_string())),
214            },
215        );
216
217        metadata.insert(
218            EXPIRATION_TIME_KEY.to_string(),
219            TypedData {
220                data: Some(Data::String(now.to_rfc3339())),
221            },
222        );
223
224        metadata.insert(
225            ENQUEUED_TIME_KEY.to_string(),
226            TypedData {
227                data: Some(Data::String(now.to_rfc3339())),
228            },
229        );
230
231        metadata.insert(
232            MESSAGE_ID_KEY.to_string(),
233            TypedData {
234                data: Some(Data::String(MESSAGE_ID.to_string())),
235            },
236        );
237
238        metadata.insert(
239            CONTENT_TYPE_KEY.to_string(),
240            TypedData {
241                data: Some(Data::String(CONTENT_TYPE.to_string())),
242            },
243        );
244
245        metadata.insert(
246            REPLY_TO_KEY.to_string(),
247            TypedData {
248                data: Some(Data::String(REPLY_TO.to_string())),
249            },
250        );
251
252        metadata.insert(
253            SEQUENCE_NUMBER_KEY.to_string(),
254            TypedData {
255                data: Some(Data::Int(SEQUENCE_NUMBER)),
256            },
257        );
258
259        metadata.insert(
260            TO_KEY.to_string(),
261            TypedData {
262                data: Some(Data::String(TO.to_string())),
263            },
264        );
265
266        metadata.insert(
267            LABEL_KEY.to_string(),
268            TypedData {
269                data: Some(Data::String(LABEL.to_string())),
270            },
271        );
272
273        metadata.insert(
274            CORRELATION_ID_KEY.to_string(),
275            TypedData {
276                data: Some(Data::String(CORRELATION_ID.to_string())),
277            },
278        );
279
280        metadata.insert(
281            USER_PROPERTIES_KEY.to_string(),
282            TypedData {
283                data: Some(Data::Json(USER_PROPERTIES.to_string())),
284            },
285        );
286
287        let trigger = ServiceBusTrigger::new(data, metadata);
288
289        assert_eq!(trigger.delivery_count, DELIVERY_COUNT);
290        assert_eq!(trigger.dead_letter_source.unwrap(), DEAD_LETTER_SOURCE);
291        assert_eq!(trigger.expiration_time.to_rfc3339(), now.to_rfc3339());
292        assert_eq!(trigger.enqueued_time.to_rfc3339(), now.to_rfc3339());
293        assert_eq!(trigger.message_id, MESSAGE_ID);
294        assert_eq!(trigger.content_type.unwrap(), CONTENT_TYPE);
295        assert_eq!(trigger.reply_to.unwrap(), REPLY_TO);
296        assert_eq!(trigger.sequence_number, SEQUENCE_NUMBER);
297        assert_eq!(trigger.to.unwrap(), TO);
298        assert_eq!(trigger.label.unwrap(), LABEL);
299        assert_eq!(trigger.correlation_id.unwrap(), CORRELATION_ID);
300        assert_eq!(trigger.user_properties.len(), 1);
301        assert_eq!(trigger.user_properties["hello"].as_str().unwrap(), "world");
302        assert_eq!(trigger.message.as_str().unwrap(), MESSAGE);
303    }
304}