use crate::types::{CausationId, CorrelationId, MessageId, MessageType, PrimitiveName, Timestamp};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
/// Convenience constructor: builds an [`EmergentMessage`] of the given type.
///
/// Accepts anything that can be viewed as a `&str` and forwards it to
/// [`EmergentMessage::new`].
///
/// # Panics
///
/// Panics if the message type string is rejected by `MessageType::new`
/// (the panic originates in `EmergentMessage::new_with_id_and_timestamp`).
#[must_use]
pub fn create_message(message_type: impl AsRef<str>) -> EmergentMessage {
    let type_name: &str = message_type.as_ref();
    EmergentMessage::new(type_name)
}
/// Message envelope: an id, a type, a source name, optional correlation and
/// causation ids, a timestamp, a JSON payload, and optional JSON metadata.
///
/// The three `Option` fields are left out of the serialized form entirely
/// when they are `None` (see the `skip_serializing_if` attributes).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EmergentMessage {
/// Identifier of this message.
pub id: MessageId,
/// Type of the message; the constructors build it from a string via
/// `MessageType::new`.
pub message_type: MessageType,
/// Name of the message's source; the constructors default it to `"unknown"`
/// until `with_source` is called.
pub source: PrimitiveName,
/// Optional correlation id. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<CorrelationId>,
/// Optional causation id; `with_causation_from_message` derives it from
/// another message's id. Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub causation_id: Option<CausationId>,
/// Timestamp attached to the message.
/// NOTE(review): presumably milliseconds, judging by the field name —
/// confirm against the `Timestamp` type.
pub timestamp_ms: Timestamp,
/// Message body as arbitrary JSON; the constructors initialise it to
/// `serde_json::Value::Null`.
pub payload: serde_json::Value,
/// Optional free-form JSON metadata. Omitted from serialized output when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<serde_json::Value>,
}
impl EmergentMessage {
    /// Creates a message of the given type, using `MessageId::new()` for the
    /// id and `Timestamp::now()` for the timestamp.
    ///
    /// All remaining fields take the defaults set by
    /// [`EmergentMessage::new_with_id_and_timestamp`].
    ///
    /// # Panics
    ///
    /// Panics if `message_type` is rejected by `MessageType::new`.
    #[must_use]
    pub fn new(message_type: &str) -> Self {
        Self::new_with_id_and_timestamp(message_type, MessageId::new(), Timestamp::now())
    }

    /// Creates a message from an explicit id and timestamp.
    ///
    /// The caller supplies the id and timestamp, so none are generated here.
    /// The source is set to `"unknown"`, the payload to JSON `null`, and the
    /// correlation id, causation id and metadata to `None`.
    ///
    /// # Panics
    ///
    /// Panics if `message_type` is rejected by `MessageType::new`, or if the
    /// default source name `"unknown"` is rejected by `PrimitiveName::new`.
    #[must_use]
    pub fn new_with_id_and_timestamp(
        message_type: &str,
        id: MessageId,
        timestamp_ms: Timestamp,
    ) -> Self {
        let msg_type = MessageType::new(message_type)
            .unwrap_or_else(|e| panic!("invalid message type '{message_type}': {e}"));
        let source = PrimitiveName::new("unknown")
            .unwrap_or_else(|e| panic!("failed to create default source: {e}"));
        Self {
            id,
            message_type: msg_type,
            source,
            correlation_id: None,
            causation_id: None,
            timestamp_ms,
            payload: serde_json::Value::Null,
            metadata: None,
        }
    }

    /// Replaces the source name.
    ///
    /// # Panics
    ///
    /// Panics if `source` is rejected by `PrimitiveName::new`.
    #[must_use]
    pub fn with_source(mut self, source: &str) -> Self {
        self.source = PrimitiveName::new(source)
            .unwrap_or_else(|e| panic!("invalid source name '{source}': {e}"));
        self
    }

    /// Replaces the payload with the JSON form of `payload`.
    ///
    /// Best-effort: if `payload` cannot be converted to a JSON value the
    /// error is discarded and the payload is set to JSON `null`.
    #[must_use]
    pub fn with_payload(mut self, payload: impl Serialize) -> Self {
        self.payload = serde_json::to_value(payload).unwrap_or(serde_json::Value::Null);
        self
    }

    /// Sets the correlation id.
    #[must_use]
    pub fn with_correlation_id(mut self, id: impl Into<CorrelationId>) -> Self {
        self.correlation_id = Some(id.into());
        self
    }

