1use std::fmt::{self, Display, Formatter};
28use std::hash::{Hash, Hasher};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
33#[derive(Clone, Debug, Eq, PartialEq)]
40pub enum MessageId {
41 Uuid(u128),
43
44 Sequence(u64),
46
47 Custom(String),
49
50 ContentHash(u64),
52}
53
54impl MessageId {
55 #[must_use]
57 pub fn new_uuid() -> Self {
58 let high = rand_u64();
60 let low = rand_u64();
61 let uuid = ((high & 0xFFFFFFFFFFFF0FFF) | 0x0000000000004000) as u128
63 | (((low & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000) as u128) << 64;
64 MessageId::Uuid(uuid)
65 }
66
67 #[must_use]
69 pub const fn new_sequence(seq: u64) -> Self {
70 MessageId::Sequence(seq)
71 }
72
73 #[must_use]
75 pub fn new_custom(id: impl Into<String>) -> Self {
76 MessageId::Custom(id.into())
77 }
78
79 #[must_use]
81 pub fn from_content(content: &[u8]) -> Self {
82 use std::collections::hash_map::DefaultHasher;
83 let mut hasher = DefaultHasher::new();
84 content.hash(&mut hasher);
85 MessageId::ContentHash(hasher.finish())
86 }
87
88 #[must_use]
90 pub const fn is_uuid(&self) -> bool {
91 matches!(self, MessageId::Uuid(_))
92 }
93
94 #[must_use]
96 pub const fn is_sequence(&self) -> bool {
97 matches!(self, MessageId::Sequence(_))
98 }
99
100 #[must_use]
102 pub const fn is_custom(&self) -> bool {
103 matches!(self, MessageId::Custom(_))
104 }
105
106 #[must_use]
108 pub const fn is_content_hash(&self) -> bool {
109 matches!(self, MessageId::ContentHash(_))
110 }
111}
112
113impl Display for MessageId {
114 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
115 match self {
116 MessageId::Uuid(uuid) => {
117 write!(
119 f,
120 "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
121 (uuid >> 96) as u32,
122 (uuid >> 80) as u16,
123 (uuid >> 64) as u16,
124 (uuid >> 48) as u16,
125 (uuid & 0xFFFFFFFFFFFF) as u64
126 )
127 }
128 MessageId::Sequence(seq) => write!(f, "seq:{}", seq),
129 MessageId::Custom(id) => write!(f, "custom:{}", id),
130 MessageId::ContentHash(hash) => write!(f, "hash:{:016x}", hash),
131 }
132 }
133}
134
135impl Hash for MessageId {
136 fn hash<H: Hasher>(&self, state: &mut H) {
137 std::mem::discriminant(self).hash(state);
138 match self {
139 MessageId::Uuid(uuid) => uuid.hash(state),
140 MessageId::Sequence(seq) => seq.hash(state),
141 MessageId::Custom(id) => id.hash(state),
142 MessageId::ContentHash(hash) => hash.hash(state),
143 }
144 }
145}
146
147impl Default for MessageId {
148 fn default() -> Self {
149 MessageId::new_uuid()
150 }
151}
152
153#[derive(Clone, Debug, Default)]
155pub struct MessageMetadata {
156 pub timestamp: Option<Duration>,
158
159 pub source: Option<String>,
161
162 pub partition: Option<u32>,
164
165 pub offset: Option<u64>,
167
168 pub key: Option<String>,
170
171 pub headers: Vec<(String, String)>,
173}
174
175impl MessageMetadata {
176 #[must_use]
178 pub fn new() -> Self {
179 Self::default()
180 }
181
182 #[must_use]
184 pub fn with_timestamp_now() -> Self {
185 Self {
186 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).ok(),
187 ..Default::default()
188 }
189 }
190
191 #[must_use]
193 pub fn timestamp(mut self, ts: Duration) -> Self {
194 self.timestamp = Some(ts);
195 self
196 }
197
198 #[must_use]
200 pub fn source(mut self, source: impl Into<String>) -> Self {
201 self.source = Some(source.into());
202 self
203 }
204
205 #[must_use]
207 pub fn partition(mut self, partition: u32) -> Self {
208 self.partition = Some(partition);
209 self
210 }
211
212 #[must_use]
214 pub fn offset(mut self, offset: u64) -> Self {
215 self.offset = Some(offset);
216 self
217 }
218
219 #[must_use]
221 pub fn key(mut self, key: impl Into<String>) -> Self {
222 self.key = Some(key.into());
223 self
224 }
225
226 #[must_use]
228 pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
229 self.headers.push((name.into(), value.into()));
230 self
231 }
232
233 #[must_use]
235 pub fn get_header(&self, name: &str) -> Option<&str> {
236 self
237 .headers
238 .iter()
239 .find(|(k, _)| k == name)
240 .map(|(_, v)| v.as_str())
241 }
242}
243
244#[derive(Clone, Debug)]
271pub struct Message<T> {
272 id: MessageId,
273 payload: T,
274 metadata: MessageMetadata,
275}
276
277impl<T> Message<T> {
278 #[must_use]
280 pub fn new(payload: T, id: MessageId) -> Self {
281 Self {
282 id,
283 payload,
284 metadata: MessageMetadata::with_timestamp_now(),
285 }
286 }
287
288 #[must_use]
290 pub fn with_metadata(payload: T, id: MessageId, metadata: MessageMetadata) -> Self {
291 Self {
292 id,
293 payload,
294 metadata,
295 }
296 }
297
298 #[must_use]
300 pub fn id(&self) -> &MessageId {
301 &self.id
302 }
303
304 #[must_use]
306 pub fn payload(&self) -> &T {
307 &self.payload
308 }
309
310 pub fn payload_mut(&mut self) -> &mut T {
312 &mut self.payload
313 }
314
315 #[must_use]
317 pub fn metadata(&self) -> &MessageMetadata {
318 &self.metadata
319 }
320
321 pub fn metadata_mut(&mut self) -> &mut MessageMetadata {
323 &mut self.metadata
324 }
325
326 #[must_use]
328 pub fn into_parts(self) -> (MessageId, T, MessageMetadata) {
329 (self.id, self.payload, self.metadata)
330 }
331
332 #[must_use]
334 pub fn into_payload(self) -> T {
335 self.payload
336 }
337
338 #[must_use]
340 pub fn map<U, F>(self, f: F) -> Message<U>
341 where
342 F: FnOnce(T) -> U,
343 {
344 Message {
345 id: self.id,
346 payload: f(self.payload),
347 metadata: self.metadata,
348 }
349 }
350
351 #[must_use]
353 pub fn map_with_id<U, F>(self, f: F) -> Message<U>
354 where
355 F: FnOnce(&MessageId, T) -> U,
356 {
357 Message {
358 payload: f(&self.id, self.payload),
359 id: self.id,
360 metadata: self.metadata,
361 }
362 }
363
364 #[must_use]
366 pub fn with_payload<U>(self, payload: U) -> Message<U> {
367 Message {
368 id: self.id,
369 payload,
370 metadata: self.metadata,
371 }
372 }
373}
374
375impl<T: Default> Default for Message<T> {
376 fn default() -> Self {
377 Self::new(T::default(), MessageId::new_uuid())
378 }
379}
380
381impl<T: PartialEq> PartialEq for Message<T> {
382 fn eq(&self, other: &Self) -> bool {
383 self.id == other.id && self.payload == other.payload
384 }
385}
386
387impl<T: Eq> Eq for Message<T> {}
388
389impl<T: Hash> Hash for Message<T> {
390 fn hash<H: Hasher>(&self, state: &mut H) {
391 self.id.hash(state);
392 self.payload.hash(state);
393 }
394}
395
396pub trait IdGenerator: Send + Sync {
398 fn next_id(&self) -> MessageId;
400}
401
402#[derive(Debug, Default)]
406pub struct UuidGenerator;
407
408impl UuidGenerator {
409 #[must_use]
411 pub fn new() -> Self {
412 Self
413 }
414}
415
416impl IdGenerator for UuidGenerator {
417 fn next_id(&self) -> MessageId {
418 MessageId::new_uuid()
419 }
420}
421
422#[derive(Debug)]
427pub struct SequenceGenerator {
428 counter: AtomicU64,
429}
430
431impl SequenceGenerator {
432 #[must_use]
434 pub fn new() -> Self {
435 Self {
436 counter: AtomicU64::new(0),
437 }
438 }
439
440 #[must_use]
442 pub fn starting_at(start: u64) -> Self {
443 Self {
444 counter: AtomicU64::new(start),
445 }
446 }
447
448 #[must_use]
450 pub fn current(&self) -> u64 {
451 self.counter.load(Ordering::Relaxed)
452 }
453
454 pub fn reset(&self) {
456 self.counter.store(0, Ordering::Relaxed);
457 }
458
459 pub fn reset_to(&self, value: u64) {
461 self.counter.store(value, Ordering::Relaxed);
462 }
463}
464
465impl Default for SequenceGenerator {
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471impl IdGenerator for SequenceGenerator {
472 fn next_id(&self) -> MessageId {
473 MessageId::Sequence(self.counter.fetch_add(1, Ordering::Relaxed))
474 }
475}
476
477#[derive(Debug, Default)]
482pub struct ContentHashGenerator;
483
484impl ContentHashGenerator {
485 #[must_use]
487 pub fn new() -> Self {
488 Self
489 }
490
491 #[must_use]
493 pub fn hash_content(&self, content: &[u8]) -> MessageId {
494 MessageId::from_content(content)
495 }
496}
497
498pub type SharedIdGenerator = Arc<dyn IdGenerator>;
500
501#[must_use]
503pub fn uuid_generator() -> SharedIdGenerator {
504 Arc::new(UuidGenerator::new())
505}
506
507#[must_use]
509pub fn sequence_generator() -> SharedIdGenerator {
510 Arc::new(SequenceGenerator::new())
511}
512
513#[must_use]
515pub fn sequence_generator_from(start: u64) -> SharedIdGenerator {
516 Arc::new(SequenceGenerator::starting_at(start))
517}
518
519fn rand_u64() -> u64 {
522 use std::cell::Cell;
523 use std::hash::{Hash, Hasher};
524
525 thread_local! {
526 static STATE: Cell<u64> = {
527 let time_seed = std::time::SystemTime::now()
529 .duration_since(std::time::UNIX_EPOCH)
530 .map(|d| d.as_nanos() as u64)
531 .unwrap_or(0x12345678DEADBEEF);
532
533 let mut hasher = std::collections::hash_map::DefaultHasher::new();
534 std::thread::current().id().hash(&mut hasher);
535 let thread_seed = hasher.finish();
536
537 Cell::new(time_seed ^ thread_seed)
538 };
539 }
540
541 STATE.with(|state| {
542 let mut x = state.get();
543 x ^= x << 13;
544 x ^= x >> 7;
545 x ^= x << 17;
546 state.set(x);
547 x
548 })
549}
550
551#[cfg(test)]
552mod tests {
553 use super::*;
554
555 #[test]
556 fn test_message_id_uuid() {
557 let id = MessageId::new_uuid();
558 assert!(id.is_uuid());
559 assert!(!id.is_sequence());
560 assert!(!id.is_custom());
561 assert!(!id.is_content_hash());
562 }
563
564 #[test]
565 fn test_message_id_sequence() {
566 let id = MessageId::new_sequence(42);
567 assert!(id.is_sequence());
568 assert!(!id.is_uuid());
569
570 if let MessageId::Sequence(seq) = id {
571 assert_eq!(seq, 42);
572 } else {
573 panic!("Expected sequence ID");
574 }
575 }
576
577 #[test]
578 fn test_message_id_custom() {
579 let id = MessageId::new_custom("my-custom-id");
580 assert!(id.is_custom());
581
582 if let MessageId::Custom(s) = id {
583 assert_eq!(s, "my-custom-id");
584 } else {
585 panic!("Expected custom ID");
586 }
587 }
588
589 #[test]
590 fn test_message_id_content_hash() {
591 let id1 = MessageId::from_content(b"hello");
592 let id2 = MessageId::from_content(b"hello");
593 let id3 = MessageId::from_content(b"world");
594
595 assert!(id1.is_content_hash());
596 assert_eq!(id1, id2); assert_ne!(id1, id3); }
599
600 #[test]
601 fn test_message_id_display() {
602 let uuid = MessageId::Uuid(0x12345678_1234_4567_89AB_CDEF01234567);
603 let formatted = format!("{}", uuid);
604 assert!(formatted.contains("-")); let seq = MessageId::new_sequence(42);
607 assert_eq!(format!("{}", seq), "seq:42");
608
609 let custom = MessageId::new_custom("test");
610 assert_eq!(format!("{}", custom), "custom:test");
611
612 let hash = MessageId::ContentHash(0xDEADBEEF);
613 assert!(format!("{}", hash).starts_with("hash:"));
614 }
615
616 #[test]
617 fn test_message_creation() {
618 let msg = Message::new(42, MessageId::new_sequence(1));
619 assert_eq!(*msg.payload(), 42);
620 assert!(msg.id().is_sequence());
621 }
622
623 #[test]
624 fn test_message_with_metadata() {
625 let metadata = MessageMetadata::with_timestamp_now()
626 .source("test-source")
627 .partition(3);
628
629 let msg = Message::with_metadata("hello", MessageId::new_uuid(), metadata);
630
631 assert_eq!(*msg.payload(), "hello");
632 assert_eq!(msg.metadata().source, Some("test-source".to_string()));
633 assert_eq!(msg.metadata().partition, Some(3));
634 }
635
636 #[test]
637 fn test_message_map() {
638 let msg = Message::new(42, MessageId::new_sequence(1));
639 let mapped = msg.map(|x| x * 2);
640
641 assert_eq!(*mapped.payload(), 84);
642 assert_eq!(*mapped.id(), MessageId::new_sequence(1)); }
644
645 #[test]
646 fn test_message_into_parts() {
647 let msg = Message::new("test", MessageId::new_sequence(5));
648 let (id, payload, _metadata) = msg.into_parts();
649
650 assert_eq!(id, MessageId::new_sequence(5));
651 assert_eq!(payload, "test");
652 }
653
654 #[test]
655 fn test_uuid_generator() {
656 let generator = UuidGenerator::new();
657 let id1 = generator.next_id();
658 let id2 = generator.next_id();
659
660 assert!(id1.is_uuid());
661 assert!(id2.is_uuid());
662 assert_ne!(id1, id2); }
664
665 #[test]
666 fn test_sequence_generator() {
667 let generator = SequenceGenerator::new();
668 let id1 = generator.next_id();
669 let id2 = generator.next_id();
670 let id3 = generator.next_id();
671
672 assert_eq!(id1, MessageId::Sequence(0));
673 assert_eq!(id2, MessageId::Sequence(1));
674 assert_eq!(id3, MessageId::Sequence(2));
675 }
676
677 #[test]
678 fn test_sequence_generator_starting_at() {
679 let generator = SequenceGenerator::starting_at(100);
680 let id = generator.next_id();
681 assert_eq!(id, MessageId::Sequence(100));
682 }
683
684 #[test]
685 fn test_sequence_generator_reset() {
686 let generator = SequenceGenerator::starting_at(50);
687 generator.next_id();
688 generator.next_id();
689 assert_eq!(generator.current(), 52);
690
691 generator.reset();
692 assert_eq!(generator.current(), 0);
693
694 generator.reset_to(1000);
695 assert_eq!(generator.current(), 1000);
696 }
697
698 #[test]
699 fn test_metadata_builder() {
700 let metadata = MessageMetadata::new()
701 .source("my-source")
702 .partition(5)
703 .offset(100)
704 .key("my-key")
705 .header("content-type", "application/json");
706
707 assert_eq!(metadata.source, Some("my-source".to_string()));
708 assert_eq!(metadata.partition, Some(5));
709 assert_eq!(metadata.offset, Some(100));
710 assert_eq!(metadata.key, Some("my-key".to_string()));
711 assert_eq!(
712 metadata.get_header("content-type"),
713 Some("application/json")
714 );
715 assert_eq!(metadata.get_header("non-existent"), None);
716 }
717
718 #[test]
719 fn test_shared_generators() {
720 let uuid_gen = uuid_generator();
721 let seq_gen = sequence_generator();
722 let seq_gen_from = sequence_generator_from(1000);
723
724 assert!(uuid_gen.next_id().is_uuid());
725 assert_eq!(seq_gen.next_id(), MessageId::Sequence(0));
726 assert_eq!(seq_gen_from.next_id(), MessageId::Sequence(1000));
727 }
728
729 #[test]
730 fn test_message_equality() {
731 let id = MessageId::new_sequence(1);
732 let msg1 = Message::new(42, id.clone());
733 let msg2 = Message::new(42, id.clone());
734 let msg3 = Message::new(43, id);
735
736 assert_eq!(msg1, msg2);
737 assert_ne!(msg1, msg3);
738 }
739
740 #[test]
741 fn test_concurrent_sequence_generator() {
742 use std::sync::Arc;
743 use std::thread;
744
745 let generator = Arc::new(SequenceGenerator::new());
746 let mut handles = vec![];
747
748 for _ in 0..10 {
749 let generator_clone = Arc::clone(&generator);
750 handles.push(thread::spawn(move || {
751 let mut ids = vec![];
752 for _ in 0..100 {
753 ids.push(generator_clone.next_id());
754 }
755 ids
756 }));
757 }
758
759 let mut all_ids: Vec<MessageId> = handles
760 .into_iter()
761 .flat_map(|h| h.join().unwrap())
762 .collect();
763
764 all_ids.sort_by_key(|id| {
765 if let MessageId::Sequence(seq) = id {
766 *seq
767 } else {
768 panic!("Expected sequence ID")
769 }
770 });
771
772 for (i, id) in all_ids.iter().enumerate() {
774 assert_eq!(*id, MessageId::Sequence(i as u64));
775 }
776 }
777}