streamweave_message/
message.rs

1//! Message envelope types for exactly-once processing.
2//!
3//! This module provides types for wrapping stream items with unique identifiers
4//! and metadata, enabling features like deduplication, offset tracking, and
5//! exactly-once processing guarantees.
6//!
7//! # Overview
8//!
9//! The core types are:
10//!
11//! - [`MessageId`]: A unique identifier for messages
12//! - [`Message<T>`]: A wrapper that adds an ID and metadata to a payload
13//! - [`MessageMetadata`]: Additional information about a message
14//!
15//! # Example
16//!
17//! ```rust
18//! use streamweave::message::{Message, MessageId, MessageMetadata, IdGenerator, UuidGenerator};
19//!
20//! // Create a message with a UUID
21//! let generator = UuidGenerator::new();
22//! let msg = Message::new(42, generator.next_id());
23//!
24//! assert_eq!(*msg.payload(), 42);
25//! ```
26
27use std::fmt::{self, Display, Formatter};
28use std::hash::{Hash, Hasher};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
/// Identifier carried by every message.
///
/// An ID can come from one of several strategies:
/// - UUID: globally unique, well suited to distributed systems
/// - Sequence: monotonically increasing, well suited to ordered processing
/// - Custom: supplied by the caller (for example, taken from the source system)
/// - Content hash: derived from the bytes of the message content
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageId {
  /// 128-bit UUID value.
  Uuid(u128),

  /// 64-bit sequence number.
  Sequence(u64),

  /// Arbitrary caller-provided string.
  Custom(String),

  /// 64-bit hash computed from message content.
  ContentHash(u64),
}
53
54impl MessageId {
55  /// Create a new UUID-based message ID using UUIDv4.
56  #[must_use]
57  pub fn new_uuid() -> Self {
58    // Use a simple random UUID implementation
59    let high = rand_u64();
60    let low = rand_u64();
61    // Set version 4 and variant bits
62    let uuid = ((high & 0xFFFFFFFFFFFF0FFF) | 0x0000000000004000) as u128
63      | (((low & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000) as u128) << 64;
64    MessageId::Uuid(uuid)
65  }
66
67  /// Create a new sequence-based message ID.
68  #[must_use]
69  pub const fn new_sequence(seq: u64) -> Self {
70    MessageId::Sequence(seq)
71  }
72
73  /// Create a custom message ID from a string.
74  #[must_use]
75  pub fn new_custom(id: impl Into<String>) -> Self {
76    MessageId::Custom(id.into())
77  }
78
79  /// Create a content-hash message ID from the given bytes.
80  #[must_use]
81  pub fn from_content(content: &[u8]) -> Self {
82    use std::collections::hash_map::DefaultHasher;
83    let mut hasher = DefaultHasher::new();
84    content.hash(&mut hasher);
85    MessageId::ContentHash(hasher.finish())
86  }
87
88  /// Returns true if this is a UUID-based ID.
89  #[must_use]
90  pub const fn is_uuid(&self) -> bool {
91    matches!(self, MessageId::Uuid(_))
92  }
93
94  /// Returns true if this is a sequence-based ID.
95  #[must_use]
96  pub const fn is_sequence(&self) -> bool {
97    matches!(self, MessageId::Sequence(_))
98  }
99
100  /// Returns true if this is a custom ID.
101  #[must_use]
102  pub const fn is_custom(&self) -> bool {
103    matches!(self, MessageId::Custom(_))
104  }
105
106  /// Returns true if this is a content-hash ID.
107  #[must_use]
108  pub const fn is_content_hash(&self) -> bool {
109    matches!(self, MessageId::ContentHash(_))
110  }
111}
112
113impl Display for MessageId {
114  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
115    match self {
116      MessageId::Uuid(uuid) => {
117        // Format as standard UUID string
118        write!(
119          f,
120          "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
121          (uuid >> 96) as u32,
122          (uuid >> 80) as u16,
123          (uuid >> 64) as u16,
124          (uuid >> 48) as u16,
125          (uuid & 0xFFFFFFFFFFFF) as u64
126        )
127      }
128      MessageId::Sequence(seq) => write!(f, "seq:{}", seq),
129      MessageId::Custom(id) => write!(f, "custom:{}", id),
130      MessageId::ContentHash(hash) => write!(f, "hash:{:016x}", hash),
131    }
132  }
133}
134
135impl Hash for MessageId {
136  fn hash<H: Hasher>(&self, state: &mut H) {
137    std::mem::discriminant(self).hash(state);
138    match self {
139      MessageId::Uuid(uuid) => uuid.hash(state),
140      MessageId::Sequence(seq) => seq.hash(state),
141      MessageId::Custom(id) => id.hash(state),
142      MessageId::ContentHash(hash) => hash.hash(state),
143    }
144  }
145}
146
147impl Default for MessageId {
148  fn default() -> Self {
149    MessageId::new_uuid()
150  }
151}
152
/// Optional descriptive information that travels alongside a message.
///
/// Every field starts out unset (`None`, or an empty header list).
#[derive(Debug, Clone, Default)]
pub struct MessageMetadata {
  /// Creation time of the message, expressed as a `Duration` since `UNIX_EPOCH`.
  pub timestamp: Option<Duration>,