    /// Sets the causation id.
    #[must_use]
    pub fn with_causation_id(mut self, id: impl Into<CausationId>) -> Self {
        self.causation_id = Some(id.into());
        self
    }

    /// Sets the correlation id to a clone of `id`, or clears it when `id` is
    /// `None` (any previously set correlation id is overwritten either way).
    #[must_use]
    pub fn with_correlation_id_option(mut self, id: Option<&CorrelationId>) -> Self {
        self.correlation_id = id.cloned();
        self
    }

    /// Sets the causation id to one derived from another message's id.
    #[must_use]
    pub fn with_causation_from_message(mut self, msg_id: &MessageId) -> Self {
        self.causation_id = Some(CausationId::from(msg_id));
        self
    }

    /// Replaces the metadata with the JSON form of `metadata`.
    ///
    /// Best-effort: if `metadata` cannot be converted to a JSON value the
    /// error is discarded and the metadata becomes `Some(null)`, not `None`.
    #[must_use]
    pub fn with_metadata(mut self, metadata: impl Serialize) -> Self {
        self.metadata = Some(serde_json::to_value(metadata).unwrap_or(serde_json::Value::Null));
        self
    }

    /// Returns the message id.
    #[must_use]
    pub fn id(&self) -> &MessageId {
        &self.id
    }

    /// Returns the message type.
    #[must_use]
    pub fn message_type(&self) -> &MessageType {
        &self.message_type
    }

    /// Returns the source name.
    #[must_use]
    pub fn source(&self) -> &PrimitiveName {
        &self.source
    }

    /// Returns the raw JSON payload.
    #[must_use]
    pub fn payload(&self) -> &serde_json::Value {
        &self.payload
    }

