azeventhubs/
event_data.rs

1use std::borrow::Cow;
2
3use fe2o3_amqp_types::messaging::annotations::AnnotationKey;
4use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Data, Message, Properties};
5use fe2o3_amqp_types::primitives::{OrderedMap, SimpleValue};
6use serde_amqp::primitives::Binary;
7use serde_amqp::Value;
8use time::OffsetDateTime;
9
10use crate::amqp::amqp_property;
11use crate::amqp::error::RawAmqpMessageError;
12use crate::amqp::{
13    amqp_message_extension::{AmqpMessageExt, AmqpMessageMutExt},
14    error::SetMessageIdError,
15};
16use crate::constants::DEFAULT_OFFSET_DATE_TIME;
17
18/// An Event Hubs event, encapsulating a set of data and its associated metadata.
19#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20pub struct EventData {
21    pub(crate) amqp_message: Message<Data>,
22}
23
24impl<T> From<T> for EventData
25where
26    T: Into<Vec<u8>>,
27{
28    fn from(value: T) -> Self {
29        Self {
30            amqp_message: Message::builder().data(Binary::from(value)).build(),
31        }
32    }
33}
34
35impl EventData {
36    /// Creates a new event from the given data
37    pub fn new(body: impl Into<Vec<u8>>) -> Self {
38        Self::from(body)
39    }
40
41    /// The data associated with the event
42    pub fn body(&self) -> &[u8] {
43        &self.amqp_message.body.0
44    }
45
46    /// Sets the body associated with the event
47    pub fn set_body(&mut self, body: impl Into<Vec<u8>>) {
48        self.amqp_message.body = Data(Binary::from(body));
49    }
50
51    /// The content type associated with the event
52    pub fn content_type(&self) -> Option<&str> {
53        self.amqp_message.content_type()
54    }
55
56    /// Sets the content type associated with the event
57    pub fn set_content_type(&mut self, content_type: impl Into<Option<String>>) {
58        self.amqp_message.set_content_type(content_type)
59    }
60
61    /// An application-defined value that uniquely identifies the event.  The identifier is
62    /// a free-form value and can reflect a GUID or an identifier derived from the application
63    /// context.
64    pub fn message_id(&self) -> Option<Cow<'_, str>> {
65        self.amqp_message.message_id()
66    }
67
68    /// Sets the message ID associated with the event
69    pub fn set_message_id(
70        &mut self,
71        message_id: impl Into<String>,
72    ) -> Result<(), SetMessageIdError> {
73        self.amqp_message.set_message_id(message_id)
74    }
75
76    /// An application-defined value that represents the context to use for correlation across
77    /// one or more operations.  The identifier is a free-form value and may reflect a unique
78    /// identity or a shared data element with significance to the application.
79    pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
80        self.amqp_message.correlation_id()
81    }
82
83    /// Sets the correlation ID associated with the event
84    pub fn set_correlation_id(&mut self, correlation_id: impl Into<Option<String>>) {
85        self.amqp_message.set_correlation_id(correlation_id)
86    }
87
88    /// The set of free-form properties which may be used for associating metadata with the event that
89    /// is meaningful within the application context.
90    pub fn properties(&self) -> Option<&OrderedMap<String, SimpleValue>> {
91        self.amqp_message
92            .application_properties
93            .as_ref()
94            .map(|p| &p.0)
95    }
96}
97
98/// A received event.
99#[derive(Debug, Clone)]
100pub struct ReceivedEventData {
101    raw_amqp_message: Message<Body<Value>>,
102}
103
104impl ReceivedEventData {
105    pub(crate) fn from_raw_amqp_message(raw_amqp_message: Message<Body<Value>>) -> Self {
106        Self { raw_amqp_message }
107    }
108
109    /// Gets the raw AMQP message.
110    pub fn raw_amqp_message(&self) -> &Message<Body<Value>> {
111        &self.raw_amqp_message
112    }
113
114    /// Consumes the event and returns the raw AMQP message.
115    pub fn into_raw_amqp_message(self) -> Message<Body<Value>> {
116        self.raw_amqp_message
117    }
118
119    /// Gets the body of the message.
120    pub fn body(&self) -> Result<&[u8], RawAmqpMessageError> {
121        match &self.raw_amqp_message.body {
122            Body::Data(batch) => match batch.len() {
123                1 => Ok(batch[0].0.as_ref()),
124                _ => Err(RawAmqpMessageError {}),
125            },
126            _ => Err(RawAmqpMessageError {}),
127        }
128    }
129
130    /// Gets the MessageId to identify the message.
131    ///
132    /// The message identifier is an application-defined value that uniquely identifies the message
133    /// and its payload. The identifier is a free-form string and can reflect a GUID or an
134    /// identifier derived from the application context. If enabled, the [duplicate
135    /// detection](https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection)
136    /// feature identifies and removes second and further submissions of messages with the same
137    /// MessageId.
138    pub fn message_id(&self) -> Option<Cow<'_, str>> {
139        self.raw_amqp_message.message_id()
140    }
141
142    /// Gets the correlation identifier.
143    ///
144    /// Allows an application to specify a context for the message for the purposes of correlation,
145    /// for example reflecting the MessageId of a message that is being replied to. See [Message
146    /// Routing and
147    /// Correlation](https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation").
148    pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
149        self.raw_amqp_message.correlation_id()
150    }
151
152    /// The set of free-form properties which may be used for associating metadata with the event that
153    /// is meaningful within the application context.
154    pub fn properties(&self) -> Option<&ApplicationProperties> {
155        self.raw_amqp_message.application_properties.as_ref()
156    }
157
158    /// The set of free-form event properties which were provided by the Event Hubs service to pass metadata associated with the
159    /// event or associated Event Hubs operation.
160    pub fn system_properties(&self) -> Option<&Properties> {
161        self.raw_amqp_message.properties.as_ref()
162    }
163
164    /// The sequence number assigned to the event when it was enqueued in the associated Event Hub partition.
165    pub fn sequence_number(&self) -> i64 {
166        self.raw_amqp_message
167            .message_annotations
168            .as_ref()
169            .and_then(|m| m.get(&amqp_property::SEQUENCE_NUMBER as &dyn AnnotationKey))
170            .map(|value| match value {
171                Value::Long(val) => *val,
172                _ => unreachable!("Expecting a Long"),
173            })
174            .unwrap_or_default()
175    }
176
177    /// The offset of the event when it was received from the associated Event Hub partition.
178    pub fn offset(&self) -> Option<i64> {
179        self.raw_amqp_message
180            .message_annotations
181            .as_ref()
182            .and_then(|m| m.get(&amqp_property::OFFSET as &dyn AnnotationKey))
183            .and_then(|value| match value {
184                Value::Long(val) => Some(*val),
185                Value::String(val) => val.parse().ok(),
186                _ => unreachable!("Expecting a Long"),
187            })
188    }
189
190    /// The date and time, in UTC, that the event was enqueued in the associated Event Hub
191    /// partition.
192    pub fn enqueued_time(&self) -> OffsetDateTime {
193        self.raw_amqp_message
194            .message_annotations
195            .as_ref()
196            .and_then(|m| m.get(&amqp_property::ENQUEUED_TIME as &dyn AnnotationKey))
197            .map(|value| match value {
198                Value::Timestamp(val) => OffsetDateTime::from(val.clone()),
199                _ => unreachable!("Expecting a Timestamp"),
200            })
201            .unwrap_or(DEFAULT_OFFSET_DATE_TIME)
202    }
203
204    /// The partition key set when the event was enqueued in the associated Event Hub partition.
205    pub fn partition_key(&self) -> Option<&str> {
206        self.raw_amqp_message.partition_key()
207    }
208}