use std::collections::HashMap;
use std::sync::atomic::{
AtomicU64,
Ordering,
};
use std::time::{
Duration,
SystemTime,
};
use crate::{
Acknowledgement,
EventEnvelopeBuilder,
Topic,
};
static NEXT_EVENT_ID: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Clone)]
pub struct EventEnvelope<T: 'static> {
id: String,
topic: Topic<T>,
payload: T,
headers: HashMap<String, String>,
ordering_key: Option<String>,
timestamp: SystemTime,
delay: Option<Duration>,
acknowledgement: Option<Acknowledgement>,
dead_letter: bool,
}
#[derive(Debug, Clone)]
pub struct EventEnvelopeMetadata {
id: String,
topic_name: String,
payload_type_name: &'static str,
headers: HashMap<String, String>,
ordering_key: Option<String>,
timestamp: SystemTime,
delay: Option<Duration>,
dead_letter: bool,
}
impl EventEnvelopeMetadata {
pub fn id(&self) -> &str {
&self.id
}
pub fn topic_name(&self) -> &str {
&self.topic_name
}
pub fn payload_type_name(&self) -> &'static str {
self.payload_type_name
}
pub fn headers(&self) -> &HashMap<String, String> {
&self.headers
}
pub fn ordering_key(&self) -> Option<&str> {
self.ordering_key.as_deref()
}
pub fn timestamp(&self) -> SystemTime {
self.timestamp
}
pub fn delay(&self) -> Option<Duration> {
self.delay
}
pub fn is_dead_letter(&self) -> bool {
self.dead_letter
}
pub fn with_header(mut self, key: &str, value: impl ToString) -> Self {
self.headers.insert(key.to_string(), value.to_string());
self
}
pub fn without_header(mut self, key: &str) -> Self {
self.headers.remove(key);
self
}
pub fn with_ordering_key(mut self, ordering_key: &str) -> Self {
self.ordering_key = Some(ordering_key.to_string());
self
}
pub fn without_ordering_key(mut self) -> Self {
self.ordering_key = None;
self
}
pub fn with_delay(mut self, delay: Duration) -> Self {
self.delay = Some(delay);
self
}
pub fn without_delay(mut self) -> Self {
self.delay = None;
self
}
}
impl<T: 'static> EventEnvelope<T> {
pub fn create(topic: Topic<T>, payload: T) -> Self {
Self {
id: generate_event_id(),
topic,
payload,
headers: HashMap::new(),
ordering_key: None,
timestamp: SystemTime::now(),
delay: None,
acknowledgement: None,
dead_letter: false,
}
}
pub fn builder() -> EventEnvelopeBuilder<T> {
EventEnvelopeBuilder::new()
}
pub(crate) fn from_builder(builder: EventEnvelopeBuilder<T>) -> Self {
Self {
id: builder.id,
topic: builder
.topic
.expect("validated builder should contain a topic"),
payload: builder
.payload
.expect("validated builder should contain a payload"),
headers: builder.headers,
ordering_key: builder.ordering_key,
timestamp: builder.timestamp,
delay: builder.delay,
acknowledgement: builder.acknowledgement,
dead_letter: builder.dead_letter,
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn topic(&self) -> &Topic<T> {
&self.topic
}
pub fn payload(&self) -> &T {
&self.payload
}
pub fn headers(&self) -> &HashMap<String, String> {
&self.headers
}
pub fn metadata(&self) -> EventEnvelopeMetadata {
EventEnvelopeMetadata {
id: self.id.clone(),
topic_name: self.topic.name().to_string(),
payload_type_name: self.topic.payload_type_name(),
headers: self.headers.clone(),
ordering_key: self.ordering_key.clone(),
timestamp: self.timestamp,
delay: self.delay,
dead_letter: self.dead_letter,
}
}
pub fn ordering_key(&self) -> Option<&str> {
self.ordering_key.as_deref()
}
pub fn timestamp(&self) -> SystemTime {
self.timestamp
}
pub fn delay(&self) -> Option<Duration> {
self.delay
}
pub fn acknowledgement(&self) -> Option<&Acknowledgement> {
self.acknowledgement.as_ref()
}
pub fn is_dead_letter(&self) -> bool {
self.dead_letter
}
pub fn with_header(mut self, key: impl Into<String>, value: impl ToString) -> Self {
self.headers.insert(key.into(), value.to_string());
self
}
pub fn with_ordering_key(mut self, ordering_key: impl Into<String>) -> Self {
self.ordering_key = Some(ordering_key.into());
self
}
pub fn with_delay(mut self, delay: Duration) -> Self {
self.delay = Some(delay);
self
}
pub fn with_acknowledgement(mut self, acknowledgement: Acknowledgement) -> Self {
self.acknowledgement = Some(acknowledgement);
self
}
pub fn as_dead_letter(mut self) -> Self {
self.dead_letter = true;
self
}
pub(crate) fn apply_metadata(&mut self, metadata: EventEnvelopeMetadata) {
self.headers = metadata.headers;
self.ordering_key = metadata.ordering_key;
self.delay = metadata.delay;
}
}
pub(crate) fn generate_event_id() -> String {
let id = NEXT_EVENT_ID.fetch_add(1, Ordering::SeqCst);
format!("event-{id}")
}