daedalus_core/
messages.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use serde::{Deserialize, Serialize};
5
6use crate::clock::Tick;
7use crate::ids::ChannelId;
8
9/// Monotonic sequence number for newest-wins/broadcast helpers.
10///
11/// ```
12/// use daedalus_core::messages::Sequence;
13/// let seq = Sequence::new(3);
14/// assert_eq!(seq.next().value(), 4);
15/// ```
16#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17#[serde(transparent)]
18pub struct Sequence(u64);
19
20impl Sequence {
21    pub const ZERO: Sequence = Sequence(0);
22
23    pub fn new(value: u64) -> Self {
24        Sequence(value)
25    }
26
27    pub fn value(self) -> u64 {
28        self.0
29    }
30
31    pub fn next(self) -> Self {
32        Sequence(self.0.saturating_add(1))
33    }
34}
35
36/// Unique token carried by every message.
37///
38/// ```
39/// use daedalus_core::messages::Token;
40/// let token = Token::new(42);
41/// assert_eq!(token.value(), 42);
42/// ```
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct Token(u64);
46
47impl Token {
48    pub fn new(raw: u64) -> Self {
49        Token(raw)
50    }
51
52    pub fn value(self) -> u64 {
53        self.0
54    }
55}
56
57/// Generates monotonic tokens in a thread-safe manner.
58///
59/// ```
60/// use daedalus_core::messages::TokenGenerator;
61/// fn main() {
62///     let generator = TokenGenerator::new();
63///     let a = generator.next();
64///     let b = generator.next();
65///     assert!(b.value() > a.value());
66/// }
67/// ```
68#[derive(Debug, Default)]
69pub struct TokenGenerator {
70    counter: AtomicU64,
71}
72
73impl TokenGenerator {
74    pub fn new() -> Self {
75        Self {
76            counter: AtomicU64::new(1),
77        }
78    }
79
80    pub fn next(&self) -> Token {
81        let id = self.counter.fetch_add(1, Ordering::Relaxed);
82        Token::new(id)
83    }
84}
85
86/// Watermark used to signal progress on a stream.
87///
88/// ```
89/// use daedalus_core::clock::Tick;
90/// use daedalus_core::messages::{Sequence, Watermark};
91/// let wm = Watermark::new(Sequence::new(5), Tick::new(9));
92/// assert_eq!(wm.sequence().value(), 5);
93/// assert_eq!(wm.tick().value(), 9);
94/// ```
95#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
96pub struct Watermark {
97    sequence: Sequence,
98    tick: Tick,
99}
100
101impl Watermark {
102    pub fn new(sequence: Sequence, tick: Tick) -> Self {
103        Self { sequence, tick }
104    }
105
106    pub fn sequence(&self) -> Sequence {
107        self.sequence
108    }
109
110    pub fn tick(&self) -> Tick {
111        self.tick
112    }
113}
114
115/// Metadata attached to messages for diagnostics/telemetry.
116///
117/// ```
118/// use daedalus_core::clock::Tick;
119/// use daedalus_core::messages::{MessageMeta, Sequence};
120/// let meta = MessageMeta::new(Tick::new(2), Sequence::new(1));
121/// assert_eq!(meta.sequence.value(), 1);
122/// ```
123#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
124pub struct MessageMeta {
125    pub created_at: Tick,
126    pub source: Option<ChannelId>,
127    pub sequence: Sequence,
128}
129
130impl MessageMeta {
131    pub fn new(created_at: Tick, sequence: Sequence) -> Self {
132        Self {
133            created_at,
134            source: None,
135            sequence,
136        }
137    }
138
139    pub fn with_source(mut self, source: ChannelId) -> Self {
140        self.source = Some(source);
141        self
142    }
143}
144
145impl Default for MessageMeta {
146    fn default() -> Self {
147        Self {
148            created_at: Tick::ZERO,
149            source: None,
150            sequence: Sequence::ZERO,
151        }
152    }
153}
154
155/// Envelope carrying payload plus token/metadata.
156///
157/// Payload `T` must be `Send + Sync` to be safely shared in async/concurrent
158/// runtimes.
159///
160/// ```
161/// use daedalus_core::clock::Tick;
162/// use daedalus_core::messages::{Message, MessageMeta, Sequence, Token};
163///
164/// let meta = MessageMeta::new(Tick::new(1), Sequence::new(0));
165/// let msg = Message::new(Token::new(7), meta, "payload");
166/// assert_eq!(msg.payload.as_ref(), &"payload");
167/// ```
168#[derive(Clone, Debug, Serialize, Deserialize)]
169pub struct Message<T: Send + Sync> {
170    pub token: Token,
171    pub meta: MessageMeta,
172    pub payload: Arc<T>,
173}
174
175impl<T: Send + Sync> Message<T> {
176    pub fn new(token: Token, meta: MessageMeta, payload: T) -> Self {
177        Self {
178            token,
179            meta,
180            payload: Arc::new(payload),
181        }
182    }
183
184    /// Map payload while preserving token/metadata.
185    pub fn map<U: Send + Sync>(self, f: impl FnOnce(Arc<T>) -> U) -> Message<U> {
186        let payload = f(self.payload);
187        Message {
188            token: self.token,
189            meta: self.meta,
190            payload: Arc::new(payload),
191        }
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198    use crate::clock::TickClock;
199
200    #[test]
201    fn token_generator_monotonic() {
202        let token_gen = TokenGenerator::new();
203        let t1 = token_gen.next();
204        let t2 = token_gen.next();
205        assert!(t2.value() > t1.value());
206    }
207
208    #[test]
209    fn message_round_trip() {
210        let clock = TickClock::default();
211        let token = Token::new(1);
212        let meta = MessageMeta::new(clock.now_tick(), Sequence::ZERO);
213        let msg = Message::new(token, meta, "payload");
214        assert_eq!(msg.token.value(), 1);
215        assert_eq!(msg.meta.sequence.value(), 0);
216        assert_eq!(msg.payload.as_ref(), &"payload");
217    }
218}