Skip to main content

emergent_client/
message.rs

1//! Message types for the Emergent client library.
2
3use crate::types::{CausationId, CorrelationId, MessageId, MessageType, PrimitiveName, Timestamp};
4use serde::{Deserialize, Serialize, de::DeserializeOwned};
5
6/// Create a new message with the given type.
7///
8/// This is a convenience factory function that matches the Python and TypeScript SDKs.
9///
10/// # Panics
11///
12/// Panics if the message type is invalid.
13///
14/// # Example
15///
16/// ```rust
17/// use emergent_client::create_message;
18/// use serde_json::json;
19///
20/// let msg = create_message("timer.tick")
21///     .with_payload(json!({"count": 1}))
22///     .with_metadata(json!({"trace_id": "abc123"}));
23/// ```
24#[must_use]
25pub fn create_message(message_type: impl AsRef<str>) -> EmergentMessage {
26    EmergentMessage::new(message_type.as_ref())
27}
28
29/// Standard message envelope for all Emergent communications.
30///
31/// All messages in Emergent use this standard envelope format. Developers specify
32/// a `message_type` string and put their domain data in the `payload` field.
33///
34/// # Example
35///
36/// ```rust
37/// use emergent_client::EmergentMessage;
38/// use serde_json::json;
39///
40/// let message = EmergentMessage::new("user.created")
41///     .with_source("user_service")
42///     .with_payload(json!({
43///         "user_id": "u_12345",
44///         "email": "user@example.com"
45///     }));
46/// ```
47#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct EmergentMessage {
49    /// Unique message ID (TypeID format: msg_<uuid_v7>).
50    pub id: MessageId,
51
52    /// Message type for routing (e.g., "email.received", "timer.tick").
53    pub message_type: MessageType,
54
55    /// Source client that published this message.
56    pub source: PrimitiveName,
57
58    /// Optional correlation ID for request-response or tracing.
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub correlation_id: Option<CorrelationId>,
61
62    /// Optional causation ID (ID of message that triggered this one).
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub causation_id: Option<CausationId>,
65
66    /// Timestamp when message was created (Unix ms).
67    pub timestamp_ms: Timestamp,
68
69    /// User-defined payload (any serializable data).
70    pub payload: serde_json::Value,
71
72    /// Optional metadata for debugging, tracing, etc.
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub metadata: Option<serde_json::Value>,
75}
76
77impl EmergentMessage {
78    /// Create a new message with the given type.
79    ///
80    /// Generates a unique ID and sets the current timestamp.
81    ///
82    /// # Panics
83    ///
84    /// Panics if the message type is invalid.
85    #[must_use]
86    pub fn new(message_type: &str) -> Self {
87        Self::new_with_id_and_timestamp(message_type, MessageId::new(), Timestamp::now())
88    }
89
90    /// Create a new message with explicit ID and timestamp.
91    ///
92    /// This is a pure function suitable for testing and deterministic message creation.
93    /// For production use, prefer `new()` which generates a unique ID and current timestamp.
94    ///
95    /// # Panics
96    ///
97    /// Panics if the message type is invalid.
98    #[must_use]
99    pub fn new_with_id_and_timestamp(
100        message_type: &str,
101        id: MessageId,
102        timestamp_ms: Timestamp,
103    ) -> Self {
104        // For backwards compatibility, we'll panic on invalid message types
105        // In a future version, we might want to return Result instead
106        let msg_type = MessageType::new(message_type)
107            .unwrap_or_else(|e| panic!("invalid message type '{message_type}': {e}"));
108
109        // Use a default source that can be overwritten with with_source()
110        // We use "unknown" as a valid placeholder
111        let source = PrimitiveName::new("unknown")
112            .unwrap_or_else(|e| panic!("failed to create default source: {e}"));
113
114        Self {
115            id,
116            message_type: msg_type,
117            source,
118            correlation_id: None,
119            causation_id: None,
120            timestamp_ms,
121            payload: serde_json::Value::Null,
122            metadata: None,
123        }
124    }
125
126    /// Set the source of this message.
127    ///
128    /// # Panics
129    ///
130    /// Panics if the source name is invalid.
131    #[must_use]
132    pub fn with_source(mut self, source: &str) -> Self {
133        self.source = PrimitiveName::new(source)
134            .unwrap_or_else(|e| panic!("invalid source name '{source}': {e}"));
135        self
136    }
137
138    /// Set the payload of this message.
139    #[must_use]
140    pub fn with_payload(mut self, payload: impl Serialize) -> Self {
141        self.payload = serde_json::to_value(payload).unwrap_or(serde_json::Value::Null);
142        self
143    }
144
145    /// Set the correlation ID (for request-response patterns).
146    #[must_use]
147    pub fn with_correlation_id(mut self, id: impl Into<CorrelationId>) -> Self {
148        self.correlation_id = Some(id.into());
149        self
150    }
151
152    /// Set the causation ID (ID of the message that triggered this one).
153    #[must_use]
154    pub fn with_causation_id(mut self, id: impl Into<CausationId>) -> Self {
155        self.causation_id = Some(id.into());
156        self
157    }
158
159    /// Set the correlation ID from an optional value.
160    ///
161    /// Useful for copying correlation IDs from request messages to responses.
162    #[must_use]
163    pub fn with_correlation_id_option(mut self, id: Option<&CorrelationId>) -> Self {
164        self.correlation_id = id.cloned();
165        self
166    }
167
168    /// Set the causation ID from a MessageId.
169    ///
170    /// This is a convenience method that converts the MessageId to a CausationId.
171    #[must_use]
172    pub fn with_causation_from_message(mut self, msg_id: &MessageId) -> Self {
173        self.causation_id = Some(CausationId::from(msg_id));
174        self
175    }
176
177    /// Set optional metadata.
178    #[must_use]
179    pub fn with_metadata(mut self, metadata: impl Serialize) -> Self {
180        self.metadata = Some(serde_json::to_value(metadata).unwrap_or(serde_json::Value::Null));
181        self
182    }
183
184    /// Get the message ID.
185    #[must_use]
186    pub fn id(&self) -> &MessageId {
187        &self.id
188    }
189
190    /// Get the message type.
191    #[must_use]
192    pub fn message_type(&self) -> &MessageType {
193        &self.message_type
194    }
195
196    /// Get the source.
197    #[must_use]
198    pub fn source(&self) -> &PrimitiveName {
199        &self.source
200    }
201
202    /// Get the raw payload value.
203    #[must_use]
204    pub fn payload(&self) -> &serde_json::Value {
205        &self.payload
206    }
207
208    /// Deserialize the payload into a specific type.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the payload cannot be deserialized into type `T`.
213    pub fn payload_as<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
214        serde_json::from_value(self.payload.clone())
215    }
216
217    /// Check whether this message has an exec-source payload shape.
218    ///
219    /// Returns `true` if the payload is a JSON object with a `stdout` string field,
220    /// which is the envelope format produced by the `exec-source` primitive.
221    #[must_use]
222    pub fn has_stdout_payload(&self) -> bool {
223        self.payload
224            .as_object()
225            .and_then(|obj| obj.get("stdout"))
226            .is_some_and(serde_json::Value::is_string)
227    }
228
229    /// Unwrap an exec-source payload by extracting and parsing the `stdout` field.
230    ///
231    /// If the payload is an object with a `stdout` string field (the envelope format
232    /// produced by `exec-source`), extracts that string and attempts to parse it as
233    /// JSON. If parsing succeeds, the payload is replaced with the parsed value. If
234    /// `stdout` is not valid JSON, the payload is replaced with the raw string value.
235    ///
236    /// If the payload does not have the exec-source shape, the message is returned
237    /// unchanged.
238    ///
239    /// This eliminates the need for a dedicated exec-handler running
240    /// `jq -c '.stdout | fromjson'` in the pipeline.
241    #[must_use]
242    pub fn unwrap_stdout(mut self) -> Self {
243        if let Some(stdout) = self
244            .payload
245            .as_object()
246            .and_then(|obj| obj.get("stdout"))
247            .and_then(serde_json::Value::as_str)
248            .map(String::from)
249        {
250            self.payload = serde_json::from_str(&stdout)
251                .unwrap_or(serde_json::Value::String(stdout));
252        }
253        self
254    }
255
256    /// Serialize the message to JSON bytes.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if serialization fails.
261    pub fn to_json(&self) -> Result<Vec<u8>, serde_json::Error> {
262        serde_json::to_vec(self)
263    }
264
265    /// Deserialize a message from JSON bytes.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if deserialization fails.
270    pub fn from_json(data: &[u8]) -> Result<Self, serde_json::Error> {
271        serde_json::from_slice(data)
272    }
273
274    /// Serialize the message to MessagePack bytes.
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if serialization fails.
279    pub fn to_msgpack(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
280        rmp_serde::to_vec_named(self)
281    }
282
283    /// Deserialize a message from MessagePack bytes.
284    ///
285    /// # Errors
286    ///
287    /// Returns an error if deserialization fails.
288    pub fn from_msgpack(data: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
289        rmp_serde::from_slice(data)
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The builder fills in ID, type, source, and a non-zero timestamp.
    #[test]
    fn test_message_creation() {
        let built = EmergentMessage::new("test.event")
            .with_source("test_source")
            .with_payload(json!({"key": "value"}));

        let id_text = built.id.to_string();
        assert!(id_text.starts_with("msg_"));
        assert_eq!(built.message_type.as_str(), "test.event");
        assert_eq!(built.source.as_str(), "test_source");
        assert!(built.timestamp_ms.as_millis() > 0);
    }

    /// A message survives a round trip through both JSON and MessagePack.
    #[test]
    fn test_message_serialization() -> Result<(), Box<dyn std::error::Error>> {
        let original = EmergentMessage::new("test.event")
            .with_source("test")
            .with_payload(json!({"num": 42}));

        // JSON round trip.
        let via_json = EmergentMessage::from_json(&original.to_json()?)?;
        assert_eq!(via_json.message_type.as_str(), "test.event");

        // MessagePack round trip.
        let via_msgpack = EmergentMessage::from_msgpack(&original.to_msgpack()?)?;
        assert_eq!(via_msgpack.message_type.as_str(), "test.event");
        Ok(())
    }

    /// `payload_as` decodes the payload into a caller-defined struct.
    #[test]
    fn test_payload_extraction() -> Result<(), Box<dyn std::error::Error>> {
        #[derive(Debug, Deserialize, PartialEq)]
        struct TestPayload {
            count: u32,
            name: String,
        }

        let body = json!({
            "count": 42,
            "name": "test"
        });
        let msg = EmergentMessage::new("test.event").with_payload(body);

        let TestPayload { count, name } = msg.payload_as::<TestPayload>()?;
        assert_eq!(count, 42);
        assert_eq!(name, "test");
        Ok(())
    }

    /// Causation and correlation IDs are recorded on the derived message.
    #[test]
    fn test_message_tracing() {
        let request = EmergentMessage::new("request");
        let reply = EmergentMessage::new("response")
            .with_causation_from_message(request.id())
            .with_correlation_id(CorrelationId::new());

        let causation_text = reply.causation_id.as_ref().map(|cause| cause.to_string());
        assert_eq!(causation_text, Some(request.id().to_string()));
        assert!(reply.correlation_id.is_some());
    }

    /// A JSON document in `stdout` replaces the envelope with the parsed value.
    #[test]
    fn test_unwrap_stdout_json() {
        let envelope = json!({
            "command": "jq -s .",
            "stdout": "{\"transactions\":[1,2,3]}",
            "exit_code": 0
        });
        let msg = EmergentMessage::new("batch.raw").with_payload(envelope);

        assert!(msg.has_stdout_payload());
        let expected = json!({"transactions": [1, 2, 3]});
        assert_eq!(msg.unwrap_stdout().payload(), &expected);
    }

    /// Non-JSON `stdout` replaces the envelope with the raw string.
    #[test]
    fn test_unwrap_stdout_plain_text() {
        let envelope = json!({
            "command": "echo hello",
            "stdout": "hello world",
            "exit_code": 0
        });
        let msg = EmergentMessage::new("exec.output").with_payload(envelope);

        let expected = json!("hello world");
        assert_eq!(msg.unwrap_stdout().payload(), &expected);
    }

    /// Payloads without a `stdout` field pass through untouched.
    #[test]
    fn test_unwrap_stdout_no_stdout_field() {
        let msg = EmergentMessage::new("timer.tick").with_payload(json!({"count": 42}));

        assert!(!msg.has_stdout_payload());
        let expected = json!({"count": 42});
        assert_eq!(msg.unwrap_stdout().payload(), &expected);
    }

    /// System events are not mistaken for exec-source envelopes.
    #[test]
    fn test_unwrap_stdout_system_event_passthrough() {
        let msg =
            EmergentMessage::new("system.started.foo").with_payload(json!({"kind": "handler"}));

        assert!(!msg.has_stdout_payload());
        let expected = json!({"kind": "handler"});
        assert_eq!(msg.unwrap_stdout().payload(), &expected);
    }

    /// Identical inputs to the explicit constructor yield identical messages.
    #[test]
    fn test_new_with_id_and_timestamp_is_pure() {
        let id = MessageId::new();
        let timestamp = Timestamp::from_millis(1_704_067_200_000); // 2024-01-01 00:00:00 UTC

        let first =
            EmergentMessage::new_with_id_and_timestamp("test.event", id.clone(), timestamp);
        let second =
            EmergentMessage::new_with_id_and_timestamp("test.event", id.clone(), timestamp);

        // Same inputs must give the same observable fields.
        assert_eq!(first.id, second.id);
        assert_eq!(first.message_type, second.message_type);
        assert_eq!(first.timestamp_ms, second.timestamp_ms);
    }
}