    /// Deserializes the payload into `T`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` produced when the payload does not
    /// match the shape `T` expects.
    pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
        // `&serde_json::Value` is itself a `Deserializer`, so deserialize
        // straight from the borrow instead of deep-cloning the JSON tree.
        T::deserialize(&self.payload)
    }

    /// Returns `true` when the payload is a JSON object whose `"stdout"`
    /// entry is a string.
    #[must_use]
    pub fn has_stdout_payload(&self) -> bool {
        self.payload
            .as_object()
            .and_then(|obj| obj.get("stdout"))
            .is_some_and(serde_json::Value::is_string)
    }

    /// Replaces the payload with the contents of its `"stdout"` string, if
    /// there is one.
    ///
    /// When the payload is an object with a string `"stdout"` entry, that
    /// string is parsed as JSON and the result becomes the new payload; if it
    /// is not valid JSON the payload becomes the string itself. In every
    /// other case the message is returned unchanged.
    #[must_use]
    pub fn unwrap_stdout(mut self) -> Self {
        // Build the replacement while only borrowing the payload, so the
        // stdout text is copied solely on the plain-text fallback path.
        let replacement = self
            .payload
            .as_object()
            .and_then(|obj| obj.get("stdout"))
            .and_then(serde_json::Value::as_str)
            .map(|stdout| {
                serde_json::from_str(stdout)
                    .unwrap_or_else(|_| serde_json::Value::String(stdout.to_owned()))
            });
        if let Some(payload) = replacement {
            self.payload = payload;
        }
        self
    }

    /// Serializes the message to JSON bytes.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` reported by `serde_json::to_vec`.
    pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// Parses a message from JSON bytes.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json::Error` reported by `serde_json::from_slice`
    /// when `data` is not a valid encoding of a message.
    pub fn from_json(data: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(data)
    }

    /// Serializes the message to MessagePack bytes via
    /// `rmp_serde::to_vec_named`.
    ///
    /// # Errors
    ///
    /// Returns the `rmp_serde::encode::Error` reported by the encoder.
    pub fn to_msgpack(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
        rmp_serde::to_vec_named(self)
    }

    /// Parses a message from MessagePack bytes.
    ///
    /// # Errors
    ///
    /// Returns the `rmp_serde::decode::Error` reported by the decoder when
    /// `data` is not a valid encoding of a message.
    pub fn from_msgpack(data: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
        rmp_serde::from_slice(data)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The builder fills in id, type, source and timestamp.
    #[test]
    fn test_message_creation() {
        let message = EmergentMessage::new("test.event")
            .with_source("test_source")
            .with_payload(json!({"key": "value"}));

        let rendered_id = message.id.to_string();
        assert!(rendered_id.starts_with("msg_"));
        assert_eq!(message.message_type.as_str(), "test.event");
        assert_eq!(message.source.as_str(), "test_source");
        assert!(message.timestamp_ms.as_millis() > 0);
    }

    /// A message survives a round trip through both JSON and MessagePack.
    #[test]
    fn test_message_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let original = EmergentMessage::new("test.event")
            .with_source("test")
            .with_payload(json!({"num": 42}));

        let encoded_json = original.to_json()?;
        let decoded_json = EmergentMessage::from_json(&encoded_json)?;
        assert_eq!(decoded_json.message_type.as_str(), "test.event");

        let encoded_msgpack = original.to_msgpack()?;
        let decoded_msgpack = EmergentMessage::from_msgpack(&encoded_msgpack)?;
        assert_eq!(decoded_msgpack.message_type.as_str(), "test.event");

        Ok(())
    }

    /// `payload_as` deserializes the payload into a caller-defined struct.
    #[test]
    fn test_payload_extraction() -> Result<(), Box<dyn std::error::Error>> {
        #[derive(Debug, Deserialize, PartialEq)]
        struct TestPayload {
            count: u32,
            name: String,
        }

        let body = json!({
            "count": 42,
            "name": "test"
        });
        let message = EmergentMessage::new("test.event").with_payload(body);

        let extracted = message.payload_as::<TestPayload>()?;
        assert_eq!(extracted.count, 42);
        assert_eq!(extracted.name, "test");

        Ok(())
    }

    /// A response can carry the id of the request that caused it.
    #[test]
    fn test_message_tracing() {
        let request = EmergentMessage::new("request");
        let reply = EmergentMessage::new("response")
            .with_causation_from_message(request.id())
            .with_correlation_id(CorrelationId::new());

        let recorded_cause = reply.causation_id.as_ref().map(|cause| cause.to_string());
        let expected_cause = Some(request.id().to_string());
        assert_eq!(recorded_cause, expected_cause);
        assert!(reply.correlation_id.is_some());
    }

    /// A JSON document inside `stdout` becomes the payload.
    #[test]
    fn test_unwrap_stdout_json() {
        let wrapped = EmergentMessage::new("batch.raw").with_payload(json!({
            "command": "jq -s .",
            "stdout": "{\"transactions\":[1,2,3]}",
            "exit_code": 0
        }));
        assert!(wrapped.has_stdout_payload());

        let expected = json!({"transactions": [1, 2, 3]});
        let result = wrapped.unwrap_stdout();
        assert_eq!(result.payload(), &expected);
    }

    /// Non-JSON `stdout` text becomes a JSON string payload.
    #[test]
    fn test_unwrap_stdout_plain_text() {
        let wrapped = EmergentMessage::new("exec.output").with_payload(json!({
            "command": "echo hello",
            "stdout": "hello world",
            "exit_code": 0
        }));

        let expected = json!("hello world");
        let result = wrapped.unwrap_stdout();
        assert_eq!(result.payload(), &expected);
    }

    /// A payload without a `stdout` entry is left untouched.
    #[test]
    fn test_unwrap_stdout_no_stdout_field() {
        let body = json!({"count": 42});
        let message = EmergentMessage::new("timer.tick").with_payload(body.clone());
        assert!(!message.has_stdout_payload());

        let result = message.unwrap_stdout();
        assert_eq!(result.payload(), &body);
    }

    /// System events without a `stdout` entry pass through unchanged.
    #[test]
    fn test_unwrap_stdout_system_event_passthrough() {
        let body = json!({"kind": "handler"});
        let message = EmergentMessage::new("system.started.foo").with_payload(body.clone());
        assert!(!message.has_stdout_payload());

        let result = message.unwrap_stdout();
        assert_eq!(result.payload(), &body);
    }

    /// Identical inputs to the explicit constructor give identical fields.
    #[test]
    fn test_new_with_id_and_timestamp_is_pure() {
        let id = MessageId::new();
        let timestamp = Timestamp::from_millis(1_704_067_200_000);
        let build =
            || EmergentMessage::new_with_id_and_timestamp("test.event", id.clone(), timestamp);

        let first = build();
        let second = build();

        assert_eq!(first.id, second.id);
        assert_eq!(first.message_type, second.message_type);
        assert_eq!(first.timestamp_ms, second.timestamp_ms);
    }
}