  /// Where the message came from (e.g., a topic or a file).
  pub source: Option<String>,

  /// Partition or shard the message belongs to.
  pub partition: Option<u32>,

  /// Position of the message within its partition/source.
  pub offset: Option<u64>,

  /// User-defined key used for routing/grouping.
  pub key: Option<String>,

  /// Extra headers/attributes as `(name, value)` pairs, in insertion order.
  pub headers: Vec<(String, String)>,
}
174
175impl MessageMetadata {
176  /// Create new empty metadata.
177  #[must_use]
178  pub fn new() -> Self {
179    Self::default()
180  }
181
182  /// Create metadata with the current timestamp.
183  #[must_use]
184  pub fn with_timestamp_now() -> Self {
185    Self {
186      timestamp: SystemTime::now().duration_since(UNIX_EPOCH).ok(),
187      ..Default::default()
188    }
189  }
190
191  /// Set the timestamp.
192  #[must_use]
193  pub fn timestamp(mut self, ts: Duration) -> Self {
194    self.timestamp = Some(ts);
195    self
196  }
197
198  /// Set the source.
199  #[must_use]
200  pub fn source(mut self, source: impl Into<String>) -> Self {
201    self.source = Some(source.into());
202    self
203  }
204
205  /// Set the partition.
206  #[must_use]
207  pub fn partition(mut self, partition: u32) -> Self {
208    self.partition = Some(partition);
209    self
210  }
211
212  /// Set the offset.
213  #[must_use]
214  pub fn offset(mut self, offset: u64) -> Self {
215    self.offset = Some(offset);
216    self
217  }
218
219  /// Set the key.
220  #[must_use]
221  pub fn key(mut self, key: impl Into<String>) -> Self {
222    self.key = Some(key.into());
223    self
224  }
225
226  /// Add a header.
227  #[must_use]
228  pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
229    self.headers.push((name.into(), value.into()));
230    self
231  }
232
233  /// Get a header by name.
234  #[must_use]
235  pub fn get_header(&self, name: &str) -> Option<&str> {
236    self
237      .headers
238      .iter()
239      .find(|(k, _)| k == name)
240      .map(|(_, v)| v.as_str())
241  }
242}
243
244/// A message envelope that wraps a payload with an ID and metadata.
245///
246/// This is the primary type for exactly-once processing. It provides:
247/// - A unique identifier for deduplication
248/// - Metadata for tracking and routing
249/// - The actual payload
250///
251/// # Type Parameters
252///
253/// - `T`: The payload type
254///
255/// # Example
256///
257/// ```rust
258/// use streamweave::message::{Message, MessageId, MessageMetadata};
259///
260/// // Create a simple message
261/// let msg = Message::new(42, MessageId::new_uuid());
262///
263/// // Create a message with metadata
264/// let msg = Message::with_metadata(
265///     "hello",
266///     MessageId::new_sequence(1),
267///     MessageMetadata::with_timestamp_now().source("my-source")
268/// );
269/// ```
270#[derive(Clone, Debug)]
271pub struct Message<T> {
272  id: MessageId,
273  payload: T,
274  metadata: MessageMetadata,
275}
276
277impl<T> Message<T> {
278  /// Create a new message with the given payload and ID.
279  #[must_use]
280  pub fn new(payload: T, id: MessageId) -> Self {
281    Self {
282      id,
283      payload,
284      metadata: MessageMetadata::with_timestamp_now(),
285    }
286  }
287
288  /// Create a new message with payload, ID, and metadata.
289  #[must_use]
290  pub fn with_metadata(payload: T, id: MessageId, metadata: MessageMetadata) -> Self {
291    Self {
292      id,
293      payload,
294      metadata,
295    }
296  }
297
298  /// Get the message ID.
299  #[must_use]
300  pub fn id(&self) -> &MessageId {
301    &self.id
302  }
303
304  /// Get the payload.
305  #[must_use]
306  pub fn payload(&self) -> &T {
307    &self.payload
308  }
309
310  /// Get a mutable reference to the payload.
311  pub fn payload_mut(&mut self) -> &mut T {
312    &mut self.payload
313  }
314
315  /// Get the metadata.
316  #[must_use]
317  pub fn metadata(&self) -> &MessageMetadata {
318    &self.metadata
319  }
320
321  /// Get a mutable reference to the metadata.
322  pub fn metadata_mut(&mut self) -> &mut MessageMetadata {
323    &mut self.metadata
324  }
325
326  /// Consume the message and return its components.
327  #[must_use]
328  pub fn into_parts(self) -> (MessageId, T, MessageMetadata) {
329    (self.id, self.payload, self.metadata)
330  }
331
332  /// Consume the message and return just the payload.
333  #[must_use]
334  pub fn into_payload(self) -> T {
335    self.payload
336  }
337
338  /// Map the payload to a new type.
339  #[must_use]
340  pub fn map<U, F>(self, f: F) -> Message<U>
341  where
342    F: FnOnce(T) -> U,
343  {
344    Message {
345      id: self.id,
346      payload: f(self.payload),
347      metadata: self.metadata,
348    }
349  }
350
351  /// Map the payload to a new type, with access to the message ID.
352  #[must_use]
353  pub fn map_with_id<U, F>(self, f: F) -> Message<U>
354  where
355    F: FnOnce(&MessageId, T) -> U,
356  {
357    Message {
358      payload: f(&self.id, self.payload),
359      id: self.id,
360      metadata: self.metadata,
361    }
362  }
363
364  /// Replace the payload with a new value.
365  #[must_use]
366  pub fn with_payload<U>(self, payload: U) -> Message<U> {
367    Message {
368      id: self.id,
369      payload,
370      metadata: self.metadata,
371    }
372  }
373}
374
375impl<T: Default> Default for Message<T> {
376  fn default() -> Self {
377    Self::new(T::default(), MessageId::new_uuid())
378  }
379}
380
381impl<T: PartialEq> PartialEq for Message<T> {
382  fn eq(&self, other: &Self) -> bool {
383    self.id == other.id && self.payload == other.payload
384  }
385}
386
387impl<T: Eq> Eq for Message<T> {}
388
389impl<T: Hash> Hash for Message<T> {
390  fn hash<H: Hasher>(&self, state: &mut H) {
391    self.id.hash(state);
392    self.payload.hash(state);
393  }
394}
395
396/// Trait for types that generate message IDs.
397pub trait IdGenerator: Send + Sync {
398  /// Generate the next message ID.
399  fn next_id(&self) -> MessageId;
400}
401
402/// UUID-based ID generator.
403///
404/// Generates unique UUIDv4-style identifiers.
405#[derive(Debug, Default)]
406pub struct UuidGenerator;
407
408impl UuidGenerator {
409  /// Create a new UUID generator.
410  #[must_use]
411  pub fn new() -> Self {
412    Self
413  }
414}
415
416impl IdGenerator for UuidGenerator {
417  fn next_id(&self) -> MessageId {
418    MessageId::new_uuid()
419  }
420}
421
422/// Sequence-based ID generator.
423///
424/// Generates monotonically increasing sequence numbers.
425/// Thread-safe using atomic operations.
426#[derive(Debug)]
427pub struct SequenceGenerator {
428  counter: AtomicU64,
429}
430
431impl SequenceGenerator {
432  /// Create a new sequence generator starting at 0.
433  #[must_use]
434  pub fn new() -> Self {
435    Self {
436      counter: AtomicU64::new(0),
437    }
438  }
439
440  /// Create a new sequence generator starting at the given value.
441  #[must_use]
442  pub fn starting_at(start: u64) -> Self {
443    Self {
444      counter: AtomicU64::new(start),
445    }
446  }
447
448  /// Get the current sequence number without incrementing.
449  #[must_use]
450  pub fn current(&self) -> u64 {
451    self.counter.load(Ordering::Relaxed)
452  }
453
454  /// Reset the sequence to 0.
455  pub fn reset(&self) {
456    self.counter.store(0, Ordering::Relaxed);
457  }
458
459  /// Reset the sequence to a specific value.
460  pub fn reset_to(&self, value: u64) {
461    self.counter.store(value, Ordering::Relaxed);
462  }
463}
464
465impl Default for SequenceGenerator {
466  fn default() -> Self {
467    Self::new()
468  }
469}
470
471impl IdGenerator for SequenceGenerator {
472  fn next_id(&self) -> MessageId {
473    MessageId::Sequence(self.counter.fetch_add(1, Ordering::Relaxed))
474  }
475}
476
477/// Content-hash ID generator.
478///
479/// Generates IDs based on the content of the message.
480/// This is useful for idempotency based on message content.
481#[derive(Debug, Default)]
482pub struct ContentHashGenerator;
483
484impl ContentHashGenerator {
485  /// Create a new content hash generator.
486  #[must_use]
487  pub fn new() -> Self {
488    Self
489  }
490
491  /// Generate an ID from the given content.
492  #[must_use]
493  pub fn hash_content(&self, content: &[u8]) -> MessageId {
494    MessageId::from_content(content)
495  }
496}
497
498/// A shared ID generator that can be cloned across threads.
499pub type SharedIdGenerator = Arc<dyn IdGenerator>;
500
501/// Create a shared UUID generator.
502#[must_use]
503pub fn uuid_generator() -> SharedIdGenerator {
504  Arc::new(UuidGenerator::new())
505}
506
507/// Create a shared sequence generator.
508#[must_use]
509pub fn sequence_generator() -> SharedIdGenerator {
510  Arc::new(SequenceGenerator::new())
511}
512
513/// Create a shared sequence generator starting at the given value.
514#[must_use]
515pub fn sequence_generator_from(start: u64) -> SharedIdGenerator {
516  Arc::new(SequenceGenerator::starting_at(start))
517}
518
// Small per-thread pseudo-random generator used to build UUIDs.
//
// Each thread keeps its own xorshift state (shifts 13, 7, 17). The state is
// seeded lazily, on the thread's first call, from the wall clock, the thread
// ID, and a hasher built from std's randomly keyed `RandomState`. The seed is
// forced to be non-zero because xorshift maps a zero state to zero forever,
// which would make every generated value identical.
//
// Returns the next 64-bit value of the calling thread's sequence. This is not
// a cryptographically secure generator.
fn rand_u64() -> u64 {
  use std::cell::Cell;
  use std::collections::hash_map::{DefaultHasher, RandomState};
  use std::hash::{BuildHasher, Hash, Hasher};

  // Arbitrary non-zero replacement used if the mixed seed comes out as zero.
  const FALLBACK_SEED: u64 = 0x9E37_79B9_7F4A_7C15;

  thread_local! {
    static STATE: Cell<u64> = {
      // Wall-clock nanoseconds (truncated to 64 bits); a fixed constant is
      // used if the clock reports a time before the UNIX epoch.
      let time_seed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(0x12345678DEADBEEF);

      // Hash of the thread ID, so threads seeded at the same instant differ.
      let mut hasher = DefaultHasher::new();
      std::thread::current().id().hash(&mut hasher);
      let thread_seed = hasher.finish();

      // Extra entropy: each `RandomState` is created with random keys, so
      // two processes started at the same instant do not share a seed.
      let random_seed = RandomState::new().build_hasher().finish();

      let seed = time_seed ^ thread_seed ^ random_seed;
      Cell::new(if seed == 0 { FALLBACK_SEED } else { seed })
    };
  }

  STATE.with(|state| {
    let mut x = state.get();
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    state.set(x);
    x
  })
}
550
#[cfg(test)]
mod tests {
  use super::*;

