streamweave-message 0.4.0

Message envelope types for StreamWeave
# streamweave-message

[![Crates.io](https://img.shields.io/crates/v/streamweave-message.svg)](https://crates.io/crates/streamweave-message)
[![Documentation](https://docs.rs/streamweave-message/badge.svg)](https://docs.rs/streamweave-message)
[![License: CC BY-SA 4.0](https://img.shields.io/badge/License-CC%20BY--SA%204.0-lightgrey.svg)](https://creativecommons.org/licenses/by-sa/4.0/)

**Message envelope types for StreamWeave**  
*Unique identifiers and metadata for exactly-once processing and message tracking.*

The `streamweave-message` package provides message envelope types that wrap stream items with unique identifiers and metadata. These envelopes are the building blocks for message deduplication, offset tracking, exactly-once processing, and tracing messages as they flow through pipelines and graphs.

## ✨ Key Features

- **Message Envelope**: Wrap payloads with unique IDs and metadata
- **MessageId Types**: UUID, Sequence, Custom, and Content-Hash identifiers
- **MessageMetadata**: Rich metadata (timestamp, source, partition, offset, key, headers)
- **ID Generators**: UUID, Sequence, and Content-Hash generators
- **Message Operations**: Map, transform, and unwrap messages
- **Exactly-Once Processing**: Enable deduplication and idempotency

## 📦 Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
streamweave-message = "0.4.0"
```

## 🚀 Quick Start

### Creating Messages

```rust
use streamweave_message::{Message, MessageId, MessageMetadata};

// Create a simple message with UUID
let msg = Message::new(42, MessageId::new_uuid());

// Create a message with sequence ID
let msg = Message::new("hello", MessageId::new_sequence(1));

// Create a message with metadata
let metadata = MessageMetadata::with_timestamp_now()
    .source("my-source")
    .partition(0)
    .offset(100);

let msg = Message::with_metadata(
    "payload",
    MessageId::new_uuid(),
    metadata,
);
```

### Using ID Generators

```rust
use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};

// UUID generator (globally unique)
let uuid_gen = UuidGenerator::new();
let id1 = uuid_gen.next_id();
let id2 = uuid_gen.next_id();

// Sequence generator (monotonically increasing)
let seq_gen = SequenceGenerator::new();
let seq1 = seq_gen.next_id();  // Sequence(0)
let seq2 = seq_gen.next_id();  // Sequence(1)

// Sequence generator starting at specific value
let seq_gen = SequenceGenerator::starting_at(1000);
let seq = seq_gen.next_id();  // Sequence(1000)
```

## 📖 API Overview

### Message Type

The `Message<T>` type wraps a payload with an ID and metadata:

```rust
pub struct Message<T> {
    id: MessageId,
    payload: T,
    metadata: MessageMetadata,
}
```

**Key Methods:**
- `new(payload, id)` - Create message with payload and ID
- `with_metadata(payload, id, metadata)` - Create message with full metadata
- `id()` - Get message ID
- `payload()` - Get payload reference
- `metadata()` - Get metadata reference
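- `map(f)` - Transform payload while preserving ID and metadata
- `map_with_id(f)` - Like `map`, but the closure also receives the message ID
- `with_payload(payload)` - Replace the payload, keeping ID and metadata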
- `into_payload()` - Extract payload, discarding envelope
- `into_parts()` - Extract all components
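
The transforming methods above, plus `map_with_id` and `with_payload` from the examples further down, all keep the ID and metadata while changing only the payload. The sketch below illustrates that pattern with a simplified, hypothetical `Message<T>` (a `String` ID and a header list stand in for `MessageId` and `MessageMetadata`) so it compiles without the crate; it assumes the methods take `self` by value and is not the crate's actual source.

```rust
/// Simplified, hypothetical stand-in for the crate's envelope:
/// the real type uses `MessageId` and `MessageMetadata` instead.
#[derive(Debug, Clone, PartialEq)]
pub struct Message<T> {
    id: String,
    payload: T,
    metadata: Vec<(String, String)>,
}

impl<T> Message<T> {
    pub fn new(payload: T, id: impl Into<String>) -> Self {
        Message { id: id.into(), payload, metadata: Vec::new() }
    }

    pub fn id(&self) -> &str {
        &self.id
    }

    pub fn payload(&self) -> &T {
        &self.payload
    }

    /// Transform the payload (possibly changing its type) and move the
    /// ID and metadata into the new envelope unchanged.
    pub fn map<U>(self, f: impl FnOnce(T) -> U) -> Message<U> {
        Message { id: self.id, payload: f(self.payload), metadata: self.metadata }
    }

    /// Like `map`, but the closure can also read the ID.
    pub fn map_with_id<U>(self, f: impl FnOnce(&str, T) -> U) -> Message<U> {
        let payload = f(self.id.as_str(), self.payload);
        Message { id: self.id, payload, metadata: self.metadata }
    }

    /// Swap in a new payload, keeping ID and metadata.
    pub fn with_payload<U>(self, payload: U) -> Message<U> {
        self.map(|_| payload)
    }

    /// Drop the envelope and keep only the data.
    pub fn into_payload(self) -> T {
        self.payload
    }

    /// Split the envelope into (id, payload, metadata).
    pub fn into_parts(self) -> (String, T, Vec<(String, String)>) {
        (self.id, self.payload, self.metadata)
    }
}
```

Because `map` here takes `self`, the fields are moved into the new message rather than copied; call `clone()` first when the original is still needed.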

### MessageId Enum

The `MessageId` enum supports multiple ID types:

```rust
pub enum MessageId {
    Uuid(u128),           // UUID-based (128-bit)
    Sequence(u64),        // Sequence-based (64-bit)
    Custom(String),       // Custom string identifier
    ContentHash(u64),     // Content-hash based
}
```

**ID Types:**
- **Uuid**: Globally unique, good for distributed systems
- **Sequence**: Monotonically increasing, good for ordered processing
- **Custom**: User-provided identifier (e.g., from source system)
- **ContentHash**: Derived from content, useful for idempotency
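
Content-hash IDs support idempotency because identical bytes always produce the same ID, so a redelivered message is recognized even when it arrives with a new sequence number. A minimal sketch, assuming the standard library's `DefaultHasher` as the hash function (the crate's actual algorithm is not stated here) and using a hypothetical two-variant `Id` enum in place of `MessageId`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in mirroring two of the `MessageId` variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Id {
    Sequence(u64),
    ContentHash(u64),
}

/// Derive an ID from the payload bytes: equal content gives an equal ID.
/// `DefaultHasher::new()` is deterministic, but its algorithm may change
/// between Rust releases, so IDs persisted long-term would need a stable
/// hash function instead.
pub fn content_id(content: &[u8]) -> Id {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    Id::ContentHash(hasher.finish())
}
```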

### MessageMetadata

The `MessageMetadata` struct provides rich metadata:

```rust
pub struct MessageMetadata {
    pub timestamp: Option<Duration>,    // When message was created
    pub source: Option<String>,         // Source (topic, file, etc.)
    pub partition: Option<u32>,         // Partition/shard information
    pub offset: Option<u64>,            // Offset within partition
    pub key: Option<String>,            // Routing/grouping key
    pub headers: Vec<(String, String)>, // Additional headers
}
```
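
The builder calls and `get_header` lookup used in the examples can be pictured with the reduced, hypothetical `Metadata` type below (the timestamp field is omitted). It is a sketch of the pattern, not the crate's source: each builder method takes `self` by value and returns it, and headers live in a `Vec` of pairs, which keeps insertion order and is searched linearly. Returning the first matching header is an assumption of this sketch.

```rust
/// Reduced, hypothetical version of `MessageMetadata` showing the
/// by-value builder pattern and header lookup.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Metadata {
    pub source: Option<String>,
    pub partition: Option<u32>,
    pub offset: Option<u64>,
    pub key: Option<String>,
    pub headers: Vec<(String, String)>,
}

impl Metadata {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn source(mut self, source: impl Into<String>) -> Self {
        self.source = Some(source.into());
        self
    }

    pub fn partition(mut self, partition: u32) -> Self {
        self.partition = Some(partition);
        self
    }

    pub fn offset(mut self, offset: u64) -> Self {
        self.offset = Some(offset);
        self
    }

    pub fn key(mut self, key: impl Into<String>) -> Self {
        self.key = Some(key.into());
        self
    }

    /// Append a header; the `Vec` keeps insertion order and allows repeats.
    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.push((name.into(), value.into()));
        self
    }

    /// Return the value of the first header with this name, if any.
    pub fn get_header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(n, _)| n == name)
            .map(|(_, v)| v.as_str())
    }
}
```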

### ID Generators

Multiple ID generator implementations:

**UuidGenerator:**
- Generates UUIDv4-style identifiers
- Globally unique
- Thread-safe

**SequenceGenerator:**
- Generates monotonically increasing sequence numbers
- Thread-safe using atomic operations
- Supports starting at specific value
- Can be reset

**ContentHashGenerator:**
- Generates IDs based on message content
- Useful for content-based idempotency
- Same content = same ID
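
"Thread-safe using atomic operations" usually means a single atomic counter advanced with `fetch_add`, which hands each caller a distinct number without a lock. A minimal sketch of that technique follows; the `SeqGen` name is hypothetical, `reset` returns to zero to match the comment in the usage example below, and `current` is assumed to report the next value to be issued.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical sketch of a lock-free sequence generator.
pub struct SeqGen {
    next: AtomicU64,
}

impl SeqGen {
    pub fn new() -> Self {
        Self::starting_at(0)
    }

    pub fn starting_at(start: u64) -> Self {
        SeqGen { next: AtomicU64::new(start) }
    }

    /// Atomically return the current value and advance by one,
    /// so two threads can never receive the same number.
    pub fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }

    /// Peek at the value the next call will return, without advancing.
    pub fn current(&self) -> u64 {
        self.next.load(Ordering::Relaxed)
    }

    /// Start counting from zero again.
    pub fn reset(&self) {
        self.next.store(0, Ordering::Relaxed);
    }
}
```

`Ordering::Relaxed` is enough here because uniqueness depends only on `fetch_add` being a single atomic read-modify-write; no other data is synchronized through the counter.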

## 📚 Usage Examples

### Creating Messages with Different ID Types

```rust
use streamweave_message::{Message, MessageId};

// UUID-based message
let msg = Message::new(42, MessageId::new_uuid());

// Sequence-based message
let msg = Message::new("data", MessageId::new_sequence(1));

// Custom ID message
let msg = Message::new(100, MessageId::new_custom("event-123"));

// Content-hash based message
let content = b"my content";
let msg = Message::new(content, MessageId::from_content(content));
```

### Working with Metadata

```rust
use streamweave_message::{Message, MessageId, MessageMetadata};

// Create metadata with builder pattern
let metadata = MessageMetadata::with_timestamp_now()
    .source("kafka-topic")
    .partition(3)
    .offset(1000)
    .key("user-123")
    .header("content-type", "application/json")
    .header("correlation-id", "req-456");

let msg = Message::with_metadata(
    "payload data",
    MessageId::new_uuid(),
    metadata,
);

// Access metadata
assert_eq!(msg.metadata().source, Some("kafka-topic".to_string()));
assert_eq!(msg.metadata().partition, Some(3));
assert_eq!(msg.metadata().get_header("content-type"), Some("application/json"));
```

### Transforming Messages

```rust
use streamweave_message::{Message, MessageId};

let msg = Message::new(42, MessageId::new_sequence(1));

// Map payload while preserving ID and metadata
// (clone so `msg` can be reused in the examples below)
let doubled = msg.clone().map(|x| x * 2);
assert_eq!(*doubled.payload(), 84);
assert_eq!(*doubled.id(), MessageId::new_sequence(1));

// Map with access to message ID
let with_id = msg.clone().map_with_id(|id, payload| {
    format!("{}:{}", id, payload)
});

// Replace payload (consumes `msg`)
let new_msg = msg.with_payload("new payload");
```

### Using ID Generators

```rust
use streamweave_message::{UuidGenerator, SequenceGenerator, IdGenerator};

// UUID generator
let uuid_gen = UuidGenerator::new();
for _ in 0..10 {
    let id = uuid_gen.next_id();
    // Each ID is unique
}

// Sequence generator
let seq_gen = SequenceGenerator::new();
let id1 = seq_gen.next_id();  // Sequence(0)
let id2 = seq_gen.next_id();  // Sequence(1)

// Sequence generator with starting value
let seq_gen = SequenceGenerator::starting_at(100);
let id = seq_gen.next_id();  // Sequence(100)

// Reset sequence
seq_gen.reset();
let id = seq_gen.next_id();  // Sequence(0)

// Get current value without incrementing
let current = seq_gen.current();
```

### Message Flow in Pipelines

```rust
use streamweave_message::{Message, MessageId, MessageMetadata};

// Wrap items in messages
let messages: Vec<Message<i32>> = vec![1, 2, 3]
    .into_iter()
    .enumerate()
    .map(|(i, x)| {
        Message::with_metadata(
            x,
            MessageId::new_sequence(i as u64),
            MessageMetadata::with_timestamp_now()
                .source("input")
        )
    })
    .collect();

// Process messages (ID and metadata preserved)
let processed: Vec<Message<i32>> = messages
    .into_iter()
    .map(|msg| msg.map(|x| x * 2))
    .collect();

// Unwrap payloads when needed
let payloads: Vec<i32> = processed
    .into_iter()
    .map(|msg| msg.into_payload())
    .collect();
```

### Message Deduplication

```rust
use streamweave_message::{Message, MessageId};
use std::collections::HashSet;

// Track seen message IDs
let mut seen = HashSet::new();

let messages = vec![
    Message::new(1, MessageId::new_sequence(1)),
    Message::new(2, MessageId::new_sequence(2)),
    Message::new(1, MessageId::new_sequence(1)), // Duplicate
];

for msg in messages {
    if seen.insert(msg.id().clone()) {
        // Process unique message
        println!("Processing: {:?}", msg.payload());
    } else {
        // Skip duplicate
        println!("Skipping duplicate: {:?}", msg.id());
    }
}
```

### Message Routing by Key

```rust
use streamweave_message::{Message, MessageId, MessageMetadata};

let messages = vec![
    Message::with_metadata(
        "data1",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
    Message::with_metadata(
        "data2",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-2"),
    ),
    Message::with_metadata(
        "data3",
        MessageId::new_uuid(),
        MessageMetadata::new().key("user-1"),
    ),
];

// Route messages by key
let mut user1_messages = vec![];
let mut user2_messages = vec![];

for msg in messages {
    match msg.metadata().key.as_deref() {
        Some("user-1") => user1_messages.push(msg),
        Some("user-2") => user2_messages.push(msg),
        _ => {}
    }
}
```
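
Matching on literal keys, as above, works for a fixed set of keys. With many keys, a common alternative is to hash each key into one of N partitions, so all messages sharing a key land in the same partition and keep their relative order. A minimal sketch of that technique; `partition_for` and `route` are hypothetical helpers, and because `DefaultHasher`'s algorithm may change between Rust releases, a stable hash would be needed if partition assignments must survive upgrades:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: map a routing key to one of `partitions` buckets.
/// Equal keys always hash to the same bucket.
pub fn partition_for(key: &str, partitions: usize) -> usize {
    assert!(partitions > 0, "need at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}

/// Distribute (key, payload) pairs into `partitions` buckets,
/// preserving arrival order inside each bucket.
pub fn route<'a>(items: &[(&'a str, &'a str)], partitions: usize) -> Vec<Vec<(&'a str, &'a str)>> {
    let mut buckets = vec![Vec::new(); partitions];
    for &(key, payload) in items {
        buckets[partition_for(key, partitions)].push((key, payload));
    }
    buckets
}
```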

## πŸ—οΈ Architecture

Messages flow through pipelines and graphs with their envelope intact:

```
┌─────────────┐
│  Producer   │───produces───> Message<T>
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│ Transformer │───transforms───> Message<U> (ID preserved)
└─────────────┘
       │
       │ Message flows through
       ▼
┌─────────────┐
│  Consumer   │───consumes───> (can extract payload or keep envelope)
└─────────────┘
```

**Message Envelope Structure:**
```
Message<T>
├── MessageId (unique identifier)
├── payload: T (actual data)
└── MessageMetadata
    ├── timestamp
    ├── source
    ├── partition
    ├── offset
    ├── key
    └── headers
```

## 🔗 Dependencies

`streamweave-message` depends on:

- `serde` - Serialization support
- `serde_json` - JSON serialization
- `chrono` - Timestamp support
- `streamweave` (optional) - Integration with core traits

## 🎯 Use Cases

Message envelopes are used for:

1. **Exactly-Once Processing**: Unique IDs enable deduplication
2. **Offset Tracking**: Track position in source streams
3. **Message Routing**: Route by key or partition
4. **Idempotency**: Content-hash IDs for content-based deduplication
5. **Message Correlation**: Track messages through complex pipelines
6. **Audit Trails**: Metadata records where and when each message originated (source, partition, offset, timestamp)
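
Offset tracking (item 2) builds on the `partition` and `offset` metadata fields: after processing a message, a consumer records the highest processed offset per partition, skips replayed messages at or below it, and resumes just past it after a restart. A minimal sketch with a hypothetical `OffsetTracker` type, assuming in-order processing within each partition; StreamWeave's own offset management is covered by the `streamweave-offset` package.

```rust
use std::collections::HashMap;

/// Hypothetical per-partition offset tracker, keyed by the
/// `partition` and `offset` values carried in message metadata.
#[derive(Debug, Default)]
pub struct OffsetTracker {
    committed: HashMap<u32, u64>,
}

impl OffsetTracker {
    /// `true` if this (partition, offset) has not been processed yet;
    /// replayed messages at or below the committed offset return `false`.
    pub fn is_new(&self, partition: u32, offset: u64) -> bool {
        match self.committed.get(&partition) {
            Some(&last) => offset > last,
            None => true,
        }
    }

    /// Record that `offset` in `partition` has been processed. Keeps the
    /// maximum, so a late commit for an older offset never moves it back.
    pub fn commit(&mut self, partition: u32, offset: u64) {
        let entry = self.committed.entry(partition).or_insert(offset);
        if offset > *entry {
            *entry = offset;
        }
    }

    /// Offset to resume reading from after a restart.
    pub fn resume_from(&self, partition: u32) -> u64 {
        self.committed.get(&partition).map_or(0, |&last| last + 1)
    }
}
```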

## πŸ” Error Handling

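Messages can be carried in the error context used by StreamWeave's error handling (`streamweave-error`), so a failure report includes the failing message's ID and metadata: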

```rust
use streamweave_error::ErrorContext;
use streamweave_message::{Message, MessageId};

let msg = Message::new(42, MessageId::new_sequence(1));

// Error context can include the message, keeping its ID and metadata
let error_context = ErrorContext {
    timestamp: chrono::Utc::now(),
    item: Some(msg.clone()),
    component_name: "processor".to_string(),
    component_type: "Transformer".to_string(),
};
```

## ⚡ Performance Considerations

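- **Move Semantics**: `into_payload()` and `into_parts()` consume the envelope and return the payload by value, without copying it
- **Clone Cost**: Cloning a message clones its payload and metadata (including `String` fields and headers), so prefer moving messages through a pipeline
- **Thread-Safe**: ID generators are thread-safe; `SequenceGenerator` uses atomic operations
- **Small Envelope**: Each message adds one `MessageId` and one `MessageMetadata`; optional fields left as `None` and an empty header list allocate nothing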

## πŸ“ Examples

For more examples, see:
- [Exactly-Once Processing Example](https://github.com/Industrial/streamweave/tree/main/examples/exactly_once)
- [Message Deduplication](https://github.com/Industrial/streamweave/tree/main/examples)

## 📖 Documentation

- [Full API Documentation](https://docs.rs/streamweave-message)
- [Repository](https://github.com/Industrial/streamweave/tree/main/packages/message)
- [StreamWeave Main Documentation](https://docs.rs/streamweave)

## 🔗 See Also

- [streamweave](../streamweave/README.md) - Core traits
- [streamweave-offset](../offset/README.md) - Offset management
- [streamweave-transaction](../transaction/README.md) - Transaction support

## 🤝 Contributing

Contributions are welcome! Please see the [Contributing Guide](https://github.com/Industrial/streamweave/blob/main/CONTRIBUTING.md) for details.

## 📄 License

This project is licensed under the [CC BY-SA 4.0](https://creativecommons.org/licenses/by-sa/4.0/) license.