daedalus_core/
messages.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use serde::{Deserialize, Serialize};
5
6use crate::clock::Tick;
7use crate::ids::ChannelId;
8
9#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17#[serde(transparent)]
18pub struct Sequence(u64);
19
20impl Sequence {
21 pub const ZERO: Sequence = Sequence(0);
22
23 pub fn new(value: u64) -> Self {
24 Sequence(value)
25 }
26
27 pub fn value(self) -> u64 {
28 self.0
29 }
30
31 pub fn next(self) -> Self {
32 Sequence(self.0.saturating_add(1))
33 }
34}
35
36#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
44#[serde(transparent)]
45pub struct Token(u64);
46
47impl Token {
48 pub fn new(raw: u64) -> Self {
49 Token(raw)
50 }
51
52 pub fn value(self) -> u64 {
53 self.0
54 }
55}
56
57#[derive(Debug, Default)]
69pub struct TokenGenerator {
70 counter: AtomicU64,
71}
72
73impl TokenGenerator {
74 pub fn new() -> Self {
75 Self {
76 counter: AtomicU64::new(1),
77 }
78 }
79
80 pub fn next(&self) -> Token {
81 let id = self.counter.fetch_add(1, Ordering::Relaxed);
82 Token::new(id)
83 }
84}
85
86#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
96pub struct Watermark {
97 sequence: Sequence,
98 tick: Tick,
99}
100
101impl Watermark {
102 pub fn new(sequence: Sequence, tick: Tick) -> Self {
103 Self { sequence, tick }
104 }
105
106 pub fn sequence(&self) -> Sequence {
107 self.sequence
108 }
109
110 pub fn tick(&self) -> Tick {
111 self.tick
112 }
113}
114
115#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
124pub struct MessageMeta {
125 pub created_at: Tick,
126 pub source: Option<ChannelId>,
127 pub sequence: Sequence,
128}
129
130impl MessageMeta {
131 pub fn new(created_at: Tick, sequence: Sequence) -> Self {
132 Self {
133 created_at,
134 source: None,
135 sequence,
136 }
137 }
138
139 pub fn with_source(mut self, source: ChannelId) -> Self {
140 self.source = Some(source);
141 self
142 }
143}
144
145impl Default for MessageMeta {
146 fn default() -> Self {
147 Self {
148 created_at: Tick::ZERO,
149 source: None,
150 sequence: Sequence::ZERO,
151 }
152 }
153}
154
155#[derive(Clone, Debug, Serialize, Deserialize)]
169pub struct Message<T: Send + Sync> {
170 pub token: Token,
171 pub meta: MessageMeta,
172 pub payload: Arc<T>,
173}
174
175impl<T: Send + Sync> Message<T> {
176 pub fn new(token: Token, meta: MessageMeta, payload: T) -> Self {
177 Self {
178 token,
179 meta,
180 payload: Arc::new(payload),
181 }
182 }
183
184 pub fn map<U: Send + Sync>(self, f: impl FnOnce(Arc<T>) -> U) -> Message<U> {
186 let payload = f(self.payload);
187 Message {
188 token: self.token,
189 meta: self.meta,
190 payload: Arc::new(payload),
191 }
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198 use crate::clock::TickClock;
199
200 #[test]
201 fn token_generator_monotonic() {
202 let token_gen = TokenGenerator::new();
203 let t1 = token_gen.next();
204 let t2 = token_gen.next();
205 assert!(t2.value() > t1.value());
206 }
207
208 #[test]
209 fn message_round_trip() {
210 let clock = TickClock::default();
211 let token = Token::new(1);
212 let meta = MessageMeta::new(clock.now_tick(), Sequence::ZERO);
213 let msg = Message::new(token, meta, "payload");
214 assert_eq!(msg.token.value(), 1);
215 assert_eq!(msg.meta.sequence.value(), 0);
216 assert_eq!(msg.payload.as_ref(), &"payload");
217 }
218}