streamweave_message/
message.rs

1//! Message envelope types for exactly-once processing.
2//!
3//! This module provides types for wrapping stream items with unique identifiers
4//! and metadata, enabling features like deduplication, offset tracking, and
5//! exactly-once processing guarantees.
6//!
7//! # Overview
8//!
9//! The core types are:
10//!
11//! - [`MessageId`]: A unique identifier for messages
12//! - [`Message<T>`]: A wrapper that adds an ID and metadata to a payload
13//! - [`MessageMetadata`]: Additional information about a message
14//!
15//! # Example
16//!
17//! ```rust
18//! use streamweave::message::{Message, MessageId, MessageMetadata, IdGenerator, UuidGenerator};
19//!
20//! // Create a message with a UUID
21//! let generator = UuidGenerator::new();
22//! let msg = Message::new(42, generator.next_id());
23//!
24//! assert_eq!(*msg.payload(), 42);
25//! ```
26
27use std::fmt::{self, Display, Formatter};
28use std::hash::{Hash, Hasher};
29use std::sync::Arc;
30use std::sync::atomic::{AtomicU64, Ordering};
31use std::time::{Duration, SystemTime, UNIX_EPOCH};
32
/// Identifier attached to a message so it can be tracked and deduplicated.
///
/// Each variant corresponds to a different way of producing the identifier:
/// - `Uuid`: 128-bit value intended to be globally unique (distributed systems)
/// - `Sequence`: 64-bit counter value, useful for ordered processing
/// - `Custom`: caller-supplied string (e.g., an ID taken from the source system)
/// - `ContentHash`: 64-bit hash computed from the message content
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageId {
  /// 128-bit UUID value.
  Uuid(u128),

  /// 64-bit sequence number.
  Sequence(u64),

  /// Arbitrary string identifier chosen by the caller.
  Custom(String),

  /// 64-bit hash derived from the message content.
  ContentHash(u64),
}
53
54impl MessageId {
55  /// Create a new UUID-based message ID using UUIDv4.
56  #[must_use]
57  pub fn new_uuid() -> Self {
58    // Use a simple random UUID implementation
59    let high = rand_u64();
60    let low = rand_u64();
61    // Set version 4 and variant bits
62    let uuid = ((high & 0xFFFFFFFFFFFF0FFF) | 0x0000000000004000) as u128
63      | (((low & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000) as u128) << 64;
64    MessageId::Uuid(uuid)
65  }
66
67  /// Create a new sequence-based message ID.
68  #[must_use]
69  pub const fn new_sequence(seq: u64) -> Self {
70    MessageId::Sequence(seq)
71  }
72
73  /// Create a custom message ID from a string.
74  #[must_use]
75  pub fn new_custom(id: impl Into<String>) -> Self {
76    MessageId::Custom(id.into())
77  }
78
79  /// Create a content-hash message ID from the given bytes.
80  #[must_use]
81  pub fn from_content(content: &[u8]) -> Self {
82    use std::collections::hash_map::DefaultHasher;
83    let mut hasher = DefaultHasher::new();
84    content.hash(&mut hasher);
85    MessageId::ContentHash(hasher.finish())
86  }
87
88  /// Returns true if this is a UUID-based ID.
89  #[must_use]
90  pub const fn is_uuid(&self) -> bool {
91    matches!(self, MessageId::Uuid(_))
92  }
93
94  /// Returns true if this is a sequence-based ID.
95  #[must_use]
96  pub const fn is_sequence(&self) -> bool {
97    matches!(self, MessageId::Sequence(_))
98  }
99
100  /// Returns true if this is a custom ID.
101  #[must_use]
102  pub const fn is_custom(&self) -> bool {
103    matches!(self, MessageId::Custom(_))
104  }
105
106  /// Returns true if this is a content-hash ID.
107  #[must_use]
108  pub const fn is_content_hash(&self) -> bool {
109    matches!(self, MessageId::ContentHash(_))
110  }
111}
112
113impl Display for MessageId {
114  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
115    match self {
116      MessageId::Uuid(uuid) => {
117        // Format as standard UUID string
118        write!(
119          f,
120          "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
121          (uuid >> 96) as u32,
122          (uuid >> 80) as u16,
123          (uuid >> 64) as u16,
124          (uuid >> 48) as u16,
125          (uuid & 0xFFFFFFFFFFFF) as u64
126        )
127      }
128      MessageId::Sequence(seq) => write!(f, "seq:{}", seq),
129      MessageId::Custom(id) => write!(f, "custom:{}", id),
130      MessageId::ContentHash(hash) => write!(f, "hash:{:016x}", hash),
131    }
132  }
133}
134
135impl Hash for MessageId {
136  fn hash<H: Hasher>(&self, state: &mut H) {
137    std::mem::discriminant(self).hash(state);
138    match self {
139      MessageId::Uuid(uuid) => uuid.hash(state),
140      MessageId::Sequence(seq) => seq.hash(state),
141      MessageId::Custom(id) => id.hash(state),
142      MessageId::ContentHash(hash) => hash.hash(state),
143    }
144  }
145}
146
147impl Default for MessageId {
148  fn default() -> Self {
149    MessageId::new_uuid()
150  }
151}
152
/// Optional descriptive information carried alongside a message payload.
///
/// All string fields are stored as `Arc<str>`, so cloning the metadata (for
/// example when fanning a message out to several consumers) shares the
/// string data instead of copying it.
#[derive(Debug, Clone, Default)]
pub struct MessageMetadata {
  /// Creation time of the message, expressed as a `Duration` since `UNIX_EPOCH`.
  pub timestamp: Option<Duration>,

