§streamweave-message
Message envelope types for StreamWeave
Unique identifiers and metadata for exactly-once processing and message tracking.
The streamweave-message package provides message envelope types that wrap stream items with unique identifiers and metadata. This enables features like message deduplication, offset tracking, exactly-once processing guarantees, and message flow tracking in pipelines and graphs.
§Key Features
- Message Envelope: Wrap payloads with unique IDs and metadata
- MessageId Types: UUID, Sequence, Custom, and Content-Hash identifiers
- MessageMetadata: Rich metadata (timestamp, source, partition, offset, key, headers)
- ID Generators: UUID, Sequence, and Content-Hash generators
- Message Operations: Map, transform, and unwrap messages
- Exactly-Once Processing: Enable deduplication and idempotency
§Installation
Add this to your Cargo.toml:
[dependencies]
streamweave-message = "0.3.0"
§Quick Start
§Creating Messages
use streamweave_message::{Message, MessageId, MessageMetadata};
// Create a simple message with UUID
let msg = Message::new(42, MessageId::new_uuid());
// Create a message with sequence ID
let msg = Message::new("hello", MessageId::new_sequence(1));
// Create a message with metadata
let metadata = MessageMetadata::with_timestamp_now()
    .source("my-source")
    .partition(0)
    .offset(100);
let msg = Message::with_metadata(
    "payload",
    MessageId::new_uuid(),
    metadata,
);
§Using ID Generators
use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};
// UUID generator (globally unique)
let uuid_gen = UuidGenerator::new();
let id1 = uuid_gen.next_id();
let id2 = uuid_gen.next_id();
// Sequence generator (monotonically increasing)
let seq_gen = SequenceGenerator::new();
let seq1 = seq_gen.next_id(); // Sequence(0)
let seq2 = seq_gen.next_id(); // Sequence(1)
// Sequence generator starting at specific value
let seq_gen = SequenceGenerator::starting_at(1000);
let seq = seq_gen.next_id(); // Sequence(1000)
§API Overview
§Message Type
The Message<T> type wraps a payload with an ID and metadata:
pub struct Message<T> {
    id: MessageId,
    payload: T,
    metadata: MessageMetadata,
}
Key Methods:
- new(payload, id) - Create message with payload and ID
- with_metadata(payload, id, metadata) - Create message with full metadata
- id() - Get message ID
- payload() - Get payload reference
- metadata() - Get metadata reference
- map(f) - Transform payload while preserving ID and metadata
- into_payload() - Extract payload, discarding envelope
- into_parts() - Extract all components
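To make the envelope semantics concrete, here is a minimal, self-contained sketch of how `map` and `into_parts` could behave. The types below are simplified stand-ins written for illustration (a single `Sequence` ID variant, a one-field metadata struct), not the crate's actual definitions, and the assumption that `map` consumes the message is ours.

```rust
// Hypothetical stand-in types for illustration; the real crate's definitions may differ.
#[derive(Debug, Clone, PartialEq)]
enum MessageId {
    Sequence(u64),
}

#[derive(Debug, Clone, Default, PartialEq)]
struct MessageMetadata {
    source: Option<String>,
}

#[derive(Debug, Clone)]
struct Message<T> {
    id: MessageId,
    payload: T,
    metadata: MessageMetadata,
}

impl<T> Message<T> {
    fn new(payload: T, id: MessageId) -> Self {
        Message { id, payload, metadata: MessageMetadata::default() }
    }

    fn id(&self) -> &MessageId {
        &self.id
    }

    fn payload(&self) -> &T {
        &self.payload
    }

    fn metadata(&self) -> &MessageMetadata {
        &self.metadata
    }

    /// Consumes the message and transforms only the payload;
    /// the ID and metadata are moved into the new message unchanged.
    fn map<U>(self, f: impl FnOnce(T) -> U) -> Message<U> {
        Message { id: self.id, payload: f(self.payload), metadata: self.metadata }
    }

    /// Splits the envelope into its three components.
    fn into_parts(self) -> (MessageId, T, MessageMetadata) {
        (self.id, self.payload, self.metadata)
    }
}

fn main() {
    let msg = Message::new(21, MessageId::Sequence(7));
    let doubled = msg.map(|x| x * 2);
    assert_eq!(*doubled.payload(), 42);
    assert_eq!(*doubled.id(), MessageId::Sequence(7));
    assert_eq!(doubled.metadata().source, None);

    let (id, payload, metadata) = doubled.into_parts();
    println!("{:?} {} {:?}", id, payload, metadata);
}
```

Because this `map` takes `self` by value, the payload type can change (`Message<T>` to `Message<U>`) without cloning the ID or metadata.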
§MessageId Enum
The MessageId enum supports multiple ID types:
pub enum MessageId {
    Uuid(u128),       // UUID-based (128-bit)
    Sequence(u64),    // Sequence-based (64-bit)
    Custom(String),   // Custom string identifier
    ContentHash(u64), // Content-hash based
}
ID Types:
- Uuid: Globally unique, good for distributed systems
- Sequence: Monotonically increasing, good for ordered processing
- Custom: User-provided identifier (e.g., from source system)
- ContentHash: Derived from content, useful for idempotency
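As an illustration of the content-hash idea, the following sketch derives a 64-bit ID from payload bytes using only the standard library. The helper name `content_hash_id` and the choice of `DefaultHasher` are assumptions made for this example; the crate may use a different hash function.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: derives a 64-bit ID from the payload bytes.
fn content_hash_id(content: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = content_hash_id(b"order:42");
    let b = content_hash_id(b"order:42");
    let c = content_hash_id(b"order:43");

    // Identical content yields the same ID, so a retried message can be recognized.
    assert_eq!(a, b);
    // Different content yields a different ID here (64-bit collisions are possible in general).
    assert_ne!(a, c);
    println!("{a:x} {c:x}");
}
```

Note that the algorithm behind `DefaultHasher` is not guaranteed to stay the same across Rust releases, so IDs that are persisted or shared between services would need a hash with a fixed specification.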
§MessageMetadata
The MessageMetadata struct provides rich metadata:
pub struct MessageMetadata {
    pub timestamp: Option<Duration>,    // When message was created
    pub source: Option<String>,         // Source (topic, file, etc.)
    pub partition: Option<u32>,         // Partition/shard information
    pub offset: Option<u64>,            // Offset within partition
    pub key: Option<String>,            // Routing/grouping key
    pub headers: Vec<(String, String)>, // Additional headers
}
§ID Generators
Multiple ID generator implementations:
UuidGenerator:
- Generates UUIDv4-style identifiers
- Globally unique
- Thread-safe
SequenceGenerator:
- Generates monotonically increasing sequence numbers
- Thread-safe using atomic operations
- Supports starting at specific value
- Can be reset
ContentHashGenerator:
- Generates IDs based on message content
- Useful for content-based idempotency
- Same content = same ID
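The thread-safety claim for sequence generation can be sketched with an atomic counter. `SeqGen` below is a hypothetical stand-in, not the crate's `SequenceGenerator`; it only assumes the behavior described above (start value, atomic increment, reset, and reading the current value).

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a sequence generator backed by an atomic counter.
struct SeqGen {
    next: AtomicU64,
}

impl SeqGen {
    fn starting_at(start: u64) -> Self {
        SeqGen { next: AtomicU64::new(start) }
    }

    /// Returns the current value and advances the counter in one atomic step.
    fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }

    /// Reads the next value to be handed out without incrementing.
    fn current(&self) -> u64 {
        self.next.load(Ordering::Relaxed)
    }

    /// Restarts the sequence at zero.
    fn reset(&self) {
        self.next.store(0, Ordering::Relaxed)
    }
}

/// Draws `per_thread` IDs on each of `threads` threads and returns the distinct IDs.
fn collect_ids(seq: Arc<SeqGen>, threads: usize, per_thread: usize) -> HashSet<u64> {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let seq = Arc::clone(&seq);
            thread::spawn(move || (0..per_thread).map(|_| seq.next_id()).collect::<Vec<u64>>())
        })
        .collect();
    handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
}

fn main() {
    let seq = Arc::new(SeqGen::starting_at(100));
    let ids = collect_ids(Arc::clone(&seq), 4, 250);

    // No ID was handed out twice, even under contention.
    assert_eq!(ids.len(), 1000);
    assert_eq!(seq.current(), 1100);

    seq.reset();
    assert_eq!(seq.next_id(), 0);
}
```

`fetch_add` makes each increment indivisible, which is what keeps IDs unique across threads. IDs are monotonically increasing in the order they are drawn from the counter, not necessarily in the order messages are later processed.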
§Usage Examples
§Creating Messages with Different ID Types
use streamweave_message::{Message, MessageId};
// UUID-based message
let msg = Message::new(42, MessageId::new_uuid());
// Sequence-based message
let msg = Message::new("data", MessageId::new_sequence(1));
// Custom ID message
let msg = Message::new(100, MessageId::new_custom("event-123"));
// Content-hash based message
let content = b"my content";
let msg = Message::new(content, MessageId::from_content(content));
§Working with Metadata
use streamweave_message::{Message, MessageId, MessageMetadata};
// Create metadata with builder pattern
let metadata = MessageMetadata::with_timestamp_now()
    .source("kafka-topic")
    .partition(3)
    .offset(1000)
    .key("user-123")
    .header("content-type", "application/json")
    .header("correlation-id", "req-456");
let msg = Message::with_metadata(
    "payload data",
    MessageId::new_uuid(),
    metadata,
);
// Access metadata
assert_eq!(msg.metadata().source, Some("kafka-topic".to_string()));
assert_eq!(msg.metadata().partition, Some(3));
assert_eq!(msg.metadata().get_header("content-type"), Some("application/json"));
§Transforming Messages
use streamweave_message::{Message, MessageId};
let msg = Message::new(42, MessageId::new_sequence(1));
// Map payload while preserving ID and metadata
// (cloned first, assuming map consumes the message, so msg stays usable below)
let doubled = msg.clone().map(|x| x * 2);
assert_eq!(*doubled.payload(), 84);
assert_eq!(*doubled.id(), MessageId::new_sequence(1));
// Map with access to message ID
let with_id = msg.clone().map_with_id(|id, payload| {
    format!("{}:{}", id, payload)
});
// Replace payload
let new_msg = msg.with_payload("new payload");
§Using ID Generators
use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};
// UUID generator
let uuid_gen = UuidGenerator::new();
for _ in 0..10 {
    let id = uuid_gen.next_id();
    // Each ID is unique
}
// Sequence generator
let seq_gen = SequenceGenerator::new();
let id1 = seq_gen.next_id(); // Sequence(0)
let id2 = seq_gen.next_id(); // Sequence(1)
// Sequence generator with starting value
let seq_gen = SequenceGenerator::starting_at(100);
let id = seq_gen.next_id(); // Sequence(100)
// Reset sequence
seq_gen.reset();
let id = seq_gen.next_id(); // Sequence(0)
// Get current value without incrementing
let current = seq_gen.current();
§Message Flow in Pipelines
use streamweave_message::{Message, MessageId, MessageMetadata};
use streamweave::Transformer;
// Wrap items in messages
let messages: Vec<Message<i32>> = vec![1, 2, 3]
    .into_iter()
    .enumerate()
    .map(|(i, x)| {
        Message::with_metadata(
            x,
            MessageId::new_sequence(i as u64),
            MessageMetadata::with_timestamp_now()
                .source("input")
        )
    })
    .collect();
// Process messages (ID and metadata preserved)
let processed: Vec<Message<i32>> = messages
    .into_iter()
    .map(|msg| msg.map(|x| x * 2))
    .collect();
// Unwrap payloads when needed
let payloads: Vec<i32> = processed
    .into_iter()
    .map(|msg| msg.into_payload())
    .collect();
§Message Deduplication
use streamweave_message::{Message, MessageId};
use std::collections::HashSet;
// Track seen message IDs
let mut seen = HashSet::new();
let messages = vec![
    Message::new(1, MessageId::new_sequence(1)),
    Message::new(2, MessageId::new_sequence(2)),
    Message::new(1, MessageId::new_sequence(1)), // Duplicate
];
for msg in messages {
    if seen.insert(msg.id().clone()) {
        // Process unique message
        println!("Processing: {:?}", msg.payload());
    } else {
        // Skip duplicate
        println!("Skipping duplicate: {:?}", msg.id());
    }
}
§Message Routing by Key
use streamweave_message::{Message, MessageId, MessageMetadata};
let messages = vec![
    Message::with_metadata(
        "data1",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
    Message::with_metadata(
        "data2",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-2"),
    ),
    Message::with_metadata(
        "data3",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
];
// Route messages by key
let mut user1_messages = vec![];
let mut user2_messages = vec![];
for msg in messages {
    match msg.metadata().key.as_deref() {
        Some("user-1") => user1_messages.push(msg),
        Some("user-2") => user2_messages.push(msg),
        _ => {}
    }
}
§Architecture
Messages flow through pipelines and graphs with their envelope intact:
┌─────────────┐
│  Producer   │───produces───> Message<T>
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│ Transformer │───transforms───> Message<U> (ID preserved)
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│  Consumer   │───consumes───> (can extract payload or keep envelope)
└─────────────┘
Message Envelope Structure:
Message<T>
├── MessageId (unique identifier)
├── Payload<T> (actual data)
└── MessageMetadata
    ├── timestamp
    ├── source
    ├── partition
    ├── offset
    ├── key
    └── headers
§Dependencies
streamweave-message depends on:
- serde - Serialization support
- serde_json - JSON serialization
- chrono - Timestamp support
- streamweave (optional) - Integration with core traits
§Use Cases
Message envelopes are used for:
- Exactly-Once Processing: Unique IDs enable deduplication
- Offset Tracking: Track position in source streams
- Message Routing: Route by key or partition
- Idempotency: Content-hash IDs for content-based deduplication
- Message Correlation: Track messages through complex pipelines
- Audit Trails: Metadata provides full message history
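The offset-tracking use case can be sketched as follows: remember the highest offset processed per source and partition, and skip anything at or below it. `OffsetTracker` is a hypothetical helper written for this example (it is not part of this crate or of streamweave-offset), and it assumes messages within a partition arrive in offset order.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: remembers the highest processed offset per (source, partition).
#[derive(Default)]
struct OffsetTracker {
    committed: HashMap<(String, u32), u64>,
}

impl OffsetTracker {
    /// Returns true and records the offset if it is new; returns false for a replay.
    fn should_process(&mut self, source: &str, partition: u32, offset: u64) -> bool {
        let key = (source.to_string(), partition);
        if let Some(&last) = self.committed.get(&key) {
            if offset <= last {
                return false;
            }
        }
        self.committed.insert(key, offset);
        true
    }
}

fn main() {
    let mut tracker = OffsetTracker::default();

    // (partition, offset) pairs as they might arrive when a retry replays offset 1.
    let arrivals: [(u32, u64); 5] = [(0, 0), (0, 1), (0, 1), (1, 0), (0, 2)];

    let processed: Vec<(u32, u64)> = arrivals
        .iter()
        .copied()
        .filter(|&(partition, offset)| tracker.should_process("orders", partition, offset))
        .collect();

    // The replayed (0, 1) is skipped; each partition is tracked independently.
    assert_eq!(processed, vec![(0, 0), (0, 1), (1, 0), (0, 2)]);
}
```

Compared with keeping a set of every seen ID, tracking one offset per partition uses constant memory per partition, at the cost of relying on in-order delivery.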
§Error Handling
Messages can be attached to the error context used by the error handling system:
use streamweave_message::{Message, MessageId};
// Assumes ErrorContext is exported by streamweave_error alongside StreamError
use streamweave_error::{ErrorContext, StreamError};
let msg = Message::new(42, MessageId::new_sequence(1));
// Error context can include the message
let error_context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(msg.clone()), // Include message in error context
    component_name: "processor".to_string(),
    component_type: "Transformer".to_string(),
};
§Performance Considerations
- Zero-Copy: Message operations are designed for efficiency
- Clone Efficiency: Messages clone efficiently when needed
- Thread-Safe: ID generators are thread-safe
- Minimal Overhead: Envelope adds minimal overhead to payloads
§Examples
For more examples, see:
§Documentation
§See Also
- streamweave - Core traits
- streamweave-offset - Offset management
- streamweave-transaction - Transaction support
§Contributing
Contributions are welcome! Please see the Contributing Guide for details.
§License
This project is licensed under the CC BY-SA 4.0 license.
§Re-exports
pub use message::*;
§Modules
- message: Message envelope types for exactly-once processing.