  /// A freshly generated UUID ID reports itself as a UUID and nothing else.
  #[test]
  fn test_message_id_uuid() {
    let id = MessageId::new_uuid();
    assert!(id.is_uuid());
    assert!(!id.is_sequence());
    assert!(!id.is_custom());
    assert!(!id.is_content_hash());
  }

  /// A sequence ID keeps the number it was built from.
  #[test]
  fn test_message_id_sequence() {
    let id = MessageId::new_sequence(42);
    assert!(id.is_sequence());
    assert!(!id.is_uuid());

    if let MessageId::Sequence(seq) = id {
      assert_eq!(seq, 42);
    } else {
      panic!("Expected sequence ID");
    }
  }

  /// A custom ID keeps the string it was built from.
  #[test]
  fn test_message_id_custom() {
    let id = MessageId::new_custom("my-custom-id");
    assert!(id.is_custom());

    if let MessageId::Custom(s) = id {
      assert_eq!(s, "my-custom-id");
    } else {
      panic!("Expected custom ID");
    }
  }

  /// Content hashing is deterministic and distinguishes different content.
  #[test]
  fn test_message_id_content_hash() {
    let id1 = MessageId::from_content(b"hello");
    let id2 = MessageId::from_content(b"hello");
    let id3 = MessageId::from_content(b"world");

    assert!(id1.is_content_hash());
    assert_eq!(id1, id2); // Same content = same hash
    assert_ne!(id1, id3); // Different content = different hash
  }

