use serde_json::Value;
use time::OffsetDateTime;
use crate::ids::{
CausationId, ConversationId, CorrelationId, EventId, ProcessId, StreamId, TenantId,
};
use crate::version::WorkflowId;
/// An event as described by its producer, before it has been placed in a stream.
///
/// It carries only the caller-supplied metadata and payload. [`EventEnvelope::from_new`]
/// combines it with a stream id, sequence number and timestamp to build a full envelope.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot construct it with a
/// struct literal and has to go through [`NewEvent::new`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NewEvent {
    /// Correlation identifier attached to the event.
    /// NOTE(review): presumably groups events belonging to one logical request — confirm.
    pub correlation_id: CorrelationId,
    /// Optional causation identifier; `None` when no cause is recorded.
    pub causation_id: Option<CausationId>,
    /// Identifier of the conversation the event is associated with.
    pub conversation_id: ConversationId,
    /// Identifier of the process the event is associated with.
    pub process_id: ProcessId,
    /// Identifier of the tenant the event is associated with.
    pub tenant_id: TenantId,
    /// Identifier of the workflow the event is associated with.
    pub workflow_id: WorkflowId,
    /// Name of the event type, stored as a boxed string slice.
    pub event_type: Box<str>,
    /// Schema version number.
    /// NOTE(review): presumably the version of the `payload` layout — confirm.
    pub schema_version: u32,
    /// Event body as an untyped JSON value.
    pub payload: Value,
}
impl NewEvent {
    /// Builds a [`NewEvent`] from its individual parts.
    ///
    /// `event_type` accepts anything convertible into `Box<str>` (for example `&str` or
    /// `String`); every other argument is stored as given.
    // One argument per field: the struct is `#[non_exhaustive]`, so this constructor is
    // the way to create it from outside the crate, hence the long parameter list.
    #[allow(clippy::too_many_arguments)]
    #[must_use]
    pub fn new(
        correlation_id: CorrelationId,
        causation_id: Option<CausationId>,
        conversation_id: ConversationId,
        process_id: ProcessId,
        tenant_id: TenantId,
        workflow_id: WorkflowId,
        event_type: impl Into<Box<str>>,
        schema_version: u32,
        payload: Value,
    ) -> Self {
        // Convert up front so every field below can use init shorthand.
        let event_type: Box<str> = event_type.into();

        Self {
            correlation_id,
            causation_id,
            conversation_id,
            process_id,
            tenant_id,
            workflow_id,
            event_type,
            schema_version,
            payload,
        }
    }
}
/// A fully addressed event: producer-supplied data plus its id, stream, sequence number
/// and timestamp.
///
/// Derives `Serialize` and `Deserialize`, so the whole envelope can be round-tripped
/// through serde. The payload stays an untyped JSON [`Value`]; use
/// [`EventEnvelope::decode`] or [`EventEnvelope::decode_owned`] to turn it into a
/// concrete type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EventEnvelope {
    /// Identifier of this event.
    pub event_id: EventId,
    /// Identifier of the stream the event belongs to.
    pub stream_id: StreamId,
    /// Sequence number of the event.
    /// NOTE(review): presumably its position within `stream_id` — confirm with the store.
    pub sequence_number: u64,
    /// Timestamp of the event, (de)serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub timestamp: OffsetDateTime,
    /// Correlation identifier attached to the event.
    pub correlation_id: CorrelationId,
    /// Optional causation identifier; `None` when no cause is recorded.
    pub causation_id: Option<CausationId>,
    /// Identifier of the conversation the event is associated with.
    pub conversation_id: ConversationId,
    /// Identifier of the process the event is associated with.
    pub process_id: ProcessId,
    /// Identifier of the tenant the event is associated with.
    pub tenant_id: TenantId,
    /// Identifier of the workflow the event is associated with.
    pub workflow_id: WorkflowId,
    /// Name of the event type, stored as a boxed string slice.
    pub event_type: Box<str>,
    /// Schema version number.
    /// NOTE(review): presumably the version of the `payload` layout — confirm.
    pub schema_version: u32,
    /// Event body as an untyped JSON value.
    pub payload: Value,
}
impl EventEnvelope {
    /// Deserializes the payload into `T`, leaving the envelope untouched.
    ///
    /// The payload is read through a shared reference, so the JSON tree is not copied
    /// before decoding.
    ///
    /// # Errors
    ///
    /// Returns a [`serde_json::Error`] when the payload cannot be deserialized as `T`.
    pub fn decode<T: serde::de::DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
        // `&Value` implements `serde::Deserializer`, so `T` can be built straight from
        // the borrowed payload; cloning the whole value first is unnecessary work.
        <T as serde::Deserialize>::deserialize(&self.payload)
    }

    /// Consumes the envelope and deserializes its payload into `T`.
    ///
    /// Use this when the envelope is no longer needed: the payload is moved into the
    /// deserializer rather than borrowed.
    ///
    /// # Errors
    ///
    /// Returns a [`serde_json::Error`] when the payload cannot be deserialized as `T`.
    pub fn decode_owned<T: serde::de::DeserializeOwned>(self) -> Result<T, serde_json::Error> {
        serde_json::from_value(self.payload)
    }

    /// Builds an envelope from a [`NewEvent`] plus its stream position and timestamp.
    ///
    /// All fields of `new` are moved into the envelope unchanged. The `event_id` comes
    /// from `EventId::new()`.
    /// NOTE(review): presumably that yields a fresh unique id on each call — confirm in
    /// `crate::ids`.
    #[must_use]
    pub fn from_new(
        new: NewEvent,
        stream_id: StreamId,
        sequence_number: u64,
        timestamp: OffsetDateTime,
    ) -> Self {
        Self {
            event_id: EventId::new(),
            stream_id,
            sequence_number,
            timestamp,
            correlation_id: new.correlation_id,
            causation_id: new.causation_id,
            conversation_id: new.conversation_id,
            process_id: new.process_id,
            tenant_id: new.tenant_id,
            workflow_id: new.workflow_id,
            event_type: new.event_type,
            schema_version: new.schema_version,
            payload: new.payload,
        }
    }
}