1use std::fmt::{self, Display, Formatter};
28use std::hash::{Hash, Hasher};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
33#[derive(Clone, Debug, Eq, PartialEq)]
40pub enum MessageId {
41 Uuid(u128),
43
44 Sequence(u64),
46
47 Custom(String),
49
50 ContentHash(u64),
52}
53
54impl MessageId {
55 #[must_use]
57 pub fn new_uuid() -> Self {
58 let high = rand_u64();
60 let low = rand_u64();
61 let uuid = ((high & 0xFFFFFFFFFFFF0FFF) | 0x0000000000004000) as u128
63 | (((low & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000) as u128) << 64;
64 MessageId::Uuid(uuid)
65 }
66
67 #[must_use]
69 pub const fn new_sequence(seq: u64) -> Self {
70 MessageId::Sequence(seq)
71 }
72
73 #[must_use]
75 pub fn new_custom(id: impl Into<String>) -> Self {
76 MessageId::Custom(id.into())
77 }
78
79 #[must_use]
81 pub fn from_content(content: &[u8]) -> Self {
82 use std::collections::hash_map::DefaultHasher;
83 let mut hasher = DefaultHasher::new();
84 content.hash(&mut hasher);
85 MessageId::ContentHash(hasher.finish())
86 }
87
88 #[must_use]
90 pub const fn is_uuid(&self) -> bool {
91 matches!(self, MessageId::Uuid(_))
92 }
93
94 #[must_use]
96 pub const fn is_sequence(&self) -> bool {
97 matches!(self, MessageId::Sequence(_))
98 }
99
100 #[must_use]
102 pub const fn is_custom(&self) -> bool {
103 matches!(self, MessageId::Custom(_))
104 }
105
106 #[must_use]
108 pub const fn is_content_hash(&self) -> bool {
109 matches!(self, MessageId::ContentHash(_))
110 }
111}
112
113impl Display for MessageId {
114 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
115 match self {
116 MessageId::Uuid(uuid) => {
117 write!(
119 f,
120 "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
121 (uuid >> 96) as u32,
122 (uuid >> 80) as u16,
123 (uuid >> 64) as u16,
124 (uuid >> 48) as u16,
125 (uuid & 0xFFFFFFFFFFFF) as u64
126 )
127 }
128 MessageId::Sequence(seq) => write!(f, "seq:{}", seq),
129 MessageId::Custom(id) => write!(f, "custom:{}", id),
130 MessageId::ContentHash(hash) => write!(f, "hash:{:016x}", hash),
131 }
132 }
133}
134
135impl Hash for MessageId {
136 fn hash<H: Hasher>(&self, state: &mut H) {
137 std::mem::discriminant(self).hash(state);
138 match self {
139 MessageId::Uuid(uuid) => uuid.hash(state),
140 MessageId::Sequence(seq) => seq.hash(state),
141 MessageId::Custom(id) => id.hash(state),
142 MessageId::ContentHash(hash) => hash.hash(state),
143 }
144 }
145}
146
147impl Default for MessageId {
148 fn default() -> Self {
149 MessageId::new_uuid()
150 }
151}
152
153#[derive(Clone, Debug, Default)]
158pub struct MessageMetadata {
159 pub timestamp: Option<Duration>,
161
162 pub source: Option<Arc<str>>,
165
166 pub partition: Option<u32>,
168
169 pub offset: Option<u64>,
171
172 pub key: Option<Arc<str>>,
175
176 pub headers: Vec<(Arc<str>, Arc<str>)>,
179}
180
181impl MessageMetadata {
182 #[must_use]
184 pub fn new() -> Self {
185 Self::default()
186 }
187
188 #[must_use]
190 pub fn with_timestamp_now() -> Self {
191 Self {
192 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).ok(),
193 ..Default::default()
194 }
195 }
196
197 #[must_use]
199 pub fn timestamp(mut self, ts: Duration) -> Self {
200 self.timestamp = Some(ts);
201 self
202 }
203
204 #[must_use]
209 pub fn source(mut self, source: impl Into<Arc<str>>) -> Self {
210 self.source = Some(source.into());
211 self
212 }
213
214 #[must_use]
216 pub fn partition(mut self, partition: u32) -> Self {
217 self.partition = Some(partition);
218 self
219 }
220
221 #[must_use]
223 pub fn offset(mut self, offset: u64) -> Self {
224 self.offset = Some(offset);
225 self
226 }
227
228 #[must_use]
233 pub fn key(mut self, key: impl Into<Arc<str>>) -> Self {
234 self.key = Some(key.into());
235 self
236 }
237
238 #[must_use]
243 pub fn header(mut self, name: impl Into<Arc<str>>, value: impl Into<Arc<str>>) -> Self {
244 self.headers.push((name.into(), value.into()));
245 self
246 }
247
248 #[must_use]
252 pub fn get_header(&self, name: &str) -> Option<&str> {
253 self
254 .headers
255 .iter()
256 .find(|(k, _)| k.as_ref() == name)
257 .map(|(_, v)| v.as_ref())
258 }
259
260 #[must_use]
264 pub fn get_source(&self) -> Option<&str> {
265 self.source.as_deref()
266 }
267
268 #[must_use]
272 pub fn get_key(&self) -> Option<&str> {
273 self.key.as_deref()
274 }
275
276 #[must_use]
283 pub fn with_shared_source(mut self, source: Arc<str>) -> Self {
284 self.source = Some(source);
285 self
286 }
287
288 #[must_use]
295 pub fn with_shared_key(mut self, key: Arc<str>) -> Self {
296 self.key = Some(key);
297 self
298 }
299
300 #[must_use]
307 pub fn with_shared_header(mut self, name: Arc<str>, value: Arc<str>) -> Self {
308 self.headers.push((name, value));
309 self
310 }
311
312 #[must_use]
316 pub fn with_source_borrowed(mut self, source: &str) -> Self {
317 self.source = Some(Arc::from(source));
318 self
319 }
320
321 #[must_use]
342 pub fn with_source_interned<I: StringInternerTrait>(
343 mut self,
344 source: &str,
345 interner: &I,
346 ) -> Self {
347 self.source = Some(interner.get_or_intern(source));
348 self
349 }
350
351 #[must_use]
366 pub fn with_key_interned<I: StringInternerTrait>(mut self, key: &str, interner: &I) -> Self {
367 self.key = Some(interner.get_or_intern(key));
368 self
369 }
370
371 #[must_use]
387 pub fn add_header_interned<I: StringInternerTrait>(
388 mut self,
389 name: &str,
390 value: &str,
391 interner: &I,
392 ) -> Self {
393 self
394 .headers
395 .push((interner.get_or_intern(name), interner.get_or_intern(value)));
396 self
397 }
398}
399
400#[derive(Clone, Debug)]
427pub struct Message<T> {
428 id: MessageId,
429 payload: T,
430 metadata: MessageMetadata,
431}
432
433impl<T> Message<T> {
434 #[must_use]
436 pub fn new(payload: T, id: MessageId) -> Self {
437 Self {
438 id,
439 payload,
440 metadata: MessageMetadata::with_timestamp_now(),
441 }
442 }
443
444 #[must_use]
446 pub fn with_metadata(payload: T, id: MessageId, metadata: MessageMetadata) -> Self {
447 Self {
448 id,
449 payload,
450 metadata,
451 }
452 }
453
454 #[must_use]
456 pub fn id(&self) -> &MessageId {
457 &self.id
458 }
459
460 #[must_use]
462 pub fn payload(&self) -> &T {
463 &self.payload
464 }
465
466 pub fn payload_mut(&mut self) -> &mut T {
468 &mut self.payload
469 }
470
471 #[must_use]
473 pub fn metadata(&self) -> &MessageMetadata {
474 &self.metadata
475 }
476
477 pub fn metadata_mut(&mut self) -> &mut MessageMetadata {
479 &mut self.metadata
480 }
481
482 #[must_use]
484 pub fn into_parts(self) -> (MessageId, T, MessageMetadata) {
485 (self.id, self.payload, self.metadata)
486 }
487
488 #[must_use]
490 pub fn into_payload(self) -> T {
491 self.payload
492 }
493
494 #[must_use]
496 pub fn map<U, F>(self, f: F) -> Message<U>
497 where
498 F: FnOnce(T) -> U,
499 {
500 Message {
501 id: self.id,
502 payload: f(self.payload),
503 metadata: self.metadata,
504 }
505 }
506
507 #[must_use]
509 pub fn map_with_id<U, F>(self, f: F) -> Message<U>
510 where
511 F: FnOnce(&MessageId, T) -> U,
512 {
513 Message {
514 payload: f(&self.id, self.payload),
515 id: self.id,
516 metadata: self.metadata,
517 }
518 }
519
520 #[must_use]
522 pub fn with_payload<U>(self, payload: U) -> Message<U> {
523 Message {
524 id: self.id,
525 payload,
526 metadata: self.metadata,
527 }
528 }
529}
530
531impl<T: Default> Default for Message<T> {
532 fn default() -> Self {
533 Self::new(T::default(), MessageId::new_uuid())
534 }
535}
536
537impl<T: PartialEq> PartialEq for Message<T> {
538 fn eq(&self, other: &Self) -> bool {
539 self.id == other.id && self.payload == other.payload
540 }
541}
542
543impl<T: Eq> Eq for Message<T> {}
544
545impl<T: Hash> Hash for Message<T> {
546 fn hash<H: Hasher>(&self, state: &mut H) {
547 self.id.hash(state);
548 self.payload.hash(state);
549 }
550}
551
552pub trait IdGenerator: Send + Sync {
554 fn next_id(&self) -> MessageId;
556}
557
558#[derive(Debug, Default)]
562pub struct UuidGenerator;
563
564impl UuidGenerator {
565 #[must_use]
567 pub fn new() -> Self {
568 Self
569 }
570}
571
572impl IdGenerator for UuidGenerator {
573 fn next_id(&self) -> MessageId {
574 MessageId::new_uuid()
575 }
576}
577
578#[derive(Debug)]
583pub struct SequenceGenerator {
584 counter: AtomicU64,
585}
586
587impl SequenceGenerator {
588 #[must_use]
590 pub fn new() -> Self {
591 Self {
592 counter: AtomicU64::new(0),
593 }
594 }
595
596 #[must_use]
598 pub fn starting_at(start: u64) -> Self {
599 Self {
600 counter: AtomicU64::new(start),
601 }
602 }
603
604 #[must_use]
606 pub fn current(&self) -> u64 {
607 self.counter.load(Ordering::Relaxed)
608 }
609
610 pub fn reset(&self) {
612 self.counter.store(0, Ordering::Relaxed);
613 }
614
615 pub fn reset_to(&self, value: u64) {
617 self.counter.store(value, Ordering::Relaxed);
618 }
619}
620
621impl Default for SequenceGenerator {
622 fn default() -> Self {
623 Self::new()
624 }
625}
626
627impl IdGenerator for SequenceGenerator {
628 fn next_id(&self) -> MessageId {
629 MessageId::Sequence(self.counter.fetch_add(1, Ordering::Relaxed))
630 }
631}
632
633#[derive(Debug, Default)]
638pub struct ContentHashGenerator;
639
640impl ContentHashGenerator {
641 #[must_use]
643 pub fn new() -> Self {
644 Self
645 }
646
647 #[must_use]
649 pub fn hash_content(&self, content: &[u8]) -> MessageId {
650 MessageId::from_content(content)
651 }
652}
653
654pub type SharedIdGenerator = Arc<dyn IdGenerator>;
656
657#[must_use]
659pub fn uuid_generator() -> SharedIdGenerator {
660 Arc::new(UuidGenerator::new())
661}
662
663#[must_use]
665pub fn sequence_generator() -> SharedIdGenerator {
666 Arc::new(SequenceGenerator::new())
667}
668
669#[must_use]
671pub fn sequence_generator_from(start: u64) -> SharedIdGenerator {
672 Arc::new(SequenceGenerator::starting_at(start))
673}
674
675fn rand_u64() -> u64 {
678 use std::cell::Cell;
679 use std::hash::{Hash, Hasher};
680
681 thread_local! {
682 static STATE: Cell<u64> = {
683 let time_seed = std::time::SystemTime::now()
685 .duration_since(std::time::UNIX_EPOCH)
686 .map(|d| d.as_nanos() as u64)
687 .unwrap_or(0x12345678DEADBEEF);
688
689 let mut hasher = std::collections::hash_map::DefaultHasher::new();
690 std::thread::current().id().hash(&mut hasher);
691 let thread_seed = hasher.finish();
692
693 Cell::new(time_seed ^ thread_seed)
694 };
695 }
696
697 STATE.with(|state| {
698 let mut x = state.get();
699 x ^= x << 13;
700 x ^= x >> 7;
701 x ^= x << 17;
702 state.set(x);
703 x
704 })
705}
706
707#[derive(Clone, Debug)]
722pub struct SharedMessage<T> {
723 inner: Arc<Message<T>>,
724}
725
726impl<T> SharedMessage<T> {
727 #[must_use]
729 pub fn from(message: Message<T>) -> Self {
730 Self {
731 inner: Arc::new(message),
732 }
733 }
734
735 #[must_use]
737 pub fn from_arc(arc: Arc<Message<T>>) -> Self {
738 Self { inner: arc }
739 }
740
741 #[must_use]
743 pub fn into_arc(self) -> Arc<Message<T>> {
744 self.inner
745 }
746
747 pub fn try_unwrap(self) -> Result<Message<T>, Self> {
751 Arc::try_unwrap(self.inner).map_err(|arc| Self { inner: arc })
752 }
753
754 #[must_use]
756 pub fn id(&self) -> &MessageId {
757 self.inner.id()
758 }
759
760 #[must_use]
762 pub fn payload(&self) -> &T {
763 self.inner.payload()
764 }
765
766 #[must_use]
768 pub fn metadata(&self) -> &MessageMetadata {
769 self.inner.metadata()
770 }
771}
772
773impl<T> std::ops::Deref for SharedMessage<T> {
774 type Target = Message<T>;
775
776 fn deref(&self) -> &Self::Target {
777 &self.inner
778 }
779}
780
781impl<T> std::convert::AsRef<Message<T>> for SharedMessage<T> {
782 fn as_ref(&self) -> &Message<T> {
783 &self.inner
784 }
785}
786
787impl<T: PartialEq> PartialEq for SharedMessage<T> {
788 fn eq(&self, other: &Self) -> bool {
789 self.inner.as_ref() == other.inner.as_ref()
790 }
791}
792
793impl<T: Eq> Eq for SharedMessage<T> {}
794
795impl<T: Hash> Hash for SharedMessage<T> {
796 fn hash<H: Hasher>(&self, state: &mut H) {
797 self.inner.hash(state);
798 }
799}
800
801impl<T> From<Message<T>> for SharedMessage<T> {
802 fn from(message: Message<T>) -> Self {
803 Self::from(message)
804 }
805}
806
807impl<T> From<Arc<Message<T>>> for SharedMessage<T> {
808 fn from(arc: Arc<Message<T>>) -> Self {
809 Self::from_arc(arc)
810 }
811}
812
813impl<T> From<SharedMessage<T>> for Arc<Message<T>> {
814 fn from(shared: SharedMessage<T>) -> Self {
815 shared.into_arc()
816 }
817}
818
819pub trait StringInternerTrait {
835 fn get_or_intern(&self, s: &str) -> Arc<str>;
845}