use axum::http::HeaderMap;
use serde_json::Value as JsonValue;
use uuid::Uuid;
use crate::events::{
CausationId, CorrelationId, ENVELOPE_VERSION, EventBus, EventEnvelope, NodeId, PublishError,
StreamId, StreamRegistry, TraceContext,
};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CommandId(String);
impl CommandId {
pub fn new() -> Self {
Self(Uuid::new_v4().to_string())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for CommandId {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, Default)]
pub struct RequestCtx {
traceparent: Option<String>,
tracestate: Option<String>,
request_id: Option<String>,
}
impl RequestCtx {
pub fn from_headers(headers: &HeaderMap) -> Self {
let pick = |name: &str| {
headers
.get(name)
.and_then(|v| v.to_str().ok())
.map(str::to_string)
};
Self {
traceparent: pick("traceparent"),
tracestate: pick("tracestate"),
request_id: pick("x-request-id"),
}
}
pub fn traceparent(&self) -> Option<&str> {
self.traceparent.as_deref()
}
pub fn tracestate(&self) -> Option<&str> {
self.tracestate.as_deref()
}
pub fn request_id(&self) -> Option<&str> {
self.request_id.as_deref()
}
}
#[derive(Clone)]
pub(crate) struct Publisher {
bus: EventBus,
registry: StreamRegistry,
}
impl Publisher {
pub(crate) fn new(bus: EventBus, registry: StreamRegistry) -> Self {
Self { bus, registry }
}
pub(crate) async fn publish(
&self,
plan: EnvelopePlan<'_>,
ctx: EmissionContext<'_>,
) -> Result<(), PublishError> {
publish_envelope(&self.bus, &self.registry, plan, ctx).await
}
}
pub(crate) struct EnvelopePlan<'a> {
pub node_id: &'a NodeId,
pub resource_id: &'a str,
pub resource_kind: &'a str,
pub stream: &'a str,
pub payload_kind: &'a str,
pub payload_version: u32,
pub data: JsonValue,
}
#[derive(Default)]
pub(crate) struct EmissionContext<'a> {
pub correlation: Option<&'a str>,
pub causation: Option<&'a str>,
pub trace: Option<TraceContext>,
}
pub(crate) async fn publish_envelope(
bus: &EventBus,
registry: &StreamRegistry,
plan: EnvelopePlan<'_>,
ctx: EmissionContext<'_>,
) -> Result<(), PublishError> {
let stream_id = StreamId::for_resource(plan.node_id, plan.resource_id, plan.stream);
let allocated = registry.allocate(&stream_id);
let env = EventEnvelope {
envelope_version: ENVELOPE_VERSION,
event_id: allocated.event_id,
node_id: plan.node_id.clone(),
resource_id: plan.resource_id.to_string(),
resource_kind: plan.resource_kind.to_string(),
resource_version: 1,
stream_id,
stream: plan.stream.to_string(),
sequence: allocated.sequence,
timestamp: time::OffsetDateTime::now_utc(),
payload_kind: plan.payload_kind.to_string(),
payload_version: plan.payload_version,
payload_schema: None,
correlation_id: ctx.correlation.map(|s| CorrelationId(s.to_string())),
causation_id: ctx.causation.map(|s| CausationId(s.to_string())),
trace_context: ctx.trace,
data: plan.data,
};
bus.publish(env).await.map(|_| ())
}