daedalus_core/
messages.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use serde::{Deserialize, Serialize};
5
6use crate::clock::Tick;
7use crate::ids::ChannelId;
8
9#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17#[serde(transparent)]
18pub struct Sequence(u64);
19
20impl Sequence {
21 pub const ZERO: Sequence = Sequence(0);
22
23 pub fn new(value: u64) -> Self {
24 Sequence(value)
25 }
26
27 pub fn value(self) -> u64 {
28 self.0
29 }
30
31 pub fn next(self) -> Self {
32 Sequence(self.0.saturating_add(1))
33 }
34}
35
36#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct Token(u64);
46
47impl Token {
48 pub fn new(raw: u64) -> Self {
49 Token(raw)
50 }
51
52 pub fn value(self) -> u64 {
53 self.0
54 }
55}
56
57#[derive(Debug, Default)]
67pub struct TokenGenerator {
68 counter: AtomicU64,
69}
70
71impl TokenGenerator {
72 pub fn new() -> Self {
73 Self {
74 counter: AtomicU64::new(1),
75 }
76 }
77
78 pub fn next(&self) -> Token {
79 let id = self.counter.fetch_add(1, Ordering::Relaxed);
80 Token::new(id)
81 }
82}
83
84#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct Watermark {
95 sequence: Sequence,
96 tick: Tick,
97}
98
99impl Watermark {
100 pub fn new(sequence: Sequence, tick: Tick) -> Self {
101 Self { sequence, tick }
102 }
103
104 pub fn sequence(&self) -> Sequence {
105 self.sequence
106 }
107
108 pub fn tick(&self) -> Tick {
109 self.tick
110 }
111}
112
113#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
122pub struct MessageMeta {
123 pub created_at: Tick,
124 pub source: Option<ChannelId>,
125 pub sequence: Sequence,
126}
127
128impl MessageMeta {
129 pub fn new(created_at: Tick, sequence: Sequence) -> Self {
130 Self {
131 created_at,
132 source: None,
133 sequence,
134 }
135 }
136
137 pub fn with_source(mut self, source: ChannelId) -> Self {
138 self.source = Some(source);
139 self
140 }
141}
142
143impl Default for MessageMeta {
144 fn default() -> Self {
145 Self {
146 created_at: Tick::ZERO,
147 source: None,
148 sequence: Sequence::ZERO,
149 }
150 }
151}
152
153#[derive(Clone, Debug, Serialize, Deserialize)]
167pub struct Message<T: Send + Sync> {
168 pub token: Token,
169 pub meta: MessageMeta,
170 pub payload: Arc<T>,
171}
172
173impl<T: Send + Sync> Message<T> {
174 pub fn new(token: Token, meta: MessageMeta, payload: T) -> Self {
175 Self {
176 token,
177 meta,
178 payload: Arc::new(payload),
179 }
180 }
181
182 pub fn map<U: Send + Sync>(self, f: impl FnOnce(Arc<T>) -> U) -> Message<U> {
184 let payload = f(self.payload);
185 Message {
186 token: self.token,
187 meta: self.meta,
188 payload: Arc::new(payload),
189 }
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::*;
196 use crate::clock::TickClock;
197
198 #[test]
199 fn token_generator_monotonic() {
200 let token_gen = TokenGenerator::new();
201 let t1 = token_gen.next();
202 let t2 = token_gen.next();
203 assert!(t2.value() > t1.value());
204 }
205
206 #[test]
207 fn message_round_trip() {
208 let clock = TickClock::default();
209 let token = Token::new(1);
210 let meta = MessageMeta::new(clock.now_tick(), Sequence::ZERO);
211 let msg = Message::new(token, meta, "payload");
212 assert_eq!(msg.token.value(), 1);
213 assert_eq!(msg.meta.sequence.value(), 0);
214 assert_eq!(msg.payload.as_ref(), &"payload");
215 }
216}