  /// Every variant renders to its exact textual form.
  #[test]
  fn test_message_id_display() {
    // Pin the whole string: checking only for a dash would accept output with
    // wrong grouping, wrong padding, or wrong letter case.
    let uuid = MessageId::Uuid(0x12345678_1234_4567_89AB_CDEF01234567);
    assert_eq!(format!("{}", uuid), "12345678-1234-4567-89ab-cdef01234567");

    let seq = MessageId::new_sequence(42);
    assert_eq!(format!("{}", seq), "seq:42");

    let custom = MessageId::new_custom("test");
    assert_eq!(format!("{}", custom), "custom:test");

    // Hashes are zero-padded to 16 lowercase hexadecimal digits.
    let hash = MessageId::ContentHash(0xDEADBEEF);
    assert_eq!(format!("{}", hash), "hash:00000000deadbeef");
  }

  /// `Message::new` stores the payload and ID it was given.
  #[test]
  fn test_message_creation() {
    let msg = Message::new(42, MessageId::new_sequence(1));
    assert_eq!(*msg.payload(), 42);
    assert!(msg.id().is_sequence());
  }

  /// `Message::with_metadata` keeps the caller-supplied metadata.
  #[test]
  fn test_message_with_metadata() {
    let metadata = MessageMetadata::with_timestamp_now()
      .source("test-source")
      .partition(3);

    let msg = Message::with_metadata("hello", MessageId::new_uuid(), metadata);

    assert_eq!(*msg.payload(), "hello");
    assert_eq!(msg.metadata().source, Some("test-source".to_string()));
    assert_eq!(msg.metadata().partition, Some(3));
  }