  /// Where the message came from (e.g., topic, file, etc.).
  /// Stored as `Arc<str>` so clones share the same string.
  pub source: Option<Arc<str>>,

  /// Partition or shard the message belongs to.
  pub partition: Option<u32>,

  /// Position of the message within its partition/source.
  pub offset: Option<u64>,

  /// User-defined key used for routing/grouping.
  /// Stored as `Arc<str>` so clones share the same string.
  pub key: Option<Arc<str>>,

  /// Extra headers/attributes as `(name, value)` pairs, in insertion order.
  /// Both parts are `Arc<str>` so clones share the same strings.
  pub headers: Vec<(Arc<str>, Arc<str>)>,
}
180
181impl MessageMetadata {
182  /// Create new empty metadata.
183  #[must_use]
184  pub fn new() -> Self {
185    Self::default()
186  }
187
188  /// Create metadata with the current timestamp.
189  #[must_use]
190  pub fn with_timestamp_now() -> Self {
191    Self {
192      timestamp: SystemTime::now().duration_since(UNIX_EPOCH).ok(),
193      ..Default::default()
194    }
195  }
196
197  /// Set the timestamp.
198  #[must_use]
199  pub fn timestamp(mut self, ts: Duration) -> Self {
200    self.timestamp = Some(ts);
201    self
202  }
203
204  /// Set the source.
205  ///
206  /// Accepts any type that can be converted to `Arc<str>`, including
207  /// `String`, `&str`, and `Arc<str>`.
208  #[must_use]
209  pub fn source(mut self, source: impl Into<Arc<str>>) -> Self {
210    self.source = Some(source.into());
211    self
212  }
213
214  /// Set the partition.
215  #[must_use]
216  pub fn partition(mut self, partition: u32) -> Self {
217    self.partition = Some(partition);
218    self
219  }
220
221  /// Set the offset.
222  #[must_use]
223  pub fn offset(mut self, offset: u64) -> Self {
224    self.offset = Some(offset);
225    self
226  }
227
228  /// Set the key.
229  ///
230  /// Accepts any type that can be converted to `Arc<str>`, including
231  /// `String`, `&str`, and `Arc<str>`.
232  #[must_use]
233  pub fn key(mut self, key: impl Into<Arc<str>>) -> Self {
234    self.key = Some(key.into());
235    self
236  }
237
238  /// Add a header.
239  ///
240  /// Accepts any types that can be converted to `Arc<str>`, including
241  /// `String`, `&str`, and `Arc<str>`.
242  #[must_use]
243  pub fn header(mut self, name: impl Into<Arc<str>>, value: impl Into<Arc<str>>) -> Self {
244    self.headers.push((name.into(), value.into()));
245    self
246  }
247
248  /// Get a header by name.
249  ///
250  /// Returns a `&str` reference to the header value if found.
251  #[must_use]
252  pub fn get_header(&self, name: &str) -> Option<&str> {
253    self
254      .headers
255      .iter()
256      .find(|(k, _)| k.as_ref() == name)
257      .map(|(_, v)| v.as_ref())
258  }
259
260  /// Get the source as a string reference.
261  ///
262  /// Returns `None` if no source is set, or `Some(&str)` with the source value.
263  #[must_use]
264  pub fn get_source(&self) -> Option<&str> {
265    self.source.as_deref()
266  }
267
268  /// Get the key as a string reference.
269  ///
270  /// Returns `None` if no key is set, or `Some(&str)` with the key value.
271  #[must_use]
272  pub fn get_key(&self) -> Option<&str> {
273    self.key.as_deref()
274  }
275
276  /// Create metadata with shared source (for zero-copy scenarios).
277  ///
278  /// This method accepts an `Arc<str>` for the source, enabling zero-copy
279  /// sharing of source strings across multiple messages.
280  ///
281  /// This is now equivalent to `source()` since MessageMetadata uses `Arc<str>` directly.
282  #[must_use]
283  pub fn with_shared_source(mut self, source: Arc<str>) -> Self {
284    self.source = Some(source);
285    self
286  }
287
288  /// Create metadata with shared key (for zero-copy scenarios).
289  ///
290  /// This method accepts an `Arc<str>` for the key, enabling zero-copy
291  /// sharing of key strings across multiple messages.
292  ///
293  /// This is now equivalent to `key()` since MessageMetadata uses `Arc<str>` directly.
294  #[must_use]
295  pub fn with_shared_key(mut self, key: Arc<str>) -> Self {
296    self.key = Some(key);
297    self
298  }
299
300  /// Add a header with shared strings (for zero-copy scenarios).
301  ///
302  /// This method accepts `Arc<str>` for header name and value, enabling
303  /// zero-copy sharing of header strings across multiple messages.
304  ///
305  /// This is now equivalent to `header()` since MessageMetadata uses `Arc<str>` directly.
306  #[must_use]
307  pub fn with_shared_header(mut self, name: Arc<str>, value: Arc<str>) -> Self {
308    self.headers.push((name, value));
309    self
310  }
311
312  /// Create metadata with source from a borrowed string.
313  ///
314  /// This is a convenience method that accepts `&str` and converts it to `Arc<str>`.
315  #[must_use]
316  pub fn with_source_borrowed(mut self, source: &str) -> Self {
317    self.source = Some(Arc::from(source));
318    self
319  }
320
321  /// Create metadata with interned source string.
322  ///
323  /// This method accepts a `&str` and a string interner, interns the string,
324  /// and stores the interned `Arc<str>` as the source. This enables automatic
325  /// string deduplication for repeated source values.
326  ///
327  /// # Arguments
328  ///
329  /// * `source` - The source string to intern
330  /// * `interner` - The string interner to use (must implement `StringInternerTrait`)
331  ///
332  /// # Returns
333  ///
334  /// `Self` for method chaining
335  ///
336  /// # Note
337  ///
338  /// This method accepts any type that implements `StringInternerTrait`, which
339  /// is provided by `streamweave-graph::StringInterner`. For use with the graph
340  /// package, pass a `&StringInterner` from `streamweave-graph`.
341  #[must_use]
342  pub fn with_source_interned<I: StringInternerTrait>(
343    mut self,
344    source: &str,
345    interner: &I,
346  ) -> Self {
347    self.source = Some(interner.get_or_intern(source));
348    self
349  }
350
351  /// Create metadata with interned key string.
352  ///
353  /// This method accepts a `&str` and a string interner, interns the string,
354  /// and stores the interned `Arc<str>` as the key. This enables automatic
355  /// string deduplication for repeated key values.
356  ///
357  /// # Arguments
358  ///
359  /// * `key` - The key string to intern
360  /// * `interner` - The string interner to use (must implement `StringInternerTrait`)
361  ///
362  /// # Returns
363  ///
364  /// `Self` for method chaining
365  #[must_use]
366  pub fn with_key_interned<I: StringInternerTrait>(mut self, key: &str, interner: &I) -> Self {
367    self.key = Some(interner.get_or_intern(key));
368    self
369  }
370
371  /// Add a header with interned strings.
372  ///
373  /// This method accepts header name and value as `&str`, interns both strings,
374  /// and adds the interned `Arc<str>` pair to headers. This enables automatic
375  /// string deduplication for repeated header keys and values.
376  ///
377  /// # Arguments
378  ///
379  /// * `name` - The header name to intern
380  /// * `value` - The header value to intern
381  /// * `interner` - The string interner to use (must implement `StringInternerTrait`)
382  ///
383  /// # Returns
384  ///
385  /// `Self` for method chaining
386  #[must_use]
387  pub fn add_header_interned<I: StringInternerTrait>(
388    mut self,
389    name: &str,
390    value: &str,
391    interner: &I,
392  ) -> Self {
393    self
394      .headers
395      .push((interner.get_or_intern(name), interner.get_or_intern(value)));
396    self
397  }
398}
399
400/// A message envelope that wraps a payload with an ID and metadata.
401///
402/// This is the primary type for exactly-once processing. It provides:
403/// - A unique identifier for deduplication
404/// - Metadata for tracking and routing
405/// - The actual payload
406///
407/// # Type Parameters
408///
409/// - `T`: The payload type
410///
411/// # Example
412///
413/// ```rust
414/// use streamweave::message::{Message, MessageId, MessageMetadata};
415///
416/// // Create a simple message
417/// let msg = Message::new(42, MessageId::new_uuid());
418///
419/// // Create a message with metadata
420/// let msg = Message::with_metadata(
421///     "hello",
422///     MessageId::new_sequence(1),
423///     MessageMetadata::with_timestamp_now().source("my-source")
424/// );
425/// ```
426#[derive(Clone, Debug)]
427pub struct Message<T> {
428  id: MessageId,
429  payload: T,
430  metadata: MessageMetadata,
431}
432
433impl<T> Message<T> {
434  /// Create a new message with the given payload and ID.
435  #[must_use]
436  pub fn new(payload: T, id: MessageId) -> Self {
437    Self {
438      id,
439      payload,
440      metadata: MessageMetadata::with_timestamp_now(),
441    }
442  }
443
444  /// Create a new message with payload, ID, and metadata.
445  #[must_use]
446  pub fn with_metadata(payload: T, id: MessageId, metadata: MessageMetadata) -> Self {
447    Self {
448      id,
449      payload,
450      metadata,
451    }
452  }
453
454  /// Get the message ID.
455  #[must_use]
456  pub fn id(&self) -> &MessageId {
457    &self.id
458  }
459
460  /// Get the payload.
461  #[must_use]
462  pub fn payload(&self) -> &T {
463    &self.payload
464  }
465
466  /// Get a mutable reference to the payload.
467  pub fn payload_mut(&mut self) -> &mut T {
468    &mut self.payload
469  }
470
471  /// Get the metadata.
472  #[must_use]
473  pub fn metadata(&self) -> &MessageMetadata {
474    &self.metadata
475  }
476
477  /// Get a mutable reference to the metadata.
478  pub fn metadata_mut(&mut self) -> &mut MessageMetadata {
479    &mut self.metadata
480  }
481
482  /// Consume the message and return its components.
483  #[must_use]
484  pub fn into_parts(self) -> (MessageId, T, MessageMetadata) {
485    (self.id, self.payload, self.metadata)
486  }
487
488  /// Consume the message and return just the payload.
489  #[must_use]
490  pub fn into_payload(self) -> T {
491    self.payload
492  }
493
494  /// Map the payload to a new type.
495  #[must_use]
496  pub fn map<U, F>(self, f: F) -> Message<U>
497  where
498    F: FnOnce(T) -> U,
499  {
500    Message {
501      id: self.id,
502      payload: f(self.payload),
503      metadata: self.metadata,
504    }
505  }
506
507  /// Map the payload to a new type, with access to the message ID.
508  #[must_use]
509  pub fn map_with_id<U, F>(self, f: F) -> Message<U>
510  where
511    F: FnOnce(&MessageId, T) -> U,
512  {
513    Message {
514      payload: f(&self.id, self.payload),
515      id: self.id,
516      metadata: self.metadata,
517    }
518  }
519
520  /// Replace the payload with a new value.
521  #[must_use]
522  pub fn with_payload<U>(self, payload: U) -> Message<U> {
523    Message {
524      id: self.id,
525      payload,
526      metadata: self.metadata,
527    }
528  }
529}
530
531impl<T: Default> Default for Message<T> {
532  fn default() -> Self {
533    Self::new(T::default(), MessageId::new_uuid())
534  }
535}
536
537impl<T: PartialEq> PartialEq for Message<T> {
538  fn eq(&self, other: &Self) -> bool {
539    self.id == other.id && self.payload == other.payload
540  }
541}
542
543impl<T: Eq> Eq for Message<T> {}
544
545impl<T: Hash> Hash for Message<T> {
546  fn hash<H: Hasher>(&self, state: &mut H) {
547    self.id.hash(state);
548    self.payload.hash(state);
549  }
550}
551
/// Trait for types that generate message IDs.
///
/// Implementors must be `Send + Sync`, and `next_id` takes `&self`, so a
/// single generator can be shared between threads (for example behind an
/// `Arc`, see `SharedIdGenerator`).
pub trait IdGenerator: Send + Sync {
  /// Generate the next message ID.
  fn next_id(&self) -> MessageId;
}
557
/// ID generator that hands out UUIDv4-style identifiers.
///
/// The generator itself holds no state.
#[derive(Default, Debug)]
pub struct UuidGenerator;

