allora_core/message.rs
1//! Messaging core types: `Payload`, `Message`, and `Exchange`.
2//!
3//! # Overview
4//! These types model the fundamental unit that flows through a Route / processors.
5//! * `Payload` – concrete body representation (text, bytes, json, empty).
6//! * `Message` – wraps a `Payload` plus string key/value headers (metadata).
7//! * `Exchange` – carries an inbound `Message`, an optional outbound `Message`, and free-form properties used during routing.
8//!
9//! # IDs & Correlation
10//! Every `Message` automatically gets a unique `message_id` header when constructed via `Message::new` / `Message::from_text`.
//! A `correlation_id` is NOT generated automatically. Call `Message::ensure_correlation_id()` or `Exchange::correlation_id()` (or use the [`CorrelationInitializer`] processor / [`Route::with_correlation`]) to lazily create one when needed (aggregation, split/join, request/reply, etc.).
12//!
13//! # Headers vs Properties
14//! * Headers live on the `Message` and are typically serialized/shared externally.
15//! * Properties live on the `Exchange` and are intended for transient, internal routing state.
//!
//! Choose headers when downstream systems or processors need the metadata; choose properties for ephemeral routing hints.
17//!
18//! # Thread Safety / Mutation
19//! These types are plain structs; mutation requires &mut access. If sharing between threads, wrap in synchronization primitives (e.g. Arc<Mutex<_>>). The library leaves concurrency control to callers for flexibility.
20//!
21//! # Serialization
22//! All types derive `Serialize`/`Deserialize` unconditionally; JSON bodies use `serde_json::Value`.
23//!
24//! # Examples
25//! Basic creation:
26//! ```no_run
27//! use allora_core::message::{Exchange, Message};
28//! let mut ex = Exchange::new(Message::from_text("ping"));
29//! assert_eq!(ex.in_msg.body_text(), Some("ping"));
30//! ```
31//!
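//! With bytes payload:
//! ```no_run
//! use allora_core::message::{Message, Payload};
//! let msg = Message::from_text("hello");
//! assert_eq!(msg.body_text(), Some("hello"));
//! // A bytes payload is not text, so `body_text()` yields `None` for it.
//! let raw = Message::new(Payload::Bytes(vec![1, 2, 3]));
//! assert_eq!(raw.payload.as_bytes(), Some(&[1u8, 2, 3][..]));
//! assert_eq!(raw.body_text(), None);
//! ```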
38//!
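//! Adding and overriding headers:
//! ```no_run
//! use allora_core::message::Message;
//! let mut msg = Message::from_text("demo");
//! assert!(msg.header("missing").is_none());
//! msg.set_header("tenant", "a");
//! msg.set_header("tenant", "b"); // setting the same key again replaces the value
//! assert_eq!(msg.header("tenant"), Some("b"));
//! ```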
45//! [`CorrelationInitializer`]: crate::patterns::correlation_initializer::CorrelationInitializer
46//! [`Route::with_correlation`]: crate::route::Route::with_correlation
47
48use serde::{Deserialize, Serialize};
49use std::collections::HashMap;
50
51/// Represents the payload of a message, supporting text, bytes, JSON, or empty.
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub enum Payload {
54 /// UTF-8 text payload.
55 Text(String),
56 /// Raw bytes payload.
57 Bytes(Vec<u8>),
58 /// JSON value payload (only when serde feature enabled).
59 Json(serde_json::Value),
60 /// Empty payload.
61 Empty,
62}
63
64impl Default for Payload {
65 /// Returns an empty payload.
66 fn default() -> Self {
67 Payload::Empty
68 }
69}
70
71impl Payload {
72 /// Returns the payload as text, if available.
73 pub fn as_text(&self) -> Option<&str> {
74 match self {
75 Payload::Text(s) => Some(s),
76 _ => None,
77 }
78 }
79 /// Returns the payload as bytes, if available.
80 pub fn as_bytes(&self) -> Option<&[u8]> {
81 match self {
82 Payload::Bytes(b) => Some(b),
83 _ => None,
84 }
85 }
86}
87
88/// A message containing a payload and headers (metadata).
89///
90/// # Automatic Message ID
91/// On construction a UUID v4 is inserted under the `message_id` header. While you *can*
92/// override this with `set_header`, it is discouraged—other infrastructure (logging, tracing)
93/// may rely on uniqueness.
94///
95/// # Correlation ID
96/// Not created automatically to avoid overhead if unused. Call [`Message::ensure_correlation_id`]
97/// when a processor needs it. Subsequent calls return the same stable value.
98///
99/// # Header Semantics
100/// All headers are simple UTF-8 strings. Keep values small; store large / complex structures
101/// in the payload or properties instead.
102#[derive(Debug, Clone, Default, Serialize, Deserialize)]
103pub struct Message {
104 /// The message payload.
105 pub payload: Payload,
106 /// Arbitrary string headers for metadata.
107 pub headers: HashMap<String, String>,
108}
109
110impl Message {
111 /// Creates a new message with the given payload.
112 pub fn new(payload: Payload) -> Self {
113 let mut m = Self {
114 payload,
115 headers: HashMap::new(),
116 };
117 // Automatically assign a unique message id header.
118 let mid = uuid::Uuid::new_v4().to_string();
119 m.headers.insert("message_id".to_string(), mid);
120 m
121 }
122 /// Creates a new text message.
123 pub fn from_text<T: Into<String>>(s: T) -> Self {
124 Self::new(Payload::Text(s.into()))
125 }
126 /// Returns the message body as text, if available.
127 pub fn body_text(&self) -> Option<&str> {
128 self.payload.as_text()
129 }
130 /// Returns the value of a header, if present.
131 pub fn header(&self, key: &str) -> Option<&str> {
132 self.headers.get(key).map(|s| s.as_str())
133 }
134 /// Sets a header key-value pair.
135 pub fn set_header<K: Into<String>, V: Into<String>>(&mut self, k: K, v: V) {
136 self.headers.insert(k.into(), v.into());
137 }
138 /// Ensures a correlation id header exists; returns its value.
139 ///
140 /// If a `correlation_id` header is present it is returned unchanged; otherwise a new
141 /// UUID v4 is generated, inserted, and then returned.
142 ///
143 /// # Use Cases
144 /// * Aggregator / Splitter patterns grouping related messages.
145 /// * Request/Reply correlation.
146 /// * Distributed tracing boundary (can pair with trace/span ids).
147 ///
148 /// Prefer calling once early (or using `CorrelationInitializer`) rather than sprinkling
149 /// calls across processors.
150 pub fn ensure_correlation_id(&mut self) -> &str {
151 if !self.headers.contains_key("correlation_id") {
152 let cid = uuid::Uuid::new_v4().to_string();
153 self.headers.insert("correlation_id".to_string(), cid);
154 }
155 self.headers.get("correlation_id").unwrap()
156 }
157}
158
159/// An exchange wraps an inbound and outbound message, plus routing properties.
160///
161/// # Inbound vs Outbound
162/// * `in_msg` – original message entering the pipeline.
163/// * `out_msg` – optional transformed result set by processors (e.g. formatter, enricher).
164///
165/// # Properties
166/// Free-form key/value pairs for internal routing state (e.g. retry count, timing info).
167/// Not automatically serialized; use headers for externally visible metadata.
168///
169/// # Correlation Helper
170/// `Exchange::correlation_id` provides a convenience wrapper around `in_msg.ensure_correlation_id()`.
171#[derive(Debug, Clone, Default, Serialize, Deserialize)]
172pub struct Exchange {
173 /// The inbound message.
174 pub in_msg: Message,
175 /// The outbound message, if set.
176 pub out_msg: Option<Message>,
177 /// Arbitrary string properties for routing or context.
178 pub properties: HashMap<String, String>,
179}
180
181impl Exchange {
182 /// Creates a new exchange with the given inbound message.
183 pub fn new(msg: Message) -> Self {
184 Self {
185 in_msg: msg,
186 out_msg: None,
187 properties: HashMap::new(),
188 }
189 }
190 /// Sets a property key-value pair.
191 pub fn set_property<K: Into<String>, V: Into<String>>(&mut self, k: K, v: V) {
192 self.properties.insert(k.into(), v.into());
193 }
194 /// Returns the value of a property, if present.
195 pub fn property(&self, k: &str) -> Option<&str> {
196 self.properties.get(k).map(|s| s.as_str())
197 }
198 /// Ensures the inbound message has a correlation id, returning it.
199 /// Equivalent to `self.in_msg.ensure_correlation_id()`; provided for ergonomic chaining
200 /// in routing code.
201 pub fn correlation_id(&mut self) -> &str {
202 self.in_msg.ensure_correlation_id()
203 }
204}
205
206/// Alias for Payload type.
207pub use Payload as RawPayload;
208