streamweave-message
Message envelope types for StreamWeave
Unique identifiers and metadata for exactly-once processing and message tracking.
The streamweave-message package provides message envelope types that wrap stream items with unique identifiers and metadata. This enables features like message deduplication, offset tracking, exactly-once processing guarantees, and message flow tracking in pipelines and graphs.
β¨ Key Features
- Message Envelope: Wrap payloads with unique IDs and metadata
- MessageId Types: UUID, Sequence, Custom, and Content-Hash identifiers
- MessageMetadata: Rich metadata (timestamp, source, partition, offset, key, headers)
- ID Generators: UUID, Sequence, and Content-Hash generators
- Message Operations: Map, transform, and unwrap messages
- Exactly-Once Processing: Enable deduplication and idempotency
π¦ Installation
Add this to your Cargo.toml:
[]
= "0.3.0"
π Quick Start
Creating Messages
use ;
// Create a simple message with UUID
let msg = new;
// Create a message with sequence ID
let msg = new;
// Create a message with metadata
let metadata = with_timestamp_now
.source
.partition
.offset;
let msg = with_metadata;
Using ID Generators
use ;
// UUID generator (globally unique)
let uuid_gen = new;
let id1 = uuid_gen.next_id;
let id2 = uuid_gen.next_id;
// Sequence generator (monotonically increasing)
let seq_gen = new;
let seq1 = seq_gen.next_id; // Sequence(0)
let seq2 = seq_gen.next_id; // Sequence(1)
// Sequence generator starting at specific value
let seq_gen = starting_at;
let seq = seq_gen.next_id; // Sequence(1000)
π API Overview
Message Type
The Message<T> type wraps a payload with an ID and metadata:
Key Methods:
new(payload, id)- Create message with payload and IDwith_metadata(payload, id, metadata)- Create message with full metadataid()- Get message IDpayload()- Get payload referencemetadata()- Get metadata referencemap(f)- Transform payload while preserving ID and metadatainto_payload()- Extract payload, discarding envelopeinto_parts()- Extract all components
MessageId Enum
The MessageId enum supports multiple ID types:
ID Types:
- Uuid: Globally unique, good for distributed systems
- Sequence: Monotonically increasing, good for ordered processing
- Custom: User-provided identifier (e.g., from source system)
- ContentHash: Derived from content, useful for idempotency
MessageMetadata
The MessageMetadata struct provides rich metadata:
ID Generators
Multiple ID generator implementations:
UuidGenerator:
- Generates UUIDv4-style identifiers
- Globally unique
- Thread-safe
SequenceGenerator:
- Generates monotonically increasing sequence numbers
- Thread-safe using atomic operations
- Supports starting at specific value
- Can be reset
ContentHashGenerator:
- Generates IDs based on message content
- Useful for content-based idempotency
- Same content = same ID
π Usage Examples
Creating Messages with Different ID Types
use ;
// UUID-based message
let msg = new;
// Sequence-based message
let msg = new;
// Custom ID message
let msg = new;
// Content-hash based message
let content = b"my content";
let msg = new;
Working with Metadata
use ;
// Create metadata with builder pattern
let metadata = with_timestamp_now
.source
.partition
.offset
.key
.header
.header;
let msg = with_metadata;
// Access metadata
assert_eq!;
assert_eq!;
assert_eq!;
Transforming Messages
use Message;
let msg = new;
// Map payload while preserving ID and metadata
let doubled = msg.map;
assert_eq!;
assert_eq!;
// Map with access to message ID
let with_id = msg.map_with_id;
// Replace payload
let new_msg = msg.with_payload;
Using ID Generators
use ;
// UUID generator
let uuid_gen = new;
for _ in 0..10
// Sequence generator
let seq_gen = new;
let id1 = seq_gen.next_id; // Sequence(0)
let id2 = seq_gen.next_id; // Sequence(1)
// Sequence generator with starting value
let seq_gen = starting_at;
let id = seq_gen.next_id; // Sequence(100)
// Reset sequence
seq_gen.reset;
let id = seq_gen.next_id; // Sequence(0)
// Get current value without incrementing
let current = seq_gen.current;
Message Flow in Pipelines
use ;
use Transformer;
// Wrap items in messages
let messages: = vec!
.into_iter
.enumerate
.map
.collect;
// Process messages (ID and metadata preserved)
let processed: = messages
.into_iter
.map
.collect;
// Unwrap payloads when needed
let payloads: = processed
.into_iter
.map
.collect;
Message Deduplication
use ;
use HashSet;
// Track seen message IDs
let mut seen = new;
let messages = vec!;
for msg in messages
Message Routing by Key
use ;
let messages = vec!;
// Route messages by key
let mut user1_messages = vec!;
let mut user2_messages = vec!;
for msg in messages
ποΈ Architecture
Messages flow through pipelines and graphs with their envelope intact:
βββββββββββββββ
β Producer ββββproducesβββ> Message<T>
βββββββββββββββ
β
β Message flows through
βΌ
βββββββββββββββ
β Transformer ββββtransformsβββ> Message<U> (ID preserved)
βββββββββββββββ
β
β Message flows through
βΌ
βββββββββββββββ
β Consumer ββββconsumesβββ> (can extract payload or keep envelope)
βββββββββββββββ
Message Envelope Structure:
Message<T>
βββ MessageId (unique identifier)
βββ Payload<T> (actual data)
βββ MessageMetadata
βββ timestamp
βββ source
βββ partition
βββ offset
βββ key
βββ headers
π Dependencies
streamweave-message depends on:
serde- Serialization supportserde_json- JSON serializationchrono- Timestamp supportstreamweave(optional) - Integration with core traits
π― Use Cases
Message envelopes are used for:
- Exactly-Once Processing: Unique IDs enable deduplication
- Offset Tracking: Track position in source streams
- Message Routing: Route by key or partition
- Idempotency: Content-hash IDs for content-based deduplication
- Message Correlation: Track messages through complex pipelines
- Audit Trails: Metadata provides full message history
π Error Handling
Messages work seamlessly with the error handling system:
use Message;
use StreamError;
// Error context can include the message
let error_context = ErrorContext ;
β‘ Performance Considerations
- Zero-Copy: Message operations are designed for efficiency
- Clone Efficiency: Messages clone efficiently when needed
- Thread-Safe: ID generators are thread-safe
- Minimal Overhead: Envelope adds minimal overhead to payloads
π Examples
For more examples, see:
π Documentation
π See Also
- streamweave - Core traits
- streamweave-offset - Offset management
- streamweave-transaction - Transaction support
π€ Contributing
Contributions are welcome! Please see the Contributing Guide for details.
π License
This project is licensed under the CC BY-SA 4.0 license.