impl UuidGenerator {
  /// Construct a UUID generator.
  #[must_use]
  pub fn new() -> Self {
    UuidGenerator
  }
}
571
572impl IdGenerator for UuidGenerator {
573  fn next_id(&self) -> MessageId {
574    MessageId::new_uuid()
575  }
576}
577
/// ID generator backed by an increasing 64-bit counter.
///
/// The counter is an `AtomicU64`, so the generator can be used from several
/// threads through a shared reference.
#[derive(Debug)]
pub struct SequenceGenerator {
  /// Value that will be used for the next generated ID.
  counter: AtomicU64,
}

impl SequenceGenerator {
  /// Construct a generator whose first sequence number is 0.
  #[must_use]
  pub fn new() -> Self {
    Self::starting_at(0)
  }

  /// Construct a generator whose first sequence number is `start`.
  #[must_use]
  pub fn starting_at(start: u64) -> Self {
    let counter = AtomicU64::new(start);
    Self { counter }
  }

  /// Read the current sequence number without advancing it.
  #[must_use]
  pub fn current(&self) -> u64 {
    self.counter.load(Ordering::Relaxed)
  }

  /// Set the sequence back to 0.
  pub fn reset(&self) {
    self.reset_to(0);
  }

  /// Set the sequence to `value`.
  pub fn reset_to(&self, value: u64) {
    self.counter.store(value, Ordering::Relaxed);
  }
}

