use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::error::{MqttError, Result};
use crate::topic::TopicAddress;
const KEY_TIMESTAMP: &str = "ts";
const KEY_SOURCE_SERVICE: &str = "svc";
const KEY_SOURCE_INSTANCE: &str = "inst";
const KEY_OPERATOR_ID: &str = "op";
const KEY_SCHEMA_VERSION: &str = "sv";
const KEY_CORRELATION_ID: &str = "cid";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Metadata {
pub timestamp: DateTime<Utc>,
pub source_service: String,
pub source_instance: String,
pub operator_id: String,
pub schema_version: u16,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
}
#[derive(Debug, Clone)]
pub struct Envelope<T> {
pub meta: Metadata,
pub payload: T,
}
#[derive(Debug, Clone)]
pub struct Message<T> {
pub envelope: Envelope<T>,
pub address: TopicAddress,
pub qos: u8,
pub retained: bool,
}
pub fn encode_user_properties(meta: &Metadata) -> Vec<(String, String)> {
let mut props = vec![
(KEY_TIMESTAMP.into(), meta.timestamp.to_rfc3339()),
(KEY_SOURCE_SERVICE.into(), meta.source_service.clone()),
(KEY_SOURCE_INSTANCE.into(), meta.source_instance.clone()),
(KEY_OPERATOR_ID.into(), meta.operator_id.clone()),
(KEY_SCHEMA_VERSION.into(), meta.schema_version.to_string()),
];
if let Some(cid) = &meta.correlation_id {
props.push((KEY_CORRELATION_ID.into(), cid.clone()));
}
props
}
pub fn decode_user_properties(props: &[(String, String)]) -> Result<Metadata> {
let find = |key: &str| -> Option<&str> {
props.iter().find(|(k, _)| k == key).map(|(_, v)| v.as_str())
};
let ts_str = find(KEY_TIMESTAMP).ok_or_else(|| {
MqttError::InvalidTopic(format!("missing user property `{KEY_TIMESTAMP}`"))
})?;
let timestamp = DateTime::parse_from_rfc3339(ts_str)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| MqttError::InvalidTopic(format!("invalid timestamp `{ts_str}`: {e}")))?;
let source_service = find(KEY_SOURCE_SERVICE)
.ok_or_else(|| {
MqttError::InvalidTopic(format!("missing user property `{KEY_SOURCE_SERVICE}`"))
})?
.to_owned();
let source_instance = find(KEY_SOURCE_INSTANCE)
.ok_or_else(|| {
MqttError::InvalidTopic(format!("missing user property `{KEY_SOURCE_INSTANCE}`"))
})?
.to_owned();
let operator_id = find(KEY_OPERATOR_ID)
.ok_or_else(|| {
MqttError::InvalidTopic(format!("missing user property `{KEY_OPERATOR_ID}`"))
})?
.to_owned();
let sv_str = find(KEY_SCHEMA_VERSION).ok_or_else(|| {
MqttError::InvalidTopic(format!("missing user property `{KEY_SCHEMA_VERSION}`"))
})?;
let schema_version: u16 = sv_str
.parse()
.map_err(|e| MqttError::InvalidTopic(format!("invalid schema_version `{sv_str}`: {e}")))?;
let correlation_id = find(KEY_CORRELATION_ID).map(str::to_owned);
Ok(Metadata {
timestamp,
source_service,
source_instance,
operator_id,
schema_version,
correlation_id,
})
}
pub fn encode_payload<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
serde_json::to_vec(payload).map_err(MqttError::from)
}
pub fn decode_payload<T: DeserializeOwned>(bytes: &[u8], topic: &str) -> Result<T> {
serde_json::from_slice(bytes)
.map_err(|e| MqttError::Deserialization { topic: topic.to_owned(), source: e })
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
fn sample_meta() -> Metadata {
Metadata {
timestamp: Utc.with_ymd_and_hms(2026, 4, 25, 12, 0, 0).unwrap(),
source_service: "omegon".into(),
source_instance: "abc123".into(),
operator_id: "op1".into(),
schema_version: 1,
correlation_id: None,
}
}
#[test]
fn user_properties_roundtrip() {
let meta = sample_meta();
let props = encode_user_properties(&meta);
let decoded = decode_user_properties(&props).expect("decode should succeed");
assert_eq!(decoded, meta);
}
#[test]
fn user_properties_with_correlation_id() {
let mut meta = sample_meta();
meta.correlation_id = Some("req-42".into());
let props = encode_user_properties(&meta);
let decoded = decode_user_properties(&props).expect("decode should succeed");
assert_eq!(decoded.correlation_id, Some("req-42".into()));
}
#[test]
fn decode_rejects_missing_timestamp() {
let props = vec![
("svc".into(), "omegon".into()),
("inst".into(), "abc".into()),
("op".into(), "op1".into()),
("sv".into(), "1".into()),
];
assert!(decode_user_properties(&props).is_err());
}
#[test]
fn payload_roundtrip() {
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct TestEvent {
turn: u32,
}
let evt = TestEvent { turn: 7 };
let bytes = encode_payload(&evt).expect("encode");
let decoded: TestEvent = decode_payload(&bytes, "test/topic").expect("decode");
assert_eq!(decoded, evt);
}
}