use std::any::{
Any,
type_name,
};
use std::sync::Arc;
use std::time::{
SystemTime,
UNIX_EPOCH,
};
use qubit_metadata::Metadata;
use crate::{
EventBusError,
EventEnvelope,
EventEnvelopeMetadata,
};
/// Type-erased, reference-counted handle to the payload of an event that
/// ended up in a dead-letter record.
///
/// The concrete payload type is hidden behind `dyn Any`; callers recover it
/// by downcasting. The `Send + Sync` bounds allow the handle to be shared
/// across threads.
pub type DeadLetterOriginalPayload = Arc<dyn Any + Send + Sync + 'static>;
/// Alias for [`DeadLetterRecord`].
pub type DeadLetterPayload = DeadLetterRecord;
/// Record pairing descriptive metadata with the type-erased original payload
/// of an event.
///
/// Cloning a record clones the metadata and shares the payload through its
/// `Arc`; the payload itself is not copied.
#[derive(Clone)]
pub struct DeadLetterRecord {
/// Descriptive fields attached to the record. The `from_*` constructors
/// store the failure details (subscriber, event id, topic, error, ...) here.
metadata: Metadata,
/// Shared, type-erased handle to the original event payload.
original_payload: DeadLetterOriginalPayload,
}
impl DeadLetterRecord {
    /// Creates a record from already-assembled metadata and a type-erased
    /// payload handle.
    ///
    /// No fields are added to or validated in `metadata`; it is stored as
    /// given.
    pub fn new(metadata: Metadata, original_payload: DeadLetterOriginalPayload) -> Self {
        Self {
            metadata,
            original_payload,
        }
    }

    /// Builds a record describing a failure to handle `envelope` for the
    /// subscriber identified by `subscriber_id`.
    ///
    /// The metadata receives the keys `subscriber_id`, `event_id`, `topic`,
    /// `failure_reason`, `failure_type`, `payload_type`,
    /// `failed_at_unix_millis` and `dead_letter`, plus `ordering_key` when
    /// the envelope carries one. `payload_type` is the Rust type name of `T`.
    ///
    /// The envelope's payload is cloned and stored behind a new `Arc`.
    pub fn from_failure<T>(
        subscriber_id: &str,
        envelope: &EventEnvelope<T>,
        error: &EventBusError,
    ) -> Self
    where
        T: Clone + Send + Sync + 'static,
    {
        let mut metadata = Self::base_failure_metadata(
            subscriber_id,
            envelope.id().to_string(),
            envelope.topic().name().to_string(),
            type_name::<T>().to_string(),
            error,
        );
        // The ordering key is optional, so the entry is only written when present.
        if let Some(ordering_key) = envelope.ordering_key() {
            metadata.set("ordering_key", ordering_key.to_string());
        }
        Self::new(metadata, Arc::new(envelope.payload().clone()))
    }

    /// Builds a record describing a failure when only the envelope metadata
    /// and an already type-erased payload handle are available.
    ///
    /// Writes the same metadata keys as [`DeadLetterRecord::from_failure`];
    /// here `payload_type` is taken from `metadata.payload_type_name()`
    /// instead of a generic parameter. `original_payload` is stored as given,
    /// without cloning the underlying value.
    pub fn from_metadata_failure(
        subscriber_id: &str,
        metadata: EventEnvelopeMetadata,
        original_payload: DeadLetterOriginalPayload,
        error: &EventBusError,
    ) -> Self {
        let mut record_metadata = Self::base_failure_metadata(
            subscriber_id,
            metadata.id().to_string(),
            metadata.topic_name().to_string(),
            metadata.payload_type_name().to_string(),
            error,
        );
        // The ordering key is optional, so the entry is only written when present.
        if let Some(ordering_key) = metadata.ordering_key() {
            record_metadata.set("ordering_key", ordering_key.to_string());
        }
        Self::new(record_metadata, original_payload)
    }

    /// Returns the metadata attached to this record.
    pub fn metadata(&self) -> &Metadata {
        &self.metadata
    }

    /// Returns a new handle to the original payload.
    ///
    /// Only the `Arc` is cloned; the payload value itself is shared.
    pub fn original_payload(&self) -> DeadLetterOriginalPayload {
        Arc::clone(&self.original_payload)
    }

    /// Borrows the original payload as a `T`.
    ///
    /// Returns `None` when the stored payload is not of type `T`.
    pub fn downcast_original_payload_ref<T>(&self) -> Option<&T>
    where
        T: 'static,
    {
        self.original_payload.as_ref().downcast_ref::<T>()
    }

    /// Assembles the metadata entries shared by every failure constructor.
    ///
    /// Kept in one place so both constructors always emit the same keys, in
    /// the same order, with the same value types.
    fn base_failure_metadata(
        subscriber_id: &str,
        event_id: String,
        topic: String,
        payload_type: String,
        error: &EventBusError,
    ) -> Metadata {
        Metadata::new()
            .with("subscriber_id", subscriber_id.to_string())
            .with("event_id", event_id)
            .with("topic", topic)
            .with("failure_reason", error.to_string())
            .with("failure_type", error.kind().to_string())
            .with("payload_type", payload_type)
            .with("failed_at_unix_millis", Self::current_unix_millis())
            .with("dead_letter", true)
    }

    /// Returns the current wall-clock time as milliseconds since the Unix
    /// epoch.
    ///
    /// Yields `0` when the system clock reports a time before the epoch and
    /// saturates at `i64::MAX` instead of wrapping.
    fn current_unix_millis() -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // Clamping to `i64::MAX` first makes the narrowing cast lossless.
            .map(|elapsed| elapsed.as_millis().min(i64::MAX as u128) as i64)
            .unwrap_or_default()
    }
}