allora_core/
message.rs

1//! Messaging core types: `Payload`, `Message`, and `Exchange`.
2//!
3//! # Overview
4//! These types model the fundamental unit that flows through a Route / processors.
5//! * `Payload` – concrete body representation (text, bytes, json, empty).
6//! * `Message` – wraps a `Payload` plus string key/value headers (metadata).
7//! * `Exchange` – carries an inbound `Message`, an optional outbound `Message`, and free-form properties used during routing.
8//!
9//! # IDs & Correlation
10//! Every `Message` automatically gets a unique `message_id` header when constructed via `Message::new` / `Message::from_text`.
11//! A `correlation_id` is NOT generated automatically—call `Message::ensure_correlation_id()` or `Exchange::correlation_id()` (or use the `CorrelationInitializer` processor / `Route::with_correlation`) to lazily create one when needed (aggregation, split/join, request/reply, etc.).
12//!
13//! # Headers vs Properties
14//! * Headers live on the `Message` and are typically serialized/shared externally.
15//! * Properties live on the `Exchange` and are intended for transient, internal routing state.
16//! Choose headers when downstream systems or processors need the metadata; choose properties for ephemeral routing hints.
17//!
18//! # Thread Safety / Mutation
19//! These types are plain structs; mutation requires &mut access. If sharing between threads, wrap in synchronization primitives (e.g. Arc<Mutex<_>>). The library leaves concurrency control to callers for flexibility.
20//!
21//! # Serialization
22//! All types derive `Serialize`/`Deserialize` unconditionally; JSON bodies use `serde_json::Value`.
23//!
24//! # Examples
25//! Basic creation:
26//! ```no_run
27//! use allora_core::message::{Exchange, Message};
28//! let mut ex = Exchange::new(Message::from_text("ping"));
29//! assert_eq!(ex.in_msg.body_text(), Some("ping"));
30//! ```
31//!
32//! With bytes payload:
33//! ```no_run
34//! use allora_core::message::{Message, Payload};
35//! let msg = Message::from_text("hello");
36//! assert_eq!(msg.body_text(), Some("hello"));
37//! ```
38//!
39//! Adding and overriding headers:
40//! ```no_run
41//! use allora_core::message::Message;
42//! let msg = Message::from_text("demo");
43//! assert!(msg.header("missing").is_none());
44//! ```
45//! [`CorrelationInitializer`]: crate::patterns::correlation_initializer::CorrelationInitializer
46//! [`Route::with_correlation`]: crate::route::Route::with_correlation
47
48use serde::{Deserialize, Serialize};
49use std::collections::HashMap;
50
51/// Represents the payload of a message, supporting text, bytes, JSON, or empty.
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
53pub enum Payload {
54    /// UTF-8 text payload.
55    Text(String),
56    /// Raw bytes payload.
57    Bytes(Vec<u8>),
58    /// JSON value payload (only when serde feature enabled).
59    Json(serde_json::Value),
60    /// Empty payload.
61    Empty,
62}
63
64impl Default for Payload {
65    /// Returns an empty payload.
66    fn default() -> Self {
67        Payload::Empty
68    }
69}
70
71impl Payload {
72    /// Returns the payload as text, if available.
73    pub fn as_text(&self) -> Option<&str> {
74        match self {
75            Payload::Text(s) => Some(s),
76            _ => None,
77        }
78    }
79    /// Returns the payload as bytes, if available.
80    pub fn as_bytes(&self) -> Option<&[u8]> {
81        match self {
82            Payload::Bytes(b) => Some(b),
83            _ => None,
84        }
85    }
86}
87
88/// A message containing a payload and headers (metadata).
89///
90/// # Automatic Message ID
91/// On construction a UUID v4 is inserted under the `message_id` header. While you *can*
92/// override this with `set_header`, it is discouraged—other infrastructure (logging, tracing)
93/// may rely on uniqueness.
94///
95/// # Correlation ID
96/// Not created automatically to avoid overhead if unused. Call [`Message::ensure_correlation_id`]
97/// when a processor needs it. Subsequent calls return the same stable value.
98///
99/// # Header Semantics
100/// All headers are simple UTF-8 strings. Keep values small; store large / complex structures
101/// in the payload or properties instead.
102#[derive(Debug, Clone, Default, Serialize, Deserialize)]
103pub struct Message {
104    /// The message payload.
105    pub payload: Payload,
106    /// Arbitrary string headers for metadata.
107    pub headers: HashMap<String, String>,
108}
109
110impl Message {
111    /// Creates a new message with the given payload.
112    pub fn new(payload: Payload) -> Self {
113        let mut m = Self {
114            payload,
115            headers: HashMap::new(),
116        };
117        // Automatically assign a unique message id header.
118        let mid = uuid::Uuid::new_v4().to_string();
119        m.headers.insert("message_id".to_string(), mid);
120        m
121    }
122    /// Creates a new text message.
123    pub fn from_text<T: Into<String>>(s: T) -> Self {
124        Self::new(Payload::Text(s.into()))
125    }
126    /// Returns the message body as text, if available.
127    pub fn body_text(&self) -> Option<&str> {
128        self.payload.as_text()
129    }
130    /// Returns the value of a header, if present.
131    pub fn header(&self, key: &str) -> Option<&str> {
132        self.headers.get(key).map(|s| s.as_str())
133    }
134    /// Sets a header key-value pair.
135    pub fn set_header<K: Into<String>, V: Into<String>>(&mut self, k: K, v: V) {
136        self.headers.insert(k.into(), v.into());
137    }
138    /// Ensures a correlation id header exists; returns its value.
139    ///
140    /// If a `correlation_id` header is present it is returned unchanged; otherwise a new
141    /// UUID v4 is generated, inserted, and then returned.
142    ///
143    /// # Use Cases
144    /// * Aggregator / Splitter patterns grouping related messages.
145    /// * Request/Reply correlation.
146    /// * Distributed tracing boundary (can pair with trace/span ids).
147    ///
148    /// Prefer calling once early (or using `CorrelationInitializer`) rather than sprinkling
149    /// calls across processors.
150    pub fn ensure_correlation_id(&mut self) -> &str {
151        if !self.headers.contains_key("correlation_id") {
152            let cid = uuid::Uuid::new_v4().to_string();
153            self.headers.insert("correlation_id".to_string(), cid);
154        }
155        self.headers.get("correlation_id").unwrap()
156    }
157}
158
159/// An exchange wraps an inbound and outbound message, plus routing properties.
160///
161/// # Inbound vs Outbound
162/// * `in_msg` – original message entering the pipeline.
163/// * `out_msg` – optional transformed result set by processors (e.g. formatter, enricher).
164///
165/// # Properties
166/// Free-form key/value pairs for internal routing state (e.g. retry count, timing info).
167/// Not automatically serialized; use headers for externally visible metadata.
168///
169/// # Correlation Helper
170/// `Exchange::correlation_id` provides a convenience wrapper around `in_msg.ensure_correlation_id()`.
171#[derive(Debug, Clone, Default, Serialize, Deserialize)]
172pub struct Exchange {
173    /// The inbound message.
174    pub in_msg: Message,
175    /// The outbound message, if set.
176    pub out_msg: Option<Message>,
177    /// Arbitrary string properties for routing or context.
178    pub properties: HashMap<String, String>,
179}
180
181impl Exchange {
182    /// Creates a new exchange with the given inbound message.
183    pub fn new(msg: Message) -> Self {
184        Self {
185            in_msg: msg,
186            out_msg: None,
187            properties: HashMap::new(),
188        }
189    }
190    /// Sets a property key-value pair.
191    pub fn set_property<K: Into<String>, V: Into<String>>(&mut self, k: K, v: V) {
192        self.properties.insert(k.into(), v.into());
193    }
194    /// Returns the value of a property, if present.
195    pub fn property(&self, k: &str) -> Option<&str> {
196        self.properties.get(k).map(|s| s.as_str())
197    }
198    /// Ensures the inbound message has a correlation id, returning it.
199    /// Equivalent to `self.in_msg.ensure_correlation_id()`; provided for ergonomic chaining
200    /// in routing code.
201    pub fn correlation_id(&mut self) -> &str {
202        self.in_msg.ensure_correlation_id()
203    }
204}
205
206/// Alias for Payload type.
207pub use Payload as RawPayload;
208