impl Default for SequenceGenerator {
  /// Same as `SequenceGenerator::new()`: the sequence starts at 0.
  fn default() -> Self {
    Self::starting_at(0)
  }
}
626
627impl IdGenerator for SequenceGenerator {
628  fn next_id(&self) -> MessageId {
629    MessageId::Sequence(self.counter.fetch_add(1, Ordering::Relaxed))
630  }
631}
632
633/// Content-hash ID generator.
634///
635/// Generates IDs based on the content of the message.
636/// This is useful for idempotency based on message content.
637#[derive(Debug, Default)]
638pub struct ContentHashGenerator;
639
640impl ContentHashGenerator {
641  /// Create a new content hash generator.
642  #[must_use]
643  pub fn new() -> Self {
644    Self
645  }
646
647  /// Generate an ID from the given content.
648  #[must_use]
649  pub fn hash_content(&self, content: &[u8]) -> MessageId {
650    MessageId::from_content(content)
651  }
652}
653
/// A shared ID generator that can be cloned across threads.
///
/// Cloning the `Arc` yields another handle to the same underlying generator;
/// the concrete generator type is erased behind `dyn IdGenerator`.
pub type SharedIdGenerator = Arc<dyn IdGenerator>;
656
657/// Create a shared UUID generator.
658#[must_use]
659pub fn uuid_generator() -> SharedIdGenerator {
660  Arc::new(UuidGenerator::new())
661}
662
663/// Create a shared sequence generator.
664#[must_use]
665pub fn sequence_generator() -> SharedIdGenerator {
666  Arc::new(SequenceGenerator::new())
667}
668
669/// Create a shared sequence generator starting at the given value.
670#[must_use]
671pub fn sequence_generator_from(start: u64) -> SharedIdGenerator {
672  Arc::new(SequenceGenerator::starting_at(start))
673}
674
/// Return the next value of a per-thread xorshift pseudo-random generator.
///
/// This backs UUID creation. It is not cryptographically secure.
///
/// Each thread lazily seeds its own state from three ingredients: the
/// current system time, the thread ID, and the randomly keyed hasher of a
/// fresh `RandomState`. The random keys keep two processes that start at the
/// same instant with the same thread ID from producing the same stream.
///
/// The returned value is never 0 (see the seed guard below).
fn rand_u64() -> u64 {
  use std::cell::Cell;
  use std::collections::hash_map::RandomState;
  use std::hash::{BuildHasher, Hash, Hasher};

  // Used when the clock is unusable and as the replacement for a zero seed.
  const FALLBACK_SEED: u64 = 0x12345678DEADBEEF;

  thread_local! {
    static STATE: Cell<u64> = {
      let time_seed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // Keeping only the low 64 bits of the nanosecond count is intended;
        // they are the fastest-changing part.
        .map(|d| d.as_nanos() as u64)
        .unwrap_or(FALLBACK_SEED);

      // Hash the time and thread ID with randomly generated hasher keys.
      let mut hasher = RandomState::new().build_hasher();
      std::thread::current().id().hash(&mut hasher);
      time_seed.hash(&mut hasher);
      let seed = hasher.finish();

      // Xorshift maps 0 to 0 forever, so a zero seed must never be installed.
      Cell::new(if seed == 0 { FALLBACK_SEED } else { seed })
    };
  }

  STATE.with(|state| {
    // One xorshift64 step with the (13, 7, 17) shift triple.
    let mut x = state.get();
    x ^= x << 13;
    x ^= x >> 7;
    x ^= x << 17;
    state.set(x);
    x
  })
}
706
707/// A shared message wrapper for zero-copy message sharing.
708///
709/// This type wraps `Arc<Message<T>>` to enable zero-copy sharing of messages
710/// in fan-out scenarios where one message needs to be sent to multiple consumers.
711///
712/// # Example
713///
714/// ```rust
715/// use streamweave_message::{Message, MessageId, SharedMessage};
716///
717/// let msg = Message::new(42, MessageId::new_uuid());
718/// let shared = SharedMessage::from(msg);
719/// let cloned = shared.clone(); // Zero-cost clone of Arc
720/// ```
721#[derive(Clone, Debug)]
722pub struct SharedMessage<T> {
723  inner: Arc<Message<T>>,
724}
725
726impl<T> SharedMessage<T> {
727  /// Create a SharedMessage from an owned Message.
728  #[must_use]
729  pub fn from(message: Message<T>) -> Self {
730    Self {
731      inner: Arc::new(message),
732    }
733  }
734
735  /// Create a SharedMessage from an `Arc<Message<T>>`.
736  #[must_use]
737  pub fn from_arc(arc: Arc<Message<T>>) -> Self {
738    Self { inner: arc }
739  }
740
741  /// Get the inner Arc.
742  #[must_use]
743  pub fn into_arc(self) -> Arc<Message<T>> {
744    self.inner
745  }
746
747  /// Try to unwrap the Arc, returning the owned Message if this is the only reference.
748  ///
749  /// Returns `Ok(Message<T>)` if this is the only reference, `Err(SharedMessage<T>)` otherwise.
750  pub fn try_unwrap(self) -> Result<Message<T>, Self> {
751    Arc::try_unwrap(self.inner).map_err(|arc| Self { inner: arc })
752  }
753
754  /// Get the message ID.
755  #[must_use]
756  pub fn id(&self) -> &MessageId {
757    self.inner.id()
758  }
759
760  /// Get the payload.
761  #[must_use]
762  pub fn payload(&self) -> &T {
763    self.inner.payload()
764  }
765
766  /// Get the metadata.
767  #[must_use]
768  pub fn metadata(&self) -> &MessageMetadata {
769    self.inner.metadata()
770  }
771}
772
773impl<T> std::ops::Deref for SharedMessage<T> {
774  type Target = Message<T>;
775
776  fn deref(&self) -> &Self::Target {
777    &self.inner
778  }
779}
780
781impl<T> std::convert::AsRef<Message<T>> for SharedMessage<T> {
782  fn as_ref(&self) -> &Message<T> {
783    &self.inner
784  }
785}
786
787impl<T: PartialEq> PartialEq for SharedMessage<T> {
788  fn eq(&self, other: &Self) -> bool {
789    self.inner.as_ref() == other.inner.as_ref()
790  }
791}
792
793impl<T: Eq> Eq for SharedMessage<T> {}
794
795impl<T: Hash> Hash for SharedMessage<T> {
796  fn hash<H: Hasher>(&self, state: &mut H) {
797    self.inner.hash(state);
798  }
799}
800
801impl<T> From<Message<T>> for SharedMessage<T> {
802  fn from(message: Message<T>) -> Self {
803    Self::from(message)
804  }
805}
806
807impl<T> From<Arc<Message<T>>> for SharedMessage<T> {
808  fn from(arc: Arc<Message<T>>) -> Self {
809    Self::from_arc(arc)
810  }
811}
812
813impl<T> From<SharedMessage<T>> for Arc<Message<T>> {
814  fn from(shared: SharedMessage<T>) -> Self {
815    shared.into_arc()
816  }
817}
818
819// Note: ZeroCopyShare trait implementation for Message<T> is provided by the
820// blanket implementation in streamweave-graph for all types that are
821// Clone + Send + Sync + 'static. Message<T> automatically implements
822// ZeroCopyShare when used with the graph package, enabling zero-copy
823// sharing via Arc<Message<T>> in fan-out scenarios.
824
// Note: the standard library already implements `From<String>` and `From<&str>`
// for `Arc<str>`, so no extra conversion impls are needed here (and the orphan
// rule would not allow this crate to add them anyway).
827
/// Trait for string interning functionality.
///
/// This trait abstracts over string interner implementations, allowing
/// `MessageMetadata` to work with any interner type. The `streamweave-graph`
/// package provides a concrete `StringInterner` implementation that implements
/// this trait.
///
/// The method takes `&self`, so an implementation that remembers interned
/// strings has to use interior mutability.
pub trait StringInternerTrait {
  /// Get an interned string, or intern a new one if it doesn't exist.
  ///
  /// # Arguments
  ///
  /// * `s` - The string to intern or retrieve
  ///
  /// # Returns
  ///
  /// An `Arc<str>` containing the interned string
  fn get_or_intern(&self, s: &str) -> Arc<str>;
}