# danube-client 0.12.0

The async client for the Danube Messaging Broker platform.
# Danube Client Examples

This directory contains examples demonstrating various features of the Danube messaging platform and client library.

## Table of Contents

1. [Basic Examples](#basic-examples)
2. [Key-Shared Examples](#key-shared-examples)
3. [Schema Registry Examples](#schema-registry-examples)
4. [Advanced Features](#advanced-features)
5. [Running Examples](#running-examples)

---

## Basic Examples

### 1. **simple_producer_consumer.rs**
**Purpose**: Demonstrates the simplest way to send and receive raw byte messages without schema validation.

**Key Features**:
- Basic producer/consumer setup
- Raw byte message passing
- Message acknowledgment
- Single producer-consumer pair

**Use Case**: Quick prototyping, simple message passing, when schema validation is not needed.

```bash
cargo run --example simple_producer_consumer
```

---

### 2. **partitions_producer.rs** & **partitions_consumer.rs**
**Purpose**: Shows how to use topic partitioning for horizontal scaling and parallel processing.

**Key Features**:
- Creating partitioned topics
- Automatic message distribution across partitions
- Parallel message consumption

**Use Case**: High-throughput scenarios where messages can be processed independently in parallel.

```bash
# Terminal 1 - Start consumer
cargo run --example partitions_consumer

# Terminal 2 - Start producer
cargo run --example partitions_producer
```

---

### 3. **reliable_dispatch_producer.rs** & **reliable_dispatch_consumer.rs**
**Purpose**: Demonstrates reliable message delivery with acknowledgments and retry mechanisms.

**Key Features**:
- Reliable dispatch mode for guaranteed delivery
- Automatic retry on failures
- Message acknowledgment tracking

**Use Case**: Critical messages that must be delivered (e.g., payment notifications, order processing).

```bash
# Terminal 1 - Start consumer
cargo run --example reliable_dispatch_consumer

# Terminal 2 - Start producer
cargo run --example reliable_dispatch_producer
```

---

## Key-Shared Examples

### 4. **key_shared_producer.rs** & **key_shared_consumer.rs**
**Purpose**: Demonstrates Key-Shared subscriptions where messages are routed to consumers based on their routing key. All messages with the same key are guaranteed to reach the same consumer.

**Key Features**:
- Routing key-based message distribution via `send_with_key()`
- Consistent hashing assigns keys to consumers automatically
- Per-key ordering with multi-consumer parallelism
- Consumer elasticity — keys redistribute when consumers join/leave

**Use Case**: Order processing (group by order ID), per-user event streams, multi-tenant workloads.

**What to expect**: Start two consumers, then the producer. Each consumer will receive a distinct subset of routing keys. For example, consumer_1 might get all "payment" and "invoice" events while consumer_2 gets "shipping" and "return" events. The key assignment is deterministic — re-running produces the same distribution.

```bash
# Terminal 1 - Start first consumer
cargo run --example key_shared_consumer -- consumer_1

# Terminal 2 - Start second consumer
cargo run --example key_shared_consumer -- consumer_2

# Terminal 3 - Start producer (sends 10 events with 4 different keys)
cargo run --example key_shared_producer
```

**Example output** (consumer_1):
```
✅ Consumer 'consumer_1' subscribed to '/default/orders_topic' (Key-Shared)
📥 [consumer_1] key=payment    | offset=0 | 'Payment received for order #1001'
📥 [consumer_1] key=payment    | offset=2 | 'Payment received for order #1002'
📥 [consumer_1] key=invoice    | offset=6 | 'Invoice generated for order #1001'
📥 [consumer_1] key=payment    | offset=5 | 'Payment received for order #1003'
📥 [consumer_1] key=invoice    | offset=9 | 'Invoice generated for order #1002'
```
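The "deterministic distribution" described above comes from consistent hashing: the key is hashed, and the hash decides which consumer owns it. The broker's actual assignment algorithm is not shown in these examples, but the core idea can be sketched in plain Rust (the modulo-over-consumers mapping here is an illustrative simplification):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative only: hash a routing key onto one of the consumers.
/// The broker's real assignment algorithm may differ in detail, but the
/// property demonstrated is the same: a given key always lands on the
/// same consumer for a fixed consumer set.
fn assign_consumer(key: &str, consumers: &[&str]) -> String {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    let idx = (hasher.finish() as usize) % consumers.len();
    consumers[idx].to_string()
}

fn main() {
    let consumers = ["consumer_1", "consumer_2"];
    for key in ["payment", "invoice", "shipping", "return"] {
        println!("{key} -> {}", assign_consumer(key, &consumers));
    }
    // Determinism: re-hashing the same key gives the same assignment,
    // which is why re-running the examples produces the same distribution.
    assert_eq!(
        assign_consumer("payment", &consumers),
        assign_consumer("payment", &consumers)
    );
}
```

Note that when a consumer joins or leaves, the consumer set changes and some keys move, which is the "consumer elasticity" behavior listed above.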

---

### 5. **key_shared_filtered_consumer.rs**
**Purpose**: Demonstrates key filtering — consumers declare which routing key patterns they want to handle, giving explicit control over key-to-consumer assignment instead of relying on automatic consistent hashing.

**Key Features**:
- Glob-based key filter patterns (`"payment"`, `"ship*"`, `"eu-west-?"`)
- Each consumer only receives messages matching its declared filters
- Multiple filters per consumer
- Combines with reliable dispatch for at-least-once delivery

**Use Case**: Microservice specialization — a "payments" service handles only payment/invoice events while a "logistics" service handles shipping/returns. No key overlap, no wasted processing.

**What to expect**: The "payments" consumer receives only messages with keys matching `"payment"` or `"invoice"`. The "logistics" consumer receives only messages with keys matching `"ship*"` (e.g., shipping, shipment) or `"return"`. Keys not matching any filter are handled by hash fallback.

```bash
# Terminal 1 - Payments consumer (filters: "payment", "invoice")
cargo run --example key_shared_filtered_consumer -- payments

# Terminal 2 - Logistics consumer (filters: "ship*", "return")
cargo run --example key_shared_filtered_consumer -- logistics

# Terminal 3 - Start producer
cargo run --example key_shared_producer
```

**Example output** (payments consumer):
```
✅ Consumer 'consumer_payments' subscribed (Key-Shared, filters: ["payment", "invoice"])
📥 [consumer_payments] key=payment    | 'Payment received for order #1001'
📥 [consumer_payments] key=payment    | 'Payment received for order #1002'
📥 [consumer_payments] key=invoice    | 'Invoice generated for order #1001'
📥 [consumer_payments] key=payment    | 'Payment received for order #1003'
📥 [consumer_payments] key=invoice    | 'Invoice generated for order #1002'
```

**Example output** (logistics consumer):
```
✅ Consumer 'consumer_logistics' subscribed (Key-Shared, filters: ["ship*", "return"])
📥 [consumer_logistics] key=shipping  | 'Order #1001 shipped via express'
📥 [consumer_logistics] key=return    | 'Return request for order #998'
📥 [consumer_logistics] key=shipping  | 'Order #1002 shipped via standard'
📥 [consumer_logistics] key=return    | 'Return approved for order #998'
📥 [consumer_logistics] key=shipping  | 'Order #1003 shipped via express'
```
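The filter patterns above use glob syntax: `*` matches any run of characters and `?` matches exactly one. The client library's matcher is internal, but its behavior on these patterns can be sketched with a small std-only matcher (illustrative; the library's semantics may differ in edge cases):

```rust
/// Illustrative glob matcher for key-filter patterns:
/// `*` matches any sequence of characters, `?` matches exactly one.
fn key_matches(pattern: &str, key: &str) -> bool {
    fn go(p: &[char], k: &[char]) -> bool {
        match (p.first(), k.first()) {
            // Both exhausted: the whole key matched the whole pattern.
            (None, None) => true,
            // `*` either matches nothing (skip it) or consumes one key char.
            (Some('*'), _) => go(&p[1..], k) || (!k.is_empty() && go(p, &k[1..])),
            // `?` consumes exactly one key character.
            (Some('?'), Some(_)) => go(&p[1..], &k[1..]),
            // Literal characters must match exactly.
            (Some(a), Some(b)) if a == b => go(&p[1..], &k[1..]),
            _ => false,
        }
    }
    let p: Vec<char> = pattern.chars().collect();
    let k: Vec<char> = key.chars().collect();
    go(&p, &k)
}

fn main() {
    assert!(key_matches("payment", "payment"));     // exact match
    assert!(key_matches("ship*", "shipping"));      // prefix glob
    assert!(key_matches("ship*", "shipment"));
    assert!(key_matches("eu-west-?", "eu-west-1")); // single-char wildcard
    assert!(!key_matches("ship*", "return"));       // non-matching key
    println!("filter patterns behaved as expected");
}
```

This is why the logistics consumer above receives both `shipping` and `shipment` keys from the single `"ship*"` filter.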

---

## Schema Registry Examples

### 6. **json_producer.rs** & **json_consumer.rs**
**Purpose**: Shows how to use JSON Schema for message validation with typed data structures.

**Key Features**:
- JSON Schema registration in Schema Registry
- Automatic serialization/deserialization
- Type-safe message passing
- Schema validation on both producer and consumer sides

**Use Case**: Applications using JSON for structured data with schema evolution needs.

```bash
# Terminal 1 - Start consumer
cargo run --example json_consumer

# Terminal 2 - Start producer
cargo run --example json_producer
```

**Schema Type**: `json_schema`

---

### 7. **json_consumer_validated.rs**
**Purpose**: Demonstrates consumer-side schema validation against the Schema Registry at startup.

**Key Features**:
- Fetches schema from registry before consuming
- Validates Rust struct against JSON Schema definition
- Fails at startup if struct doesn't match schema
- Prevents runtime deserialization errors
- Schema version tracking and logging

**Use Case**: Production consumers that need to ensure their struct definitions match the producer's schema, preventing silent data loss or deserialization failures.

```bash
# Run the producer first to register schema
cargo run --example json_producer

# Then run the validated consumer
cargo run --example json_consumer_validated
```

**What it validates**:
- ✅ Field names match schema properties
- ✅ Field types are compatible (string, integer, etc.)
- ✅ Required fields are present
- ✅ No extra unexpected fields

**Dependencies**: Requires `jsonschema` crate for validation:
```toml
[dependencies]
jsonschema = "0.18"
```

**When to use**:
- **Development**: Catch schema mismatches early
- **CI/CD**: Validate before deployment
- **Production**: Ensure consumer compatibility

**Output Example**:
```
🔍 Fetching schema from registry for subject: my-app-events
📋 Retrieved schema version: 1
✅ Struct validated successfully against schema v1
✅ Consumer validated against schema version: 1
   Safe to proceed with typed deserialization
```

---

### 8. **avro_producer.rs** & **avro_consumer.rs**
**Purpose**: Demonstrates Apache Avro schema usage for efficient binary serialization.

**Key Features**:
- Avro schema registration
- Compact binary encoding
- Schema evolution support
- Strongly-typed data structures

**Use Case**: High-performance applications requiring efficient serialization and schema evolution.

```bash
# Terminal 1 - Start consumer
cargo run --example avro_consumer

# Terminal 2 - Start producer
cargo run --example avro_producer
```

**Schema Type**: `avro`

**Example Schema**:
```json
{
    "type": "record",
    "name": "UserEvent",
    "namespace": "com.example.events",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "metadata", "type": ["null", "string"], "default": null}
    ]
}
```

---

### 9. **schema_evolution.rs**
**Purpose**: Comprehensive demonstration of schema evolution and compatibility checking.

**Key Features**:
- Schema version management
- Compatibility checking (backward/forward/full)
- Safe schema evolution
- Listing all schema versions
- Retrieving latest schema

**Use Case**: Understanding how to evolve data schemas safely over time without breaking existing consumers.

```bash
cargo run --example schema_evolution
```

**Demonstrates**:
- **Compatible change**: Adding optional fields with defaults
- **Incompatible change**: Removing required fields
- Schema version history tracking
- Compatibility mode enforcement

---

## Advanced Features

### Topic Partitioning
Partitions allow horizontal scaling by distributing messages across multiple partitions:

```rust
let mut producer = client
    .new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_partitions(3)  // Create 3 partitions
    .build();
```
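How the producer spreads messages over the partitions is an implementation detail of the client, but a simple round-robin selector illustrates the idea (this sketch is not the library's actual strategy):

```rust
/// Illustrative round-robin partition selector: each message goes to the
/// next partition in a cycle, giving an even spread.
struct RoundRobin {
    partitions: usize,
    next: usize,
}

impl RoundRobin {
    fn new(partitions: usize) -> Self {
        Self { partitions, next: 0 }
    }

    fn pick(&mut self) -> usize {
        let p = self.next;
        self.next = (self.next + 1) % self.partitions;
        p
    }
}

fn main() {
    let mut rr = RoundRobin::new(3);
    let picks: Vec<usize> = (0..6).map(|_| rr.pick()).collect();
    // Six messages spread evenly over 3 partitions.
    assert_eq!(picks, vec![0, 1, 2, 0, 1, 2]);
    println!("{picks:?}");
}
```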

### Reliable Dispatch
Ensures message delivery with automatic retries:

```rust
let mut producer = client
    .new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_reliable_dispatch()  // Enable reliable delivery
    .build();
```
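The "automatic retries" behind reliable dispatch follow the familiar retry-with-backoff pattern. The broker's actual retry parameters are not part of these examples; the following std-only sketch just shows the shape of such a policy, with a simulated transport standing in for the real send call:

```rust
use std::thread::sleep;
use std::time::Duration;

/// Illustrative retry loop with exponential backoff. `attempt` stands in
/// for a send operation that may fail transiently.
fn send_with_retry<F>(mut attempt: F, max_retries: u32) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut backoff = Duration::from_millis(10);
    for n in 0..=max_retries {
        match attempt() {
            Ok(()) => return Ok(()),
            // Out of retries: surface the last error to the caller.
            Err(e) if n == max_retries => return Err(e),
            // Transient failure: wait, then retry with a doubled delay.
            Err(_) => {
                sleep(backoff);
                backoff *= 2;
            }
        }
    }
    unreachable!()
}

fn main() {
    // Simulated transport that fails twice, then succeeds.
    let mut failures_left = 2;
    let result = send_with_retry(
        || {
            if failures_left > 0 {
                failures_left -= 1;
                Err("broker unavailable".to_string())
            } else {
                Ok(())
            }
        },
        5,
    );
    assert!(result.is_ok());
    println!("delivered after retries");
}
```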

### Key-Shared Subscriptions
Route messages by key so each key is handled by exactly one consumer:

```rust
use danube_client::SubType;

// Producer: tag each message with a routing key
producer.send_with_key(data, None, "order-123").await?;

// Consumer: automatic key distribution via consistent hashing
let mut consumer = client
    .new_consumer()
    .with_topic("/default/my_topic")
    .with_consumer_name("worker_1")
    .with_subscription("my_sub")
    .with_subscription_type(SubType::KeyShared)
    .build()?;

// Consumer with key filters: only receive specific key patterns
let mut filtered = client
    .new_consumer()
    .with_topic("/default/my_topic")
    .with_consumer_name("payments_worker")
    .with_subscription("my_sub")
    .with_subscription_type(SubType::KeyShared)
    .with_key_filter("payment")      // exact match
    .with_key_filter("invoice-*")    // glob pattern
    .build()?;
```

### Schema Validation
Register schemas and enable validation:

```rust
use danube_client::{SchemaType, CompatibilityMode};

// 1. Get schema client from DanubeClient
let schema_client = client.schema();

// 2. Register schema
let schema_id = schema_client
    .register_schema("my-subject")
    .with_type(SchemaType::Avro)  // Type-safe enum: Avro, JsonSchema, Protobuf, etc.
    .with_schema_data(schema_bytes)
    .execute()
    .await?;

// 3. Check compatibility before evolution
let compat_result = schema_client
    .check_compatibility(
        "my-subject",
        new_schema_bytes,
        SchemaType::Avro,
        None,  // Use subject's default mode (or Some(CompatibilityMode::Full))
    )
    .await?;

// 4. Set compatibility mode for a subject
schema_client
    .set_compatibility_mode("critical-subject", CompatibilityMode::Full)
    .await?;

// 5. Create producer with schema subject
let mut producer = client
    .new_producer()
    .with_topic("/default/my_topic")
    .with_name("my_producer")
    .with_schema_subject("my-subject")  // Link to schema
    .build();

// 6. Consumer validates struct at startup (see json_consumer_validated.rs)
let schema_version = validate_struct_against_registry(
    &schema_client,
    "my-subject",
    &MyStruct::default(),
).await?;
```

---

## Supported Schema Types

| Type | Description | Use Case |
|------|-------------|----------|
| `SchemaType::Bytes` | Raw binary data (no validation) | Simple messaging, custom formats |
| `SchemaType::String` | UTF-8 text data | Plain text messages |
| `SchemaType::Number` | Numeric data (int, float, double) | Simple numeric values |
| `SchemaType::JsonSchema` | JSON with schema validation | Structured JSON data |
| `SchemaType::Avro` | Apache Avro binary format | High-performance, schema evolution |
| `SchemaType::Protobuf` | Protocol Buffers | Cross-language compatibility |

All schema types are available as type-safe enums for IDE auto-completion and compile-time validation.

---

## Compatibility Modes

Schema evolution is controlled by compatibility modes. Set these per-subject to define evolution rules:

| Mode | Description | Allows | Use Case |
|------|-------------|--------|----------|
| `CompatibilityMode::Backward` | New schema reads old data | Add optional fields, remove fields | **Default**. Consumers upgrade before producers |
| `CompatibilityMode::Forward` | Old schema reads new data | Add required fields, remove optional fields | Producers upgrade before consumers |
| `CompatibilityMode::Full` | Both directions | Only safe changes (add optional) | **Strictest**. Critical schemas |
| `CompatibilityMode::None` | No validation | Any change | Development/testing only |

**Example**:
```rust
// Set strict compatibility for critical schemas
schema_client
    .set_compatibility_mode("order-events", CompatibilityMode::Full)
    .await?;

// Development schemas can be flexible
schema_client
    .set_compatibility_mode("test-events", CompatibilityMode::None)
    .await?;
```
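The intuition behind `Backward` compatibility ("new schema reads old data") can be reduced to one rule: every field the new schema *requires* must already have existed in the old schema, otherwise old records cannot satisfy it. The registry's real checker is schema-type aware and considers types and defaults as well; this std-only sketch only illustrates the required-field rule:

```rust
use std::collections::HashSet;

/// Illustrative backward-compatibility rule: a new schema can read old
/// data only if it requires no field the old schema didn't already have.
fn backward_compatible(old_fields: &[&str], new_required: &[&str]) -> bool {
    let old: HashSet<_> = old_fields.iter().collect();
    new_required.iter().all(|f| old.contains(f))
}

fn main() {
    // Compatible: the new schema dropped a requirement and added nothing new.
    assert!(backward_compatible(&["user_id", "action"], &["user_id"]));
    // Incompatible: the new schema requires a field old records never carried.
    assert!(!backward_compatible(&["user_id"], &["user_id", "timestamp"]));
    println!("compatibility checks behaved as expected");
}
```

`Forward` compatibility is the mirror image (old schema reads new data), and `Full` demands both directions, which is why only additions of optional fields survive it.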

---

## Schema Registry API

### Register Schema
```rust
use danube_client::SchemaType;

let schema_id = schema_client
    .register_schema("my-subject")
    .with_type(SchemaType::Avro)  // Type-safe enum
    .with_schema_data(schema_bytes)
    .execute()
    .await?;
```

### Check Compatibility
```rust
use danube_client::{SchemaType, CompatibilityMode};

let result = schema_client
    .check_compatibility(
        "my-subject",
        new_schema_bytes,
        SchemaType::Avro,
        None,  // Optional: Some(CompatibilityMode::Full)
    )
    .await?;

if result.is_compatible {
    println!("✅ Safe to register!");
} else {
    eprintln!("❌ Incompatible: {:?}", result.errors);
}
```

### Set Compatibility Mode
```rust
use danube_client::CompatibilityMode;

schema_client
    .set_compatibility_mode("my-subject", CompatibilityMode::Full)
    .await?;
```

### List Versions
```rust
let versions = schema_client
    .list_versions("my-subject")
    .await?;

println!("Versions: {:?}", versions);  // e.g., [1, 2, 3]
```

### Get Latest Schema
```rust
let schema = schema_client
    .get_latest_schema("my-subject")
    .await?;

println!("Version: {}", schema.version);
println!("Type: {}", schema.schema_type);
```

---

## Running Examples

### Prerequisites

1. **Start the Danube broker**:
   ```bash
   # From the danube root directory
   cargo run --bin danube-broker
   ```

2. **For schema validation examples**, add `jsonschema` to your `Cargo.toml`:
   ```toml
   [dependencies]
   jsonschema = "0.18"
   ```

### Running Individual Examples

```bash
# Basic examples
cargo run --example simple_producer_consumer
cargo run --example partitions_producer
cargo run --example partitions_consumer

# Key-Shared examples
cargo run --example key_shared_producer
cargo run --example key_shared_consumer -- consumer_1
cargo run --example key_shared_filtered_consumer -- payments

# Schema registry examples
cargo run --example json_producer
cargo run --example json_consumer
cargo run --example json_consumer_validated  # Requires jsonschema crate
cargo run --example avro_producer
cargo run --example avro_consumer
cargo run --example schema_evolution
```

### Recommended Learning Path

1. **Start simple**: `simple_producer_consumer.rs` - understand basic messaging
2. **Add schemas**: `json_producer.rs` + `json_consumer.rs` - learn schema registration
3. **Validate schemas**: `json_consumer_validated.rs` - production-ready validation
4. **Schema evolution**: `schema_evolution.rs` - understand compatibility rules
5. **High performance**: `avro_producer.rs` + `avro_consumer.rs` - binary serialization
6. **Scale up**: `partitions_producer.rs` + `partitions_consumer.rs` - horizontal scaling
7. **Reliability**: `reliable_dispatch_*` examples - guaranteed delivery
8. **Key routing**: `key_shared_producer.rs` + `key_shared_consumer.rs` - key-based distribution
9. **Key filtering**: `key_shared_filtered_consumer.rs` - explicit key-to-consumer assignment

---

## Best Practices

### Schema Registry Usage

1. **Always register schemas** before producing messages
2. **Use type-safe enums** (`SchemaType`, `CompatibilityMode`) for API calls
3. **Validate consumer structs** at startup (see `json_consumer_validated.rs`)
4. **Set appropriate compatibility modes** per subject:
   - `Full` for critical schemas
   - `Backward` for most use cases (default)
   - `None` only for development

### Consumer Patterns

**Option 1: Manual Struct** (simple, fast)
- Define struct manually
- Serialize/deserialize with `serde_json`
- Good for: Prototypes, tightly-coupled services

**Option 2: Validated Struct** (recommended)
- Fetch schema from registry at startup
- Validate struct against schema
- Use typed struct for convenience
- Good for: Production services, schema evolution

**Option 3: Dynamic Validation** (flexible)
- Fetch schema and validate each message
- Use `serde_json::Value` for dynamic data
- Good for: Generic consumers, schema exploration

### Producer Patterns

1. **Register schema once** at startup
2. **Check compatibility** before schema updates
3. **Reuse schema_id** across restarts (idempotent registration)
4. **Set schema subject** on producer for automatic validation