daedalus_core/
messages.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use serde::{Deserialize, Serialize};
5
6use crate::clock::Tick;
7use crate::ids::ChannelId;
8
9/// Monotonic sequence number for newest-wins/broadcast helpers.
10///
11/// ```
12/// use daedalus_core::messages::Sequence;
13/// let seq = Sequence::new(3);
14/// assert_eq!(seq.next().value(), 4);
15/// ```
16#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17#[serde(transparent)]
18pub struct Sequence(u64);
19
20impl Sequence {
21    pub const ZERO: Sequence = Sequence(0);
22
23    pub fn new(value: u64) -> Self {
24        Sequence(value)
25    }
26
27    pub fn value(self) -> u64 {
28        self.0
29    }
30
31    pub fn next(self) -> Self {
32        Sequence(self.0.saturating_add(1))
33    }
34}
35
36/// Unique token carried by every message.
37///
38/// ```
39/// use daedalus_core::messages::Token;
40/// let token = Token::new(42);
41/// assert_eq!(token.value(), 42);
42/// ```
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct Token(u64);
46
47impl Token {
48    pub fn new(raw: u64) -> Self {
49        Token(raw)
50    }
51
52    pub fn value(self) -> u64 {
53        self.0
54    }
55}
56
57/// Generates monotonic tokens in a thread-safe manner.
58///
59/// ```
60/// use daedalus_core::messages::TokenGenerator;
61/// let generator = TokenGenerator::new();
62/// let a = generator.next();
63/// let b = generator.next();
64/// assert!(b.value() > a.value());
65/// ```
66#[derive(Debug, Default)]
67pub struct TokenGenerator {
68    counter: AtomicU64,
69}
70
71impl TokenGenerator {
72    pub fn new() -> Self {
73        Self {
74            counter: AtomicU64::new(1),
75        }
76    }
77
78    pub fn next(&self) -> Token {
79        let id = self.counter.fetch_add(1, Ordering::Relaxed);
80        Token::new(id)
81    }
82}
83
84/// Watermark used to signal progress on a stream.
85///
86/// ```
87/// use daedalus_core::clock::Tick;
88/// use daedalus_core::messages::{Sequence, Watermark};
89/// let wm = Watermark::new(Sequence::new(5), Tick::new(9));
90/// assert_eq!(wm.sequence().value(), 5);
91/// assert_eq!(wm.tick().value(), 9);
92/// ```
93#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct Watermark {
95    sequence: Sequence,
96    tick: Tick,
97}
98
99impl Watermark {
100    pub fn new(sequence: Sequence, tick: Tick) -> Self {
101        Self { sequence, tick }
102    }
103
104    pub fn sequence(&self) -> Sequence {
105        self.sequence
106    }
107
108    pub fn tick(&self) -> Tick {
109        self.tick
110    }
111}
112
113/// Metadata attached to messages for diagnostics/telemetry.
114///
115/// ```
116/// use daedalus_core::clock::Tick;
117/// use daedalus_core::messages::{MessageMeta, Sequence};
118/// let meta = MessageMeta::new(Tick::new(2), Sequence::new(1));
119/// assert_eq!(meta.sequence.value(), 1);
120/// ```
121#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct MessageMeta {
123    pub created_at: Tick,
124    pub source: Option<ChannelId>,
125    pub sequence: Sequence,
126}
127
128impl MessageMeta {
129    pub fn new(created_at: Tick, sequence: Sequence) -> Self {
130        Self {
131            created_at,
132            source: None,
133            sequence,
134        }
135    }
136
137    pub fn with_source(mut self, source: ChannelId) -> Self {
138        self.source = Some(source);
139        self
140    }
141}
142
143impl Default for MessageMeta {
144    fn default() -> Self {
145        Self {
146            created_at: Tick::ZERO,
147            source: None,
148            sequence: Sequence::ZERO,
149        }
150    }
151}
152
153/// Envelope carrying payload plus token/metadata.
154///
155/// Payload `T` must be `Send + Sync` to be safely shared in async/concurrent
156/// runtimes.
157///
158/// ```
159/// use daedalus_core::clock::Tick;
160/// use daedalus_core::messages::{Message, MessageMeta, Sequence, Token};
161///
162/// let meta = MessageMeta::new(Tick::new(1), Sequence::new(0));
163/// let msg = Message::new(Token::new(7), meta, "payload");
164/// assert_eq!(msg.payload.as_ref(), &"payload");
165/// ```
166#[derive(Clone, Debug, Serialize, Deserialize)]
167pub struct Message<T: Send + Sync> {
168    pub token: Token,
169    pub meta: MessageMeta,
170    pub payload: Arc<T>,
171}
172
173impl<T: Send + Sync> Message<T> {
174    pub fn new(token: Token, meta: MessageMeta, payload: T) -> Self {
175        Self {
176            token,
177            meta,
178            payload: Arc::new(payload),
179        }
180    }
181
182    /// Map payload while preserving token/metadata.
183    pub fn map<U: Send + Sync>(self, f: impl FnOnce(Arc<T>) -> U) -> Message<U> {
184        let payload = f(self.payload);
185        Message {
186            token: self.token,
187            meta: self.meta,
188            payload: Arc::new(payload),
189        }
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use crate::clock::TickClock;
197
198    #[test]
199    fn token_generator_monotonic() {
200        let token_gen = TokenGenerator::new();
201        let t1 = token_gen.next();
202        let t2 = token_gen.next();
203        assert!(t2.value() > t1.value());
204    }
205
206    #[test]
207    fn message_round_trip() {
208        let clock = TickClock::default();
209        let token = Token::new(1);
210        let meta = MessageMeta::new(clock.now_tick(), Sequence::ZERO);
211        let msg = Message::new(token, meta, "payload");
212        assert_eq!(msg.token.value(), 1);
213        assert_eq!(msg.meta.sequence.value(), 0);
214        assert_eq!(msg.payload.as_ref(), &"payload");
215    }
216}