azure_functions/bindings/
event_hub_message.rs

1use crate::{
2    http::Body,
3    rpc::{typed_data::Data, TypedData},
4    FromVec,
5};
6use serde::de::Error;
7use serde::Deserialize;
8use serde_json::{from_str, Result, Value};
9use std::borrow::Cow;
10use std::fmt;
11use std::str::from_utf8;
12
13/// Represents an Event Hubs message output binding.
14///
15/// The following binding attributes are supported:
16///
17/// | Name             | Description                                                                                                                                                                             |
18/// |------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
19/// | `name`           | The name of the parameter being bound.                                                                                                                                                  |
20/// | `event_hub_name` | The name of the event hub. When the event hub name is also present in the connection string, that value overrides this property at runtime.                                             |
21/// | `connection`     | The name of an app setting that contains the connection string to the event hub's namespace. This connection string must have send permissions to send the message to the event stream. |
22///
23/// # Examples
24///
25/// Creating a message from a string:
26///
27/// ```rust
28/// use azure_functions::bindings::{HttpRequest, EventHubMessage};
29/// use azure_functions::func;
30///
31/// #[func]
32/// #[binding(name = "output1", connection = "connection", event_hub_name = "example")]
33/// pub fn create_message(_req: HttpRequest) -> ((), EventHubMessage) {
34///     ((), "Hello world!".into())
35/// }
36/// ```
37///
38/// Creating a message from a JSON value (see the [json! macro](https://docs.serde.rs/serde_json/macro.json.html) from the `serde_json` crate):
39///
40/// ```rust
41/// use azure_functions::bindings::{HttpRequest, EventHubMessage};
42/// use azure_functions::func;
43/// use serde_json::json;
44///
45/// #[func]
46/// #[binding(name = "output1", connection = "connection", event_hub_name = "example")]
47/// pub fn create_message(_req: HttpRequest) -> ((), EventHubMessage) {
48///     (() ,json!({ "hello": "world!" }).into())
49/// }
50/// ```
51///
52/// Creating a message from a sequence of bytes:
53///
54/// ```rust
55/// use azure_functions::bindings::{HttpRequest, EventHubMessage};
56/// use azure_functions::func;
57///
58/// #[func]
59/// #[binding(name = "output1", connection = "connection", event_hub_name = "example")]
60/// pub fn create_message(_req: HttpRequest) -> ((), EventHubMessage) {
61///     ((), [1, 2, 3][..].into())
62/// }
63/// ```
64#[derive(Debug, Clone)]
65pub struct EventHubMessage(TypedData);
66
67impl EventHubMessage {
68    /// Gets the content of the message as a string.
69    ///
70    /// Returns None if there is no valid string representation of the message.
71    pub fn as_str(&self) -> Option<&str> {
72        match &self.0.data {
73            Some(Data::String(s)) => Some(s),
74            Some(Data::Json(s)) => Some(s),
75            Some(Data::Bytes(b)) => from_utf8(b).ok(),
76            Some(Data::Stream(s)) => from_utf8(s).ok(),
77            _ => None,
78        }
79    }
80
81    /// Gets the content of the message as a slice of bytes.
82    pub fn as_bytes(&self) -> &[u8] {
83        match &self.0.data {
84            Some(Data::String(s)) => s.as_bytes(),
85            Some(Data::Json(s)) => s.as_bytes(),
86            Some(Data::Bytes(b)) => b,
87            Some(Data::Stream(s)) => s,
88            _ => panic!("unexpected data for Event Hub message contents"),
89        }
90    }
91
92    /// Deserializes the message as JSON to the requested type.
93    pub fn as_json<'b, T>(&'b self) -> Result<T>
94    where
95        T: Deserialize<'b>,
96    {
97        from_str(
98            self.as_str().ok_or_else(|| {
99                ::serde_json::Error::custom("Event Hub message is not valid UTF-8")
100            })?,
101        )
102    }
103}
104
105impl fmt::Display for EventHubMessage {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        write!(f, "{}", self.as_str().unwrap_or(""))
108    }
109}
110
111impl<'a> From<&'a str> for EventHubMessage {
112    fn from(content: &'a str) -> Self {
113        EventHubMessage(TypedData {
114            data: Some(Data::String(content.to_owned())),
115        })
116    }
117}
118
119impl From<String> for EventHubMessage {
120    fn from(content: String) -> Self {
121        EventHubMessage(TypedData {
122            data: Some(Data::String(content)),
123        })
124    }
125}
126
127impl From<&Value> for EventHubMessage {
128    fn from(content: &Value) -> Self {
129        EventHubMessage(TypedData {
130            data: Some(Data::Json(content.to_string())),
131        })
132    }
133}
134
135impl From<Value> for EventHubMessage {
136    fn from(content: Value) -> Self {
137        EventHubMessage(TypedData {
138            data: Some(Data::Json(content.to_string())),
139        })
140    }
141}
142
143impl<'a> From<&'a [u8]> for EventHubMessage {
144    fn from(content: &'a [u8]) -> Self {
145        EventHubMessage(TypedData {
146            data: Some(Data::Bytes(content.to_owned())),
147        })
148    }
149}
150
151impl From<Vec<u8>> for EventHubMessage {
152    fn from(content: Vec<u8>) -> Self {
153        EventHubMessage(TypedData {
154            data: Some(Data::Bytes(content)),
155        })
156    }
157}
158
159#[doc(hidden)]
160impl From<TypedData> for EventHubMessage {
161    fn from(data: TypedData) -> Self {
162        EventHubMessage(data)
163    }
164}
165
166#[doc(hidden)]
167impl FromVec<EventHubMessage> for TypedData {
168    fn from_vec(vec: Vec<EventHubMessage>) -> Self {
169        TypedData {
170            data: Some(Data::Json(
171                Value::Array(vec.into_iter().map(Into::into).collect()).to_string(),
172            )),
173        }
174    }
175}
176
177impl Into<String> for EventHubMessage {
178    fn into(self) -> String {
179        match self.0.data {
180            Some(Data::String(s)) => s,
181            Some(Data::Json(s)) => s,
182            Some(Data::Bytes(b)) => {
183                String::from_utf8(b).expect("Event Hub message does not contain valid UTF-8 bytes")
184            }
185            Some(Data::Stream(s)) => {
186                String::from_utf8(s).expect("Event Hub message does not contain valid UTF-8 bytes")
187            }
188            _ => panic!("unexpected data for Event Hub message content"),
189        }
190    }
191}
192
193impl Into<Value> for EventHubMessage {
194    fn into(self) -> Value {
195        // TODO: this is not an efficient encoding for bytes/stream
196        match self.0.data {
197            Some(Data::String(s)) => Value::String(s),
198            Some(Data::Json(s)) => {
199                from_str(&s).expect("Event Hub message does not contain valid JSON data")
200            }
201            Some(Data::Bytes(b)) => Value::Array(
202                b.iter()
203                    .map(|n| Value::Number(u64::from(*n).into()))
204                    .collect(),
205            ),
206            Some(Data::Stream(s)) => Value::Array(
207                s.iter()
208                    .map(|n| Value::Number(u64::from(*n).into()))
209                    .collect(),
210            ),
211            _ => panic!("unexpected data for Event Hub message content"),
212        }
213    }
214}
215
216impl Into<Vec<u8>> for EventHubMessage {
217    fn into(self) -> Vec<u8> {
218        match self.0.data {
219            Some(Data::String(s)) => s.into_bytes(),
220            Some(Data::Json(s)) => s.into_bytes(),
221            Some(Data::Bytes(b)) => b,
222            Some(Data::Stream(s)) => s,
223            _ => panic!("unexpected data for Event Hub message content"),
224        }
225    }
226}
227
228impl<'a> Into<Body<'a>> for EventHubMessage {
229    fn into(self) -> Body<'a> {
230        match self.0.data {
231            Some(Data::String(s)) => s.into(),
232            Some(Data::Json(s)) => Body::Json(Cow::from(s)),
233            Some(Data::Bytes(b)) => b.into(),
234            Some(Data::Stream(s)) => s.into(),
235            _ => panic!("unexpected data for Event Hub message content"),
236        }
237    }
238}
239
240#[doc(hidden)]
241impl Into<TypedData> for EventHubMessage {
242    fn into(self) -> TypedData {
243        self.0
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use serde_json::{json, to_value};

    // String content is readable via `as_str` and survives conversion to typed data.
    #[test]
    fn it_has_string_content() {
        const MESSAGE: &str = "test message";

        let message = EventHubMessage::from(MESSAGE);
        assert_eq!(message.as_str().unwrap(), MESSAGE);

        let typed: TypedData = message.into();
        assert_eq!(typed.data, Some(Data::String(MESSAGE.to_string())));
    }

    // JSON content round-trips through `as_json` and is stored in serialized form.
    #[test]
    fn it_has_json_content() {
        #[derive(Serialize, Deserialize)]
        struct SerializedData {
            message: String,
        }

        const MESSAGE: &str = "test";

        let payload = SerializedData {
            message: MESSAGE.to_string(),
        };

        let message: EventHubMessage = to_value(payload).unwrap().into();
        let decoded = message.as_json::<SerializedData>().unwrap();
        assert_eq!(decoded.message, MESSAGE);

        let typed: TypedData = message.into();
        assert_eq!(
            typed.data,
            Some(Data::Json(r#"{"message":"test"}"#.to_string()))
        );
    }

    // Byte content is readable via `as_bytes` and survives conversion to typed data.
    #[test]
    fn it_has_bytes_content() {
        const MESSAGE: &[u8] = &[1, 2, 3];

        let message = EventHubMessage::from(MESSAGE);
        assert_eq!(message.as_bytes(), MESSAGE);

        let typed: TypedData = message.into();
        assert_eq!(typed.data, Some(Data::Bytes(MESSAGE.to_owned())));
    }

    // `Display` renders the string content of the message.
    #[test]
    fn it_displays_as_a_string() {
        const MESSAGE: &str = "test";

        let message = EventHubMessage::from(MESSAGE);
        let rendered = message.to_string();

        assert_eq!(rendered, MESSAGE);
    }

    #[test]
    fn it_converts_from_str() {
        let message = EventHubMessage::from("test");
        assert_eq!(message.as_str().unwrap(), "test");
    }

    #[test]
    fn it_converts_from_string() {
        let message = EventHubMessage::from("test".to_string());
        assert_eq!(message.as_str().unwrap(), "test");
    }

    // A JSON string value keeps its surrounding quotes when read back as text.
    #[test]
    fn it_converts_from_json() {
        let message = EventHubMessage::from(to_value("hello world").unwrap());
        assert_eq!(message.as_str().unwrap(), r#""hello world""#);
    }

    #[test]
    fn it_converts_from_u8_slice() {
        let message: EventHubMessage = [0, 1, 2][..].into();
        assert_eq!(message.as_bytes(), [0, 1, 2]);
    }

    #[test]
    fn it_converts_from_u8_vec() {
        let message: EventHubMessage = vec![0, 1, 2].into();
        assert_eq!(message.as_bytes(), [0, 1, 2]);
    }

    #[test]
    fn it_converts_to_string() {
        let message = EventHubMessage::from("hello world!");
        let text: String = message.into();
        assert_eq!(text, "hello world!");
    }

    #[test]
    fn it_converts_to_json() {
        let message = EventHubMessage::from(json!({"hello": "world"}));
        let value: Value = message.into();
        assert_eq!(value.to_string(), r#"{"hello":"world"}"#);
    }

    #[test]
    fn it_converts_to_bytes() {
        let message: EventHubMessage = vec![1, 2, 3].into();
        let bytes: Vec<u8> = message.into();
        assert_eq!(bytes, [1, 2, 3]);
    }

    // Each kind of content maps onto the matching body representation.
    #[test]
    fn it_converts_to_body() {
        let from_text = EventHubMessage::from("hello world!");
        let body: Body = from_text.into();
        assert_eq!(body.as_str().unwrap(), "hello world!");

        let from_json = EventHubMessage::from(json!({"hello": "world"}));
        let body: Body = from_json.into();
        assert_eq!(body.as_str().unwrap(), r#"{"hello":"world"}"#);

        let from_bytes: EventHubMessage = vec![1, 2, 3].into();
        let body: Body = from_bytes.into();
        assert_eq!(body.as_bytes(), [1, 2, 3]);
    }

    // Each kind of content keeps its data variant when converted to typed data.
    #[test]
    fn it_converts_to_typed_data() {
        let from_text = EventHubMessage::from("test");
        let typed: TypedData = from_text.into();
        assert_eq!(typed.data, Some(Data::String("test".to_string())));

        let from_json = EventHubMessage::from(to_value("test").unwrap());
        let typed: TypedData = from_json.into();
        assert_eq!(typed.data, Some(Data::Json(r#""test""#.to_string())));

        let from_bytes: EventHubMessage = vec![1, 2, 3].into();
        let typed: TypedData = from_bytes.into();
        assert_eq!(typed.data, Some(Data::Bytes([1, 2, 3].to_vec())));
    }
}