emergent_client/
message.rs1use crate::types::{CausationId, CorrelationId, MessageId, MessageType, PrimitiveName, Timestamp};
4use serde::{Deserialize, Serialize, de::DeserializeOwned};
5
6#[must_use]
25pub fn create_message(message_type: impl AsRef<str>) -> EmergentMessage {
26 EmergentMessage::new(message_type.as_ref())
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct EmergentMessage {
49 pub id: MessageId,
51
52 pub message_type: MessageType,
54
55 pub source: PrimitiveName,
57
58 #[serde(skip_serializing_if = "Option::is_none")]
60 pub correlation_id: Option<CorrelationId>,
61
62 #[serde(skip_serializing_if = "Option::is_none")]
64 pub causation_id: Option<CausationId>,
65
66 pub timestamp_ms: Timestamp,
68
69 pub payload: serde_json::Value,
71
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub metadata: Option<serde_json::Value>,
75}
76
77impl EmergentMessage {
78 #[must_use]
86 pub fn new(message_type: &str) -> Self {
87 Self::new_with_id_and_timestamp(message_type, MessageId::new(), Timestamp::now())
88 }
89
90 #[must_use]
99 pub fn new_with_id_and_timestamp(
100 message_type: &str,
101 id: MessageId,
102 timestamp_ms: Timestamp,
103 ) -> Self {
104 let msg_type = MessageType::new(message_type)
107 .unwrap_or_else(|e| panic!("invalid message type '{message_type}': {e}"));
108
109 let source = PrimitiveName::new("unknown")
112 .unwrap_or_else(|e| panic!("failed to create default source: {e}"));
113
114 Self {
115 id,
116 message_type: msg_type,
117 source,
118 correlation_id: None,
119 causation_id: None,
120 timestamp_ms,
121 payload: serde_json::Value::Null,
122 metadata: None,
123 }
124 }
125
126 #[must_use]
132 pub fn with_source(mut self, source: &str) -> Self {
133 self.source = PrimitiveName::new(source)
134 .unwrap_or_else(|e| panic!("invalid source name '{source}': {e}"));
135 self
136 }
137
138 #[must_use]
140 pub fn with_payload(mut self, payload: impl Serialize) -> Self {
141 self.payload = serde_json::to_value(payload).unwrap_or(serde_json::Value::Null);
142 self
143 }
144
145 #[must_use]
147 pub fn with_correlation_id(mut self, id: impl Into<CorrelationId>) -> Self {
148 self.correlation_id = Some(id.into());
149 self
150 }
151
152 #[must_use]
154 pub fn with_causation_id(mut self, id: impl Into<CausationId>) -> Self {
155 self.causation_id = Some(id.into());
156 self
157 }
158
159 #[must_use]
163 pub fn with_correlation_id_option(mut self, id: Option<&CorrelationId>) -> Self {
164 self.correlation_id = id.cloned();
165 self
166 }
167
168 #[must_use]
172 pub fn with_causation_from_message(mut self, msg_id: &MessageId) -> Self {
173 self.causation_id = Some(CausationId::from(msg_id));
174 self
175 }
176
177 #[must_use]
179 pub fn with_metadata(mut self, metadata: impl Serialize) -> Self {
180 self.metadata = Some(serde_json::to_value(metadata).unwrap_or(serde_json::Value::Null));
181 self
182 }
183
184 #[must_use]
186 pub fn id(&self) -> &MessageId {
187 &self.id
188 }
189
190 #[must_use]
192 pub fn message_type(&self) -> &MessageType {
193 &self.message_type
194 }
195
196 #[must_use]
198 pub fn source(&self) -> &PrimitiveName {
199 &self.source
200 }
201
202 #[must_use]
204 pub fn payload(&self) -> &serde_json::Value {
205 &self.payload
206 }
207
208 pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
214 serde_json::from_value(self.payload.clone())
215 }
216
217 #[must_use]
222 pub fn has_stdout_payload(&self) -> bool {
223 self.payload
224 .as_object()
225 .and_then(|obj| obj.get("stdout"))
226 .is_some_and(serde_json::Value::is_string)
227 }
228
229 #[must_use]
242 pub fn unwrap_stdout(mut self) -> Self {
243 if let Some(stdout) = self
244 .payload
245 .as_object()
246 .and_then(|obj| obj.get("stdout"))
247 .and_then(serde_json::Value::as_str)
248 .map(String::from)
249 {
250 self.payload = serde_json::from_str(&stdout)
251 .unwrap_or(serde_json::Value::String(stdout));
252 }
253 self
254 }
255
256 pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
262 serde_json::to_vec(self)
263 }
264
265 pub fn from_json(data: &[u8]) -> Result<Self, serde_json::Error> {
271 serde_json::from_slice(data)
272 }
273
274 pub fn to_msgpack(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
280 rmp_serde::to_vec_named(self)
281 }
282
283 pub fn from_msgpack(data: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
289 rmp_serde::from_slice(data)
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builder methods populate the type and source; id and timestamp are generated.
    #[test]
    fn test_message_creation() {
        let payload = json!({"key": "value"});
        let msg = EmergentMessage::new("test.event")
            .with_source("test_source")
            .with_payload(payload);

        let rendered_id = msg.id.to_string();
        assert!(rendered_id.starts_with("msg_"));
        assert_eq!(msg.message_type.as_str(), "test.event");
        assert_eq!(msg.source.as_str(), "test_source");
        assert!(msg.timestamp_ms.as_millis() > 0);
    }

    /// A message survives a round trip through both JSON and MessagePack.
    #[test]
    fn test_message_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let msg = EmergentMessage::new("test.event")
            .with_source("test")
            .with_payload(json!({"num": 42}));

        let json_bytes = msg.to_json()?;
        let from_json = EmergentMessage::from_json(&json_bytes)?;
        assert_eq!(from_json.message_type.as_str(), "test.event");

        let msgpack_bytes = msg.to_msgpack()?;
        let from_msgpack = EmergentMessage::from_msgpack(&msgpack_bytes)?;
        assert_eq!(from_msgpack.message_type.as_str(), "test.event");

        Ok(())
    }

    /// `payload_as` deserializes the payload into a typed struct.
    #[test]
    fn test_payload_extraction() -> Result<(), Box<dyn std::error::Error>> {
        #[derive(Debug, Deserialize, PartialEq)]
        struct TestPayload {
            count: u32,
            name: String,
        }

        let body = json!({
            "count": 42,
            "name": "test"
        });
        let msg = EmergentMessage::new("test.event").with_payload(body);

        let payload: TestPayload = msg.payload_as()?;
        assert_eq!(payload.count, 42);
        assert_eq!(payload.name, "test");
        Ok(())
    }

    /// The causation id mirrors the originating message id; the correlation id is kept.
    #[test]
    fn test_message_tracing() {
        let original = EmergentMessage::new("request");
        let response = EmergentMessage::new("response")
            .with_causation_from_message(original.id())
            .with_correlation_id(CorrelationId::new());

        let recorded_cause = response.causation_id.as_ref().map(|cause| cause.to_string());
        let expected_cause = Some(original.id().to_string());
        assert_eq!(recorded_cause, expected_cause);
        assert!(response.correlation_id.is_some());
    }

    /// A `stdout` string holding JSON is parsed and becomes the payload.
    #[test]
    fn test_unwrap_stdout_json() {
        let wrapped = json!({
            "command": "jq -s .",
            "stdout": "{\"transactions\":[1,2,3]}",
            "exit_code": 0
        });
        let msg = EmergentMessage::new("batch.raw").with_payload(wrapped);

        assert!(msg.has_stdout_payload());
        let unwrapped = msg.unwrap_stdout();
        assert_eq!(unwrapped.payload(), &json!({"transactions": [1, 2, 3]}));
    }

    /// A `stdout` string that is not JSON becomes a plain string payload.
    #[test]
    fn test_unwrap_stdout_plain_text() {
        let wrapped = json!({
            "command": "echo hello",
            "stdout": "hello world",
            "exit_code": 0
        });
        let msg = EmergentMessage::new("exec.output").with_payload(wrapped);

        let unwrapped = msg.unwrap_stdout();
        assert_eq!(unwrapped.payload(), &json!("hello world"));
    }

    /// A payload without a `stdout` entry is left untouched.
    #[test]
    fn test_unwrap_stdout_no_stdout_field() {
        let msg = EmergentMessage::new("timer.tick").with_payload(json!({"count": 42}));

        assert!(!msg.has_stdout_payload());
        let unwrapped = msg.unwrap_stdout();
        assert_eq!(unwrapped.payload(), &json!({"count": 42}));
    }

    /// System-style events without `stdout` pass through unchanged.
    #[test]
    fn test_unwrap_stdout_system_event_passthrough() {
        let msg = EmergentMessage::new("system.started.foo").with_payload(json!({"kind": "handler"}));

        assert!(!msg.has_stdout_payload());
        let unwrapped = msg.unwrap_stdout();
        assert_eq!(unwrapped.payload(), &json!({"kind": "handler"}));
    }

    /// Identical id and timestamp inputs give identical id, type and timestamp fields.
    #[test]
    fn test_new_with_id_and_timestamp_is_pure() {
        let id = MessageId::new();
        let timestamp = Timestamp::from_millis(1_704_067_200_000);

        let msg1 = EmergentMessage::new_with_id_and_timestamp("test.event", id.clone(), timestamp);
        let msg2 = EmergentMessage::new_with_id_and_timestamp("test.event", id, timestamp);

        assert_eq!(msg1.id, msg2.id);
        assert_eq!(msg1.message_type, msg2.message_type);
        assert_eq!(msg1.timestamp_ms, msg2.timestamp_ms);
    }
}