Crate streamweave_message

§streamweave-message


Message envelope types for StreamWeave
Unique identifiers and metadata for exactly-once processing and message tracking.

The streamweave-message package provides message envelope types that wrap stream items with unique identifiers and metadata. These envelopes are the building blocks for message deduplication, offset tracking, exactly-once processing, and message-flow tracking in pipelines and graphs.

§✨ Key Features

  • Message Envelope: Wrap payloads with unique IDs and metadata
  • MessageId Types: UUID, Sequence, Custom, and Content-Hash identifiers
  • MessageMetadata: Rich metadata (timestamp, source, partition, offset, key, headers)
  • ID Generators: UUID, Sequence, and Content-Hash generators
  • Message Operations: Map, transform, and unwrap messages
  • Exactly-Once Processing: Enable deduplication and idempotency

§📦 Installation

Add this to your Cargo.toml:

[dependencies]
streamweave-message = "0.3.0"

§🚀 Quick Start

§Creating Messages

use streamweave_message::{Message, MessageId, MessageMetadata};

// Create a simple message with UUID
let msg = Message::new(42, MessageId::new_uuid());

// Create a message with sequence ID
let msg = Message::new("hello", MessageId::new_sequence(1));

// Create a message with metadata
let metadata = MessageMetadata::with_timestamp_now()
    .source("my-source")
    .partition(0)
    .offset(100);

let msg = Message::with_metadata(
    "payload",
    MessageId::new_uuid(),
    metadata,
);

§Using ID Generators

use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};

// UUID generator (globally unique)
let uuid_gen = UuidGenerator::new();
let id1 = uuid_gen.next_id();
let id2 = uuid_gen.next_id();

// Sequence generator (monotonically increasing)
let seq_gen = SequenceGenerator::new();
let seq1 = seq_gen.next_id();  // Sequence(0)
let seq2 = seq_gen.next_id();  // Sequence(1)

// Sequence generator starting at specific value
let seq_gen = SequenceGenerator::starting_at(1000);
let seq = seq_gen.next_id();  // Sequence(1000)

§📖 API Overview

§Message Type

The Message<T> type wraps a payload with an ID and metadata:

pub struct Message<T> {
    id: MessageId,
    payload: T,
    metadata: MessageMetadata,
}

Key Methods:

  • new(payload, id) - Create message with payload and ID
  • with_metadata(payload, id, metadata) - Create message with full metadata
  • id() - Get message ID
  • payload() - Get payload reference
  • metadata() - Get metadata reference
  • map(f) - Transform payload while preserving ID and metadata
  • into_payload() - Extract payload, discarding envelope
  • into_parts() - Extract all components
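To make the envelope's shape concrete, here is a minimal, self-contained sketch of an envelope with borrowing and consuming accessors like those listed above. It is not the crate's implementation: `Envelope`, the plain `u64` ID, and the two-field `Meta` are hypothetical stand-ins, and the real `Message<T>` may differ in signatures and derives.

```rust
/// Hypothetical, simplified metadata (the real MessageMetadata has more fields).
#[derive(Debug, Clone, Default, PartialEq)]
struct Meta {
    source: Option<String>,
    offset: Option<u64>,
}

/// Hypothetical envelope mirroring the shape of Message<T>.
#[derive(Debug, Clone, PartialEq)]
struct Envelope<T> {
    id: u64, // simplified stand-in for MessageId
    payload: T,
    metadata: Meta,
}

impl<T> Envelope<T> {
    fn new(payload: T, id: u64) -> Self {
        Self::with_metadata(payload, id, Meta::default())
    }

    fn with_metadata(payload: T, id: u64, metadata: Meta) -> Self {
        Envelope { id, payload, metadata }
    }

    // Borrowing accessors: inspect without consuming the envelope.
    fn id(&self) -> u64 { self.id }
    fn payload(&self) -> &T { &self.payload }
    fn metadata(&self) -> &Meta { &self.metadata }

    // Consuming accessors: move components out instead of copying them.
    fn into_payload(self) -> T { self.payload }
    fn into_parts(self) -> (u64, T, Meta) { (self.id, self.payload, self.metadata) }
}

fn main() {
    let meta = Meta { source: Some("orders".to_string()), offset: Some(7) };
    let msg = Envelope::with_metadata(String::from("hello"), 1, meta);
    assert_eq!(msg.payload().as_str(), "hello");
    assert_eq!(msg.metadata().offset, Some(7));

    let (id, payload, metadata) = msg.into_parts();
    println!("id={id} payload={payload} source={:?}", metadata.source);

    let plain = Envelope::new(42, 2);
    assert_eq!(plain.id(), 2);
    assert_eq!(plain.into_payload(), 42);
}
```

The split between `&self` accessors and `self`-consuming `into_*` methods follows the usual Rust naming convention: `into_` signals that the envelope is used up.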

§MessageId Enum

The MessageId enum supports multiple ID types:

pub enum MessageId {
    Uuid(u128),           // UUID-based (128-bit)
    Sequence(u64),        // Sequence-based (64-bit)
    Custom(String),       // Custom string identifier
    ContentHash(u64),     // Content-hash based
}

ID Types:

  • Uuid: Globally unique, good for distributed systems
  • Sequence: Monotonically increasing, good for ordered processing
  • Custom: User-provided identifier (e.g., from source system)
  • ContentHash: Derived from content, useful for idempotency
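Deduplication depends on IDs being hashable and comparable. The sketch below redefines a hypothetical `Id` enum with the same four variants and derives `Hash` and `Eq`, which lets IDs be used as `HashSet` keys. The `Display` format shown is an illustrative choice, not necessarily what the crate prints.

```rust
use std::collections::HashSet;
use std::fmt;

/// Hypothetical stand-in with the same variants as MessageId.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Id {
    Uuid(u128),
    Sequence(u64),
    Custom(String),
    ContentHash(u64),
}

impl fmt::Display for Id {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Id::Uuid(v) => write!(f, "uuid:{v:032x}"),
            Id::Sequence(n) => write!(f, "seq:{n}"),
            Id::Custom(s) => write!(f, "custom:{s}"),
            Id::ContentHash(h) => write!(f, "hash:{h:016x}"),
        }
    }
}

fn main() {
    let mut seen = HashSet::new();
    // Variants never collide: Sequence(1) and ContentHash(1) are different IDs.
    assert!(seen.insert(Id::Sequence(1)));
    assert!(seen.insert(Id::ContentHash(1)));
    assert!(!seen.insert(Id::Sequence(1))); // duplicate rejected
    println!("{}", Id::Custom("event-123".into()));
}
```

Because the variant is part of the derived equality, two sources that happen to produce the same number through different ID schemes are still kept apart.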

§MessageMetadata

The MessageMetadata struct provides rich metadata:

pub struct MessageMetadata {
    pub timestamp: Option<Duration>,    // When message was created
    pub source: Option<String>,         // Source (topic, file, etc.)
    pub partition: Option<u32>,         // Partition/shard information
    pub offset: Option<u64>,            // Offset within partition
    pub key: Option<String>,            // Routing/grouping key
    pub headers: Vec<(String, String)>, // Additional headers
}
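The examples in this document build metadata with chained calls such as `.source(...)` and `.header(...)`. The following self-contained sketch shows one way such a consuming builder can be written. `Metadata` is a hypothetical copy of the fields above; storing the timestamp as time since the Unix epoch and having `get_header` return the first matching value are assumptions, not documented crate behavior.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical copy of the MessageMetadata fields, for illustration only.
#[derive(Debug, Clone, Default)]
struct Metadata {
    timestamp: Option<Duration>,
    source: Option<String>,
    partition: Option<u32>,
    offset: Option<u64>,
    key: Option<String>,
    headers: Vec<(String, String)>,
}

impl Metadata {
    /// Assumption: "now" is stored as time since the Unix epoch.
    fn with_timestamp_now() -> Self {
        let now = SystemTime::now().duration_since(UNIX_EPOCH).ok();
        Metadata { timestamp: now, ..Default::default() }
    }

    // Each setter takes `self` by value and returns it, so calls can be chained.
    fn source(mut self, s: impl Into<String>) -> Self { self.source = Some(s.into()); self }
    fn partition(mut self, p: u32) -> Self { self.partition = Some(p); self }
    fn offset(mut self, o: u64) -> Self { self.offset = Some(o); self }
    fn key(mut self, k: impl Into<String>) -> Self { self.key = Some(k.into()); self }
    fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((name.into(), value.into()));
        self
    }

    /// Returns the first header value with the given name, if any.
    fn get_header(&self, name: &str) -> Option<&str> {
        self.headers.iter().find(|(n, _)| n == name).map(|(_, v)| v.as_str())
    }
}

fn main() {
    let meta = Metadata::with_timestamp_now()
        .source("kafka-topic")
        .partition(3)
        .offset(1000)
        .key("user-123")
        .header("content-type", "application/json");
    assert_eq!(meta.get_header("content-type"), Some("application/json"));
    println!("{meta:?}");
}
```

Headers are kept as a `Vec` of pairs rather than a map, so duplicate header names and insertion order are preserved.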

§ID Generators

Multiple ID generator implementations:

UuidGenerator:

  • Generates UUIDv4-style identifiers
  • Globally unique
  • Thread-safe

SequenceGenerator:

  • Generates monotonically increasing sequence numbers
  • Thread-safe using atomic operations
  • Supports starting at specific value
  • Can be reset

ContentHashGenerator:

  • Generates IDs based on message content
  • Useful for content-based idempotency
  • Same content = same ID
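Content-hash IDs rely on a deterministic hash: identical bytes must always yield the same ID. A minimal sketch using the standard library's `DefaultHasher` follows. The choice of hasher is an assumption for illustration; the crate's actual hash function is not specified here. `DefaultHasher`'s algorithm is also not guaranteed to stay the same across Rust releases, so IDs persisted to a long-lived deduplication store would call for a hash with a fixed, documented algorithm.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical content-hash ID: a 64-bit hash of the payload bytes.
fn content_hash_id(content: &[u8]) -> u64 {
    // DefaultHasher::new() always starts from the same keys, so the
    // result is deterministic within a given build of the program.
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = content_hash_id(b"my content");
    let b = content_hash_id(b"my content");
    let c = content_hash_id(b"other content");
    assert_eq!(a, b); // same content = same ID
    assert_ne!(a, c); // different content, (almost certainly) a different ID
    println!("{a:016x}");
}
```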

§📚 Usage Examples

§Creating Messages with Different ID Types

use streamweave_message::{Message, MessageId};

// UUID-based message
let msg = Message::new(42, MessageId::new_uuid());

// Sequence-based message
let msg = Message::new("data", MessageId::new_sequence(1));

// Custom ID message
let msg = Message::new(100, MessageId::new_custom("event-123"));

// Content-hash based message
let content = b"my content";
let msg = Message::new(content, MessageId::from_content(content));

§Working with Metadata

use streamweave_message::{Message, MessageId, MessageMetadata};

// Create metadata with builder pattern
let metadata = MessageMetadata::with_timestamp_now()
    .source("kafka-topic")
    .partition(3)
    .offset(1000)
    .key("user-123")
    .header("content-type", "application/json")
    .header("correlation-id", "req-456");

let msg = Message::with_metadata(
    "payload data",
    MessageId::new_uuid(),
    metadata,
);

// Access metadata
assert_eq!(msg.metadata().source, Some("kafka-topic".to_string()));
assert_eq!(msg.metadata().partition, Some(3));
assert_eq!(msg.metadata().get_header("content-type"), Some("application/json"));

§Transforming Messages

use streamweave_message::{Message, MessageId};

let msg = Message::new(42, MessageId::new_sequence(1));

// Map payload while preserving ID and metadata (`map` moves `msg`)
let doubled = msg.map(|x| x * 2);
assert_eq!(*doubled.payload(), 84);
assert_eq!(*doubled.id(), MessageId::new_sequence(1));

// Map with access to message ID (clone so `doubled` stays usable)
let with_id = doubled.clone().map_with_id(|id, payload| {
    format!("{}:{}", id, payload)
});

// Replace payload, keeping ID and metadata
let new_msg = doubled.with_payload("new payload");
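The key property of these transformations is that only the payload changes. A small self-contained sketch of that contract follows, using a hypothetical `Envelope` type (not the crate's `Message`) in which `map` consumes the envelope and may change the payload type; `map_with_id` and `with_payload` are built on top of it.

```rust
/// Hypothetical envelope: a sequence ID, a payload, and one metadata field.
#[derive(Debug, Clone)]
struct Envelope<T> {
    id: u64,
    payload: T,
    source: Option<String>,
}

impl<T> Envelope<T> {
    /// Transform the payload (possibly into a new type), keeping ID and metadata.
    fn map<U>(self, f: impl FnOnce(T) -> U) -> Envelope<U> {
        Envelope { id: self.id, payload: f(self.payload), source: self.source }
    }

    /// Like `map`, but the closure also sees the message ID.
    fn map_with_id<U>(self, f: impl FnOnce(u64, T) -> U) -> Envelope<U> {
        let id = self.id;
        self.map(|p| f(id, p))
    }

    /// Replace the payload outright, discarding the old one.
    fn with_payload<U>(self, payload: U) -> Envelope<U> {
        self.map(|_| payload)
    }
}

fn main() {
    let msg = Envelope { id: 1, payload: 42, source: Some("input".to_string()) };
    let doubled = msg.map(|x| x * 2);
    assert_eq!((doubled.id, doubled.payload), (1, 84));

    let labelled = doubled.clone().map_with_id(|id, x| format!("{id}:{x}"));
    assert_eq!(labelled.payload, "1:84");

    let replaced = doubled.with_payload("new payload");
    assert_eq!(replaced.source.as_deref(), Some("input"));
}
```

Taking `self` by value lets the payload type change (here `i32` to `String` and `&str`) without cloning the metadata.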

§Using ID Generators

use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};

// UUID generator
let uuid_gen = UuidGenerator::new();
for _ in 0..10 {
    let id = uuid_gen.next_id();
    // Each ID is unique
}

// Sequence generator
let seq_gen = SequenceGenerator::new();
let id1 = seq_gen.next_id();  // Sequence(0)
let id2 = seq_gen.next_id();  // Sequence(1)

// Sequence generator with starting value
let seq_gen = SequenceGenerator::starting_at(100);
let id = seq_gen.next_id();  // Sequence(100)

// Reset sequence
seq_gen.reset();
let id = seq_gen.next_id();  // Sequence(0)

// Get current value without incrementing
let current = seq_gen.current();

§Message Flow in Pipelines

use streamweave_message::{Message, MessageId, MessageMetadata};

// Wrap items in messages
let messages: Vec<Message<i32>> = vec![1, 2, 3]
    .into_iter()
    .enumerate()
    .map(|(i, x)| {
        Message::with_metadata(
            x,
            MessageId::new_sequence(i as u64),
            MessageMetadata::with_timestamp_now()
                .source("input")
        )
    })
    .collect();

// Process messages (ID and metadata preserved)
let processed: Vec<Message<i32>> = messages
    .into_iter()
    .map(|msg| msg.map(|x| x * 2))
    .collect();

// Unwrap payloads when needed
let payloads: Vec<i32> = processed
    .into_iter()
    .map(|msg| msg.into_payload())
    .collect();
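The wrap, process, and unwrap steps above can also be packaged as a reusable iterator adapter. The sketch below is hypothetical (`Envelope` and `wrap_with_sequence` are not part of the crate). It assigns consecutive sequence IDs lazily as items arrive, so IDs follow input order.

```rust
/// Hypothetical envelope: a sequence ID plus a source tag and the payload.
#[derive(Debug, PartialEq)]
struct Envelope<T> {
    id: u64,
    source: &'static str,
    payload: T,
}

/// Lazily wrap each item with a sequence ID (0, 1, 2, ...) and a source tag.
fn wrap_with_sequence<T>(
    items: impl IntoIterator<Item = T>,
    source: &'static str,
) -> impl Iterator<Item = Envelope<T>> {
    items
        .into_iter()
        .zip(0u64..)
        .map(move |(payload, id)| Envelope { id, source, payload })
}

fn main() {
    let processed: Vec<Envelope<i32>> = wrap_with_sequence(vec![1, 2, 3], "input")
        // Only the payload changes; the ID and source travel with it.
        .map(|e| Envelope { payload: e.payload * 2, ..e })
        .collect();

    assert_eq!(processed[2].id, 2);
    let payloads: Vec<i32> = processed.into_iter().map(|e| e.payload).collect();
    assert_eq!(payloads, vec![2, 4, 6]);
}
```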

§Message Deduplication

use streamweave_message::{Message, MessageId};
use std::collections::HashSet;

// Track seen message IDs
let mut seen = HashSet::new();

let messages = vec![
    Message::new(1, MessageId::new_sequence(1)),
    Message::new(2, MessageId::new_sequence(2)),
    Message::new(1, MessageId::new_sequence(1)), // Duplicate
];

for msg in messages {
    if seen.insert(msg.id().clone()) {
        // Process unique message
        println!("Processing: {:?}", msg.payload());
    } else {
        // Skip duplicate
        println!("Skipping duplicate: {:?}", msg.id());
    }
}
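The same check can be written as a reusable, order-preserving filter. This hypothetical helper (`dedup_by_id` is not a crate function) works with any item type and any hashable ID extracted by a closure. Like the loop above, it remembers every ID it has seen, so memory grows with the number of distinct IDs in the stream.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Keep only the first item for each ID, preserving order.
/// `id_of` extracts the ID; the set of seen IDs grows with the input.
fn dedup_by_id<T, K, F>(items: impl IntoIterator<Item = T>, mut id_of: F) -> Vec<T>
where
    K: Hash + Eq,
    F: FnMut(&T) -> K,
{
    let mut seen = HashSet::new();
    // HashSet::insert returns false when the ID was already present.
    items.into_iter().filter(|item| seen.insert(id_of(item))).collect()
}

fn main() {
    // (id, payload) pairs stand in for messages; the third is a redelivery.
    let messages = vec![(1u64, "a"), (2, "b"), (1, "a"), (3, "c")];
    let unique = dedup_by_id(messages, |(id, _)| *id);
    assert_eq!(unique, vec![(1, "a"), (2, "b"), (3, "c")]);
}
```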

§Message Routing by Key

use streamweave_message::{Message, MessageId, MessageMetadata};

let messages = vec![
    Message::with_metadata(
        "data1",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
    Message::with_metadata(
        "data2",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-2"),
    ),
    Message::with_metadata(
        "data3",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
];

// Route messages by key
let mut user1_messages = vec![];
let mut user2_messages = vec![];

for msg in messages {
    match msg.metadata().key.as_deref() {
        Some("user-1") => user1_messages.push(msg),
        Some("user-2") => user2_messages.push(msg),
        _ => {}
    }
}
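Matching on literal keys works for a fixed set of users; when keys are not known in advance, grouping into a `HashMap` is the usual approach. The self-contained sketch below uses a hypothetical `group_by_key` helper, with `(Option<String>, payload)` tuples standing in for messages and their `key` metadata.

```rust
use std::collections::HashMap;

/// Group payloads by routing key; messages without a key go under `None`.
fn group_by_key<T>(messages: Vec<(Option<String>, T)>) -> HashMap<Option<String>, Vec<T>> {
    let mut groups: HashMap<Option<String>, Vec<T>> = HashMap::new();
    for (key, payload) in messages {
        // Arrival order is preserved within each group.
        groups.entry(key).or_default().push(payload);
    }
    groups
}

fn main() {
    let messages = vec![
        (Some("user-1".to_string()), "data1"),
        (Some("user-2".to_string()), "data2"),
        (Some("user-1".to_string()), "data3"),
        (None, "data4"),
    ];
    let groups = group_by_key(messages);
    assert_eq!(groups[&Some("user-1".to_string())], vec!["data1", "data3"]);
    assert_eq!(groups[&None::<String>], vec!["data4"]);
}
```

Unlike the `match` above, which silently drops messages with unexpected or missing keys, this version keeps every message in some group.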

Β§πŸ—οΈ Architecture

Messages flow through pipelines and graphs with their envelope intact:

┌─────────────┐
│  Producer   │───produces───> Message<T>
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│ Transformer │───transforms───> Message<U> (ID preserved)
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│  Consumer   │───consumes───> (can extract payload or keep envelope)
└─────────────┘

Message Envelope Structure:

Message<T>
├── MessageId (unique identifier)
├── Payload<T> (actual data)
└── MessageMetadata
    ├── timestamp
    ├── source
    ├── partition
    ├── offset
    ├── key
    └── headers

§🔗 Dependencies

streamweave-message depends on:

  • serde - Serialization support
  • serde_json - JSON serialization
  • chrono - Timestamp support
  • streamweave (optional) - Integration with core traits

§🎯 Use Cases

Message envelopes are used for:

  1. Exactly-Once Processing: Unique IDs enable deduplication
  2. Offset Tracking: Track position in source streams
  3. Message Routing: Route by key or partition
  4. Idempotency: Content-hash IDs for content-based deduplication
  5. Message Correlation: Track messages through complex pipelines
  6. Audit Trails: Metadata records each message's source, partition, offset, timestamp, and headers
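Offset tracking usually means recording, per partition, the highest offset that has been fully processed so a restart can resume just after it. The sketch below shows this using the kind of `partition` and `offset` values carried in the metadata. `OffsetTracker` is hypothetical and not part of the crate. It assumes offsets are marked only after processing succeeds, that messages within a partition complete in order, and that an unseen partition resumes from offset 0.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: highest processed offset for each partition.
#[derive(Default)]
struct OffsetTracker {
    committed: HashMap<u32, u64>,
}

impl OffsetTracker {
    /// Record that the message at (partition, offset) was processed.
    /// Offsets never move backwards, so late or replayed acks are ignored.
    fn mark_processed(&mut self, partition: u32, offset: u64) {
        let entry = self.committed.entry(partition).or_insert(offset);
        *entry = (*entry).max(offset);
    }

    /// Offset to resume from after a restart: one past the last processed.
    fn resume_from(&self, partition: u32) -> u64 {
        self.committed.get(&partition).map_or(0, |o| o + 1)
    }
}

fn main() {
    let mut tracker = OffsetTracker::default();
    for (partition, offset) in [(0, 100), (0, 101), (1, 7), (0, 99)] {
        tracker.mark_processed(partition, offset);
    }
    assert_eq!(tracker.resume_from(0), 102);
    assert_eq!(tracker.resume_from(1), 8);
    assert_eq!(tracker.resume_from(2), 0); // unseen partition starts at 0
}
```

If messages within a partition can finish out of order, taking the maximum could skip an offset that never completed. In that case a tracker would need to advance only over a contiguous run of completed offsets.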

Β§πŸ” Error Handling

Because messages can be cloned, a failing message can be attached to an error context, keeping its ID and metadata available to error handlers:

use streamweave_error::ErrorContext;
use streamweave_message::{Message, MessageId};

let msg = Message::new(42, MessageId::new_sequence(1));

// Error context can include the message that caused the failure
let error_context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(msg.clone()),  // Include message in error context
    component_name: "processor".to_string(),
    component_type: "Transformer".to_string(),
};

§⚡ Performance Considerations

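  • Zero-Copy: payload() and metadata() return references, and into_payload() and into_parts() move components out of the envelope instead of copying them
  • Cloning: Messages can be cloned when needed (for example, to attach one to an error context); a clone copies the payload, ID, and metadata
  • Thread-Safe: ID generators are thread-safe
  • Minimal Overhead: The envelope adds only an ID and a metadata struct alongside the payload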

Β§πŸ“ Examples

For more examples, see:

§📖 Documentation

§🔗 See Also

§🤝 Contributing

Contributions are welcome! Please see the Contributing Guide for details.

§📄 License

This project is licensed under the CC BY-SA 4.0 license.

Re-exports

pub use message::*;

Modules

message
Message envelope types for exactly-once processing.