  /// Mapping transforms the payload and leaves the ID untouched.
  #[test]
  fn test_message_map() {
    let msg = Message::new(42, MessageId::new_sequence(1));
    let mapped = msg.map(|x| x * 2);

    assert_eq!(*mapped.payload(), 84);
    assert_eq!(*mapped.id(), MessageId::new_sequence(1)); // ID preserved
  }

  /// `into_parts` returns the ID and payload the message was built with.
  #[test]
  fn test_message_into_parts() {
    let msg = Message::new("test", MessageId::new_sequence(5));
    let (id, payload, _metadata) = msg.into_parts();

    assert_eq!(id, MessageId::new_sequence(5));
    assert_eq!(payload, "test");
  }

  /// Consecutive IDs from the UUID generator differ.
  #[test]
  fn test_uuid_generator() {
    let generator = UuidGenerator::new();
    let id1 = generator.next_id();
    let id2 = generator.next_id();

    assert!(id1.is_uuid());
    assert!(id2.is_uuid());
    assert_ne!(id1, id2); // UUIDs should be unique
  }

  /// The sequence generator counts up from zero.
  #[test]
  fn test_sequence_generator() {
    let generator = SequenceGenerator::new();
    let id1 = generator.next_id();
    let id2 = generator.next_id();
    let id3 = generator.next_id();

    assert_eq!(id1, MessageId::Sequence(0));
    assert_eq!(id2, MessageId::Sequence(1));
    assert_eq!(id3, MessageId::Sequence(2));
  }

  /// `starting_at` controls the first sequence number handed out.
  #[test]
  fn test_sequence_generator_starting_at() {
    let generator = SequenceGenerator::starting_at(100);
    let id = generator.next_id();
    assert_eq!(id, MessageId::Sequence(100));
  }

  /// `reset` and `reset_to` overwrite the counter.
  #[test]
  fn test_sequence_generator_reset() {
    let generator = SequenceGenerator::starting_at(50);
    generator.next_id();
    generator.next_id();
    assert_eq!(generator.current(), 52);

    generator.reset();
    assert_eq!(generator.current(), 0);

    generator.reset_to(1000);
    assert_eq!(generator.current(), 1000);
  }

  /// The metadata builder sets each field, and headers can be looked up by name.
  #[test]
  fn test_metadata_builder() {
    let metadata = MessageMetadata::new()
      .source("my-source")
      .partition(5)
      .offset(100)
      .key("my-key")
      .header("content-type", "application/json");

    assert_eq!(metadata.source, Some("my-source".to_string()));
    assert_eq!(metadata.partition, Some(5));
    assert_eq!(metadata.offset, Some(100));
    assert_eq!(metadata.key, Some("my-key".to_string()));
    assert_eq!(
      metadata.get_header("content-type"),
      Some("application/json")
    );
    assert_eq!(metadata.get_header("non-existent"), None);
  }

  /// The factory functions return working shared generators.
  #[test]
  fn test_shared_generators() {
    let uuid_gen = uuid_generator();
    let seq_gen = sequence_generator();
    let seq_gen_from = sequence_generator_from(1000);

    assert!(uuid_gen.next_id().is_uuid());
    assert_eq!(seq_gen.next_id(), MessageId::Sequence(0));
    assert_eq!(seq_gen_from.next_id(), MessageId::Sequence(1000));
  }

  /// Messages compare by ID and payload.
  #[test]
  fn test_message_equality() {
    let id = MessageId::new_sequence(1);
    let msg1 = Message::new(42, id.clone());
    let msg2 = Message::new(42, id.clone());
    let msg3 = Message::new(43, id);

    assert_eq!(msg1, msg2);
    assert_ne!(msg1, msg3);
  }

  /// Ten threads drawing 100 IDs each receive every number in 0..1000 exactly once.
  #[test]
  fn test_concurrent_sequence_generator() {
    use std::sync::Arc;
    use std::thread;

    let generator = Arc::new(SequenceGenerator::new());
    let mut handles = vec![];

    for _ in 0..10 {
      let generator_clone = Arc::clone(&generator);
      handles.push(thread::spawn(move || {
        let mut ids = vec![];
        for _ in 0..100 {
          ids.push(generator_clone.next_id());
        }
        ids
      }));
    }

    let mut all_ids: Vec<MessageId> = handles
      .into_iter()
      .flat_map(|h| h.join().unwrap())
      .collect();

    all_ids.sort_by_key(|id| {
      if let MessageId::Sequence(seq) = id {
        *seq
      } else {
        panic!("Expected sequence ID")
      }
    });

    // Check all IDs are unique (sequence 0..1000)
    for (i, id) in all_ids.iter().enumerate() {
      assert_eq!(*id, MessageId::Sequence(i as u64));
    }
  }
}