azeventhubs/
event_data.rs1use std::borrow::Cow;
2
3use fe2o3_amqp_types::messaging::annotations::AnnotationKey;
4use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Data, Message, Properties};
5use fe2o3_amqp_types::primitives::{OrderedMap, SimpleValue};
6use serde_amqp::primitives::Binary;
7use serde_amqp::Value;
8use time::OffsetDateTime;
9
10use crate::amqp::amqp_property;
11use crate::amqp::error::RawAmqpMessageError;
12use crate::amqp::{
13 amqp_message_extension::{AmqpMessageExt, AmqpMessageMutExt},
14 error::SetMessageIdError,
15};
16use crate::constants::DEFAULT_OFFSET_DATE_TIME;
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
20pub struct EventData {
21 pub(crate) amqp_message: Message<Data>,
22}
23
24impl<T> From<T> for EventData
25where
26 T: Into<Vec<u8>>,
27{
28 fn from(value: T) -> Self {
29 Self {
30 amqp_message: Message::builder().data(Binary::from(value)).build(),
31 }
32 }
33}
34
35impl EventData {
36 pub fn new(body: impl Into<Vec<u8>>) -> Self {
38 Self::from(body)
39 }
40
41 pub fn body(&self) -> &[u8] {
43 &self.amqp_message.body.0
44 }
45
46 pub fn set_body(&mut self, body: impl Into<Vec<u8>>) {
48 self.amqp_message.body = Data(Binary::from(body));
49 }
50
51 pub fn content_type(&self) -> Option<&str> {
53 self.amqp_message.content_type()
54 }
55
56 pub fn set_content_type(&mut self, content_type: impl Into<Option<String>>) {
58 self.amqp_message.set_content_type(content_type)
59 }
60
61 pub fn message_id(&self) -> Option<Cow<'_, str>> {
65 self.amqp_message.message_id()
66 }
67
68 pub fn set_message_id(
70 &mut self,
71 message_id: impl Into<String>,
72 ) -> Result<(), SetMessageIdError> {
73 self.amqp_message.set_message_id(message_id)
74 }
75
76 pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
80 self.amqp_message.correlation_id()
81 }
82
83 pub fn set_correlation_id(&mut self, correlation_id: impl Into<Option<String>>) {
85 self.amqp_message.set_correlation_id(correlation_id)
86 }
87
88 pub fn properties(&self) -> Option<&OrderedMap<String, SimpleValue>> {
91 self.amqp_message
92 .application_properties
93 .as_ref()
94 .map(|p| &p.0)
95 }
96}
97
98#[derive(Debug, Clone)]
100pub struct ReceivedEventData {
101 raw_amqp_message: Message<Body<Value>>,
102}
103
104impl ReceivedEventData {
105 pub(crate) fn from_raw_amqp_message(raw_amqp_message: Message<Body<Value>>) -> Self {
106 Self { raw_amqp_message }
107 }
108
109 pub fn raw_amqp_message(&self) -> &Message<Body<Value>> {
111 &self.raw_amqp_message
112 }
113
114 pub fn into_raw_amqp_message(self) -> Message<Body<Value>> {
116 self.raw_amqp_message
117 }
118
119 pub fn body(&self) -> Result<&[u8], RawAmqpMessageError> {
121 match &self.raw_amqp_message.body {
122 Body::Data(batch) => match batch.len() {
123 1 => Ok(batch[0].0.as_ref()),
124 _ => Err(RawAmqpMessageError {}),
125 },
126 _ => Err(RawAmqpMessageError {}),
127 }
128 }
129
130 pub fn message_id(&self) -> Option<Cow<'_, str>> {
139 self.raw_amqp_message.message_id()
140 }
141
142 pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
149 self.raw_amqp_message.correlation_id()
150 }
151
152 pub fn properties(&self) -> Option<&ApplicationProperties> {
155 self.raw_amqp_message.application_properties.as_ref()
156 }
157
158 pub fn system_properties(&self) -> Option<&Properties> {
161 self.raw_amqp_message.properties.as_ref()
162 }
163
164 pub fn sequence_number(&self) -> i64 {
166 self.raw_amqp_message
167 .message_annotations
168 .as_ref()
169 .and_then(|m| m.get(&amqp_property::SEQUENCE_NUMBER as &dyn AnnotationKey))
170 .map(|value| match value {
171 Value::Long(val) => *val,
172 _ => unreachable!("Expecting a Long"),
173 })
174 .unwrap_or_default()
175 }
176
177 pub fn offset(&self) -> Option<i64> {
179 self.raw_amqp_message
180 .message_annotations
181 .as_ref()
182 .and_then(|m| m.get(&amqp_property::OFFSET as &dyn AnnotationKey))
183 .and_then(|value| match value {
184 Value::Long(val) => Some(*val),
185 Value::String(val) => val.parse().ok(),
186 _ => unreachable!("Expecting a Long"),
187 })
188 }
189
190 pub fn enqueued_time(&self) -> OffsetDateTime {
193 self.raw_amqp_message
194 .message_annotations
195 .as_ref()
196 .and_then(|m| m.get(&amqp_property::ENQUEUED_TIME as &dyn AnnotationKey))
197 .map(|value| match value {
198 Value::Timestamp(val) => OffsetDateTime::from(val.clone()),
199 _ => unreachable!("Expecting a Timestamp"),
200 })
201 .unwrap_or(DEFAULT_OFFSET_DATE_TIME)
202 }
203
204 pub fn partition_key(&self) -> Option<&str> {
206 self.raw_amqp_message.partition_key()
207 }
208}