// Event data types: `EventData` and `ReceivedEventData`, both backed by AMQP messages.
use std::borrow::Cow;
use fe2o3_amqp_types::messaging::annotations::AnnotationKey;
use fe2o3_amqp_types::messaging::{ApplicationProperties, Body, Data, Message, Properties};
use fe2o3_amqp_types::primitives::{OrderedMap, SimpleValue};
use serde_amqp::primitives::Binary;
use serde_amqp::Value;
use time::OffsetDateTime;
use crate::amqp::amqp_property;
use crate::amqp::error::RawAmqpMessageError;
use crate::amqp::{
amqp_message_extension::{AmqpMessageExt, AmqpMessageMutExt},
error::SetMessageIdError,
};
use crate::constants::DEFAULT_OFFSET_DATE_TIME;
/// A single Event Hubs event: a payload of bytes together with its associated metadata.
///
/// The event is backed by an AMQP [`Message`] whose body is a [`Data`] section.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct EventData {
    /// Underlying AMQP message holding the event body and its metadata.
    pub(crate) amqp_message: Message<Data>,
}
/// Builds an [`EventData`] from anything convertible into a byte vector; the bytes become the
/// data body of the underlying AMQP message.
impl<T: Into<Vec<u8>>> From<T> for EventData {
    fn from(value: T) -> Self {
        let payload = Binary::from(value);
        let amqp_message = Message::builder().data(payload).build();
        Self { amqp_message }
    }
}
impl EventData {
/// Creates a new event from the given data
pub fn new(body: impl Into<Vec<u8>>) -> Self {
Self::from(body)
}
/// The data associated with the event
pub fn body(&self) -> &[u8] {
&self.amqp_message.body.0
}
/// Sets the body associated with the event
pub fn set_body(&mut self, body: impl Into<Vec<u8>>) {
self.amqp_message.body = Data(Binary::from(body));
}
/// The content type associated with the event
pub fn content_type(&self) -> Option<&str> {
self.amqp_message.content_type()
}
/// Sets the content type associated with the event
pub fn set_content_type(&mut self, content_type: impl Into<Option<String>>) {
self.amqp_message.set_content_type(content_type)
}
/// An application-defined value that uniquely identifies the event. The identifier is
/// a free-form value and can reflect a GUID or an identifier derived from the application
/// context.
pub fn message_id(&self) -> Option<Cow<'_, str>> {
self.amqp_message.message_id()
}
/// Sets the message ID associated with the event
pub fn set_message_id(
&mut self,
message_id: impl Into<String>,
) -> Result<(), SetMessageIdError> {
self.amqp_message.set_message_id(message_id)
}
/// An application-defined value that represents the context to use for correlation across
/// one or more operations. The identifier is a free-form value and may reflect a unique
/// identity or a shared data element with significance to the application.
pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
self.amqp_message.correlation_id()
}
/// Sets the correlation ID associated with the event
pub fn set_correlation_id(&mut self, correlation_id: impl Into<Option<String>>) {
self.amqp_message.set_correlation_id(correlation_id)
}
/// The set of free-form properties which may be used for associating metadata with the event that
/// is meaningful within the application context.
pub fn properties(&self) -> Option<&OrderedMap<String, SimpleValue>> {
self.amqp_message
.application_properties
.as_ref()
.map(|p| &p.0)
}
}
/// An event that has been received, backed by the raw AMQP message it arrived as.
#[derive(Clone, Debug)]
pub struct ReceivedEventData {
    /// The AMQP message exactly as it was handed to this type.
    raw_amqp_message: Message<Body<Value>>,
}
impl ReceivedEventData {
pub(crate) fn from_raw_amqp_message(raw_amqp_message: Message<Body<Value>>) -> Self {
Self { raw_amqp_message }
}
/// Gets the raw AMQP message.
pub fn raw_amqp_message(&self) -> &Message<Body<Value>> {
&self.raw_amqp_message
}
/// Consumes the event and returns the raw AMQP message.
pub fn into_raw_amqp_message(self) -> Message<Body<Value>> {
self.raw_amqp_message
}
/// Gets the body of the message.
pub fn body(&self) -> Result<&[u8], RawAmqpMessageError> {
match &self.raw_amqp_message.body {
Body::Data(batch) => match batch.len() {
1 => Ok(batch[0].0.as_ref()),
_ => Err(RawAmqpMessageError {}),
},
_ => Err(RawAmqpMessageError {}),
}
}
/// Gets the MessageId to identify the message.
///
/// The message identifier is an application-defined value that uniquely identifies the message
/// and its payload. The identifier is a free-form string and can reflect a GUID or an
/// identifier derived from the application context. If enabled, the [duplicate
/// detection](https://docs.microsoft.com/azure/service-bus-messaging/duplicate-detection)
/// feature identifies and removes second and further submissions of messages with the same
/// MessageId.
pub fn message_id(&self) -> Option<Cow<'_, str>> {
self.raw_amqp_message.message_id()
}
/// Gets the correlation identifier.
///
/// Allows an application to specify a context for the message for the purposes of correlation,
/// for example reflecting the MessageId of a message that is being replied to. See [Message
/// Routing and
/// Correlation](https://docs.microsoft.com/azure/service-bus-messaging/service-bus-messages-payloads?#message-routing-and-correlation").
pub fn correlation_id(&self) -> Option<Cow<'_, str>> {
self.raw_amqp_message.correlation_id()
}
/// The set of free-form properties which may be used for associating metadata with the event that
/// is meaningful within the application context.
pub fn properties(&self) -> Option<&ApplicationProperties> {
self.raw_amqp_message.application_properties.as_ref()
}
/// The set of free-form event properties which were provided by the Event Hubs service to pass metadata associated with the
/// event or associated Event Hubs operation.
pub fn system_properties(&self) -> Option<&Properties> {
self.raw_amqp_message.properties.as_ref()
}
/// The sequence number assigned to the event when it was enqueued in the associated Event Hub partition.
pub fn sequence_number(&self) -> i64 {
self.raw_amqp_message
.message_annotations
.as_ref()
.and_then(|m| m.get(&amqp_property::SEQUENCE_NUMBER as &dyn AnnotationKey))
.map(|value| match value {
Value::Long(val) => *val,
_ => unreachable!("Expecting a Long"),
})
.unwrap_or_default()
}
/// The offset of the event when it was received from the associated Event Hub partition.
pub fn offset(&self) -> Option<i64> {
self.raw_amqp_message
.message_annotations
.as_ref()
.and_then(|m| m.get(&amqp_property::OFFSET as &dyn AnnotationKey))
.and_then(|value| match value {
Value::Long(val) => Some(*val),
Value::String(val) => val.parse().ok(),
_ => unreachable!("Expecting a Long"),
})
}
/// The date and time, in UTC, that the event was enqueued in the associated Event Hub
/// partition.
pub fn enqueued_time(&self) -> OffsetDateTime {
self.raw_amqp_message
.message_annotations
.as_ref()
.and_then(|m| m.get(&amqp_property::ENQUEUED_TIME as &dyn AnnotationKey))
.map(|value| match value {
Value::Timestamp(val) => OffsetDateTime::from(val.clone()),
_ => unreachable!("Expecting a Timestamp"),
})
.unwrap_or(DEFAULT_OFFSET_DATE_TIME)
}
/// The partition key set when the event was enqueued in the associated Event Hub partition.
pub fn partition_key(&self) -> Option<&str> {
self.raw_amqp_message.partition_key()
}
}