rivven-schema 0.0.22

Schema Registry for Rivven - Confluent-compatible schema management
# rivven-schema

> High-performance Schema Registry for the Rivven event streaming platform.

## Overview

`rivven-schema` provides schema management with Avro, JSON Schema, and Protobuf support. It offers an industry-standard REST API for drop-in compatibility.

## Features

| Category | Features |
|:---------|:---------|
| **Formats** | Avro, JSON Schema, Protobuf |
| **Evolution** | Forward, backward, full, and transitive compatibility |
| **Storage** | In-memory, broker-backed, AWS Glue |
| **API** | Industry-standard REST API |
| **Auth** | Basic, Bearer, JWT/OIDC, API Keys |
| **Cache** | Bounded in-memory cache (configurable max 10K entries, auto-eviction) |
| **K8s** | Health checks (`/health`, `/health/live`, `/health/ready`) |

> **Note**: The Schema Registry stores, versions, and validates schemas. It does **not**
> encode/decode message data — that's the job of producers and consumers.
> Use `rivven-connect` for Avro/Protobuf/JSON codecs.

## Deployment Modes

Rivven supports **3 schema modes**. The external mode can target either a compatible registry or AWS Glue:

| Mode | Description | Use Case |
|------|-------------|----------|
| **Broker-backed** | Store schemas in rivven broker topics | Production (self-hosted) |
| **External** | Connect to an external compatible registry | Production, multi-cluster |
| **External (AWS Glue)** | Connect to AWS Glue Schema Registry | AWS-native deployments |
| **In-memory** | Fast, volatile storage | Development, testing |

> **Note**: The broker (rivvend) is schema-agnostic. It only handles raw bytes. All schema operations are handled by rivven-schema or external registries.

## Quick Start

### As a Library

```rust
use rivven_schema::{SchemaRegistry, RegistryConfig, SchemaType};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create an in-memory registry
    let config = RegistryConfig::memory();
    let registry = SchemaRegistry::new(config).await?;

    // Register a schema
    let avro_schema = r#"{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"}
        ]
    }"#;
    
    let schema_id = registry.register("user-value", SchemaType::Avro, avro_schema).await?;
    println!("Registered schema with ID: {}", schema_id.0);

    // Retrieve the schema
    let schema = registry.get_by_id(schema_id).await?;
    println!("Schema: {}", schema.schema);

    // Check compatibility (for evolving schemas)
    let new_schema = r#"{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {"name": "email", "type": ["null", "string"], "default": null}
        ]
    }"#;
    let result = registry.check_compatibility("user-value", SchemaType::Avro, new_schema, None).await?;
    println!("Compatible: {}", result.is_compatible);

    Ok(())
}
```

### Broker-Backed Storage (Production)

Enable durable storage by keeping schemas in a Rivven broker topic (`_schemas` by default):

```bash
cargo build -p rivven-schema --features broker
```

```rust,ignore
use rivven_schema::{RegistryConfig, BrokerStorageConfig, SchemaRegistry};

// Configure broker-backed storage
let broker_config = BrokerStorageConfig::new("localhost:9092")
    .with_topic("_schemas")            // Custom topic name (default: "_schemas")
    .with_replication_factor(3);       // Replication for durability

let config = RegistryConfig::broker(broker_config);
let registry = SchemaRegistry::new(config).await?;
```

Benefits:
- **Durability**: Schemas survive registry restarts
- **Replication**: Schemas replicated across broker nodes  
- **No external dependencies**: Uses rivven broker itself
- **Compaction**: Only latest schema versions retained

### As a Standalone Server

```bash
# Start with in-memory storage
rivven-schema serve --port 8081
```

## Authentication

Authentication is always compiled in and supports the following methods:

| Method | Header | Use Case |
|--------|--------|----------|
| **HTTP Basic Auth** | `Authorization: Basic base64(user:pass)` | Simple deployments |
| **Bearer Token** | `Authorization: Bearer <session-id>` | Session-based auth |
| **JWT/OIDC** | `Authorization: Bearer <jwt>` | Enterprise SSO (requires `jwt` feature) |
| **API Keys** | `X-API-Key: <key>` | Service-to-service auth |
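
The table above can be read as a small dispatch on the incoming header. The following is a minimal sketch only: the `Credential` enum and `classify` function are hypothetical names, and the "three dot-separated segments" test for telling a JWT from a session ID is an assumption for illustration, not a description of how `rivven-schema` decides.

```rust
/// Hypothetical classification of the credential carried by one request header.
#[derive(Debug, PartialEq)]
enum Credential<'a> {
    Basic(&'a str),         // base64(user:pass), still encoded
    BearerJwt(&'a str),     // looks like a compact JWT
    BearerSession(&'a str), // any other bearer token
    ApiKey(&'a str),
}

/// Map a (header name, header value) pair to a credential kind.
/// Header names are compared case-insensitively, as in HTTP.
fn classify<'a>(header_name: &str, value: &'a str) -> Option<Credential<'a>> {
    if header_name.eq_ignore_ascii_case("x-api-key") {
        return Some(Credential::ApiKey(value));
    }
    if !header_name.eq_ignore_ascii_case("authorization") {
        return None;
    }
    if let Some(encoded) = value.strip_prefix("Basic ") {
        return Some(Credential::Basic(encoded));
    }
    if let Some(token) = value.strip_prefix("Bearer ") {
        // Assumption: a compact JWT has exactly three dot-separated segments.
        return Some(if token.split('.').count() == 3 {
            Credential::BearerJwt(token)
        } else {
            Credential::BearerSession(token)
        });
    }
    None
}
```

For example, `classify("Authorization", "Bearer session-123")` yields `Credential::BearerSession("session-123")`, while a header other than `Authorization` or `X-API-Key` yields `None`.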

### JWT/OIDC Support

For JWT/OIDC token validation, enable the `jwt` feature:

```bash
cargo build -p rivven-schema --features jwt
```

Supports:
- HS256, RS256, ES256 algorithms
- Configurable issuer and audience validation
- JWKS endpoint support for key rotation
- Custom claims mapping (groups, roles)

Authentication integrates with rivven-core's RBAC system, supporting:
- Per-subject access control (read/write/admin permissions)
- Anonymous read access (configurable)
- Rate limiting and lockout protection

### Cedar Policy-Based Authorization

For fine-grained, policy-as-code authorization, enable the `cedar` feature:

```bash
cargo build -p rivven-schema --features cedar
```

Cedar authorization is fully wired and evaluates every request through
`cedar_policy::Authorizer::is_authorized()`. Policies are expressed in the Cedar
language and checked at runtime — this is **not** a stub.

Cedar provides powerful policy expressions with fine-grained access control:

```cedar
// Allow schema admins full access
permit(
  principal in Rivven::Group::"schema-admins",
  action,
  resource is Rivven::Schema
);

// Allow teams to manage their own schemas
permit(
  principal,
  action in [Rivven::Action::"create", Rivven::Action::"alter"],
  resource is Rivven::Schema
) when {
  resource.name.startsWith(principal.team + "-")
};

// Deny deletions outside maintenance windows
forbid(
  principal,
  action == Rivven::Action::"delete",
  resource is Rivven::Schema
) unless {
  context.timestamp.hour >= 2 && context.timestamp.hour <= 6
};
```

```rust,ignore
use rivven_schema::{SchemaServer, ServerConfig, AuthConfig, CedarAuthorizer};
use rivven_core::AuthManager;
use std::sync::Arc;

// Create Cedar authorizer with policies
let authorizer = Arc::new(CedarAuthorizer::new()?);
authorizer.add_policy("schema-admin", r#"
permit(
  principal in Rivven::Group::"schema-admins",
  action,
  resource is Rivven::Schema
);
"#)?;

// Configure server with Cedar
let config = ServerConfig::default()
    .with_auth(AuthConfig::required().with_cedar());

let server = SchemaServer::with_cedar(registry, config, auth_manager, authorizer);
```

### Programmatic Authentication Setup

```rust,ignore
use rivven_schema::{SchemaServer, ServerConfig, AuthConfig};
// Assumption: `PrincipalType` is exported by rivven_core alongside `AuthManager`
use rivven_core::{AuthManager, PrincipalType};
use std::sync::Arc;

// Create auth manager with users
let auth_manager = Arc::new(AuthManager::new());
auth_manager.create_principal("admin", "Secret@123", PrincipalType::User, ["admin"])?;

// Configure server with authentication
let config = ServerConfig::default()
    .with_auth(AuthConfig::required());

let server = SchemaServer::with_auth(registry, config, auth_manager);
```

### CLI Commands

```bash
# Check server health
rivven-schema health --url http://localhost:8081

# Register a schema
rivven-schema register --url http://localhost:8081 --subject user-value --schema schema.avsc

# Get a schema by ID
rivven-schema get --url http://localhost:8081 --id 1

# List all subjects
rivven-schema subjects --url http://localhost:8081

# Check compatibility
rivven-schema compat --url http://localhost:8081 --subject user-value --schema new-schema.avsc
```

## REST API

The server implements a standard Schema Registry REST API plus enterprise extensions:

### Core Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/` | GET | Get server info |
| `/subjects` | GET | List all subjects |
| `/subjects/{subject}/versions` | GET | List versions for a subject |
| `/subjects/{subject}/versions` | POST | Register a new schema (with optional references) |
| `/subjects/{subject}/versions/{version}` | GET | Get schema by subject and version |
| `/subjects/{subject}/versions/{version}/referencedby` | GET | Get schemas referencing this version |
| `/schemas/ids/{id}` | GET | Get schema by global ID |
| `/compatibility/subjects/{subject}/versions/{version}` | POST | Check compatibility |
| `/config` | GET/PUT | Get/set global compatibility config |
| `/config/{subject}` | GET/PUT | Get/set subject compatibility config |

### Version State Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/subjects/{subject}/versions/{version}/state` | GET/PUT | Get/set version state |
| `/subjects/{subject}/versions/{version}/deprecate` | POST | Mark version as deprecated |
| `/subjects/{subject}/versions/{version}/disable` | POST | Disable version |
| `/subjects/{subject}/versions/{version}/enable` | POST | Re-enable version |

### Subject Recovery Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/subjects/deleted` | GET | List soft-deleted subjects |
| `/subjects/{subject}/undelete` | POST | Restore a soft-deleted subject |

### Content Validation Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/subjects/{subject}/validate` | POST | Validate schema without registering |
| `/config/validation/rules` | GET/POST | List/add validation rules |
| `/config/validation/rules/{name}` | DELETE | Delete validation rule |

### Schema Context Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/contexts` | GET/POST | List/create contexts |
| `/contexts/{context}` | GET/DELETE | Get/delete context |
| `/contexts/{context}/subjects` | GET | List subjects in context |

### Monitoring Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/stats` | GET | Get registry statistics |
| `/health` | GET | Health check |
| `/health/live` | GET | Liveness probe |
| `/health/ready` | GET | Readiness probe |

## Compatibility Modes

| Mode | Description |
|------|-------------|
| `BACKWARD` | New schema can read old data (default) |
| `BACKWARD_TRANSITIVE` | New schema can read all previous data |
| `FORWARD` | Old schema can read new data |
| `FORWARD_TRANSITIVE` | All previous schemas can read new data |
| `FULL` | Both backward and forward compatible |
| `FULL_TRANSITIVE` | Both backward and forward compatible with all versions |
| `NONE` | No compatibility checking |
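
To make the direction of each mode concrete, here is a deliberately simplified sketch. It models a record schema as a map from field name to "has a default value" and ignores field types entirely, so it is not the checker used by `rivven-schema`. All names (`Fields`, `can_read`, `backward`, and so on) are hypothetical.

```rust
use std::collections::HashMap;

/// Simplified record schema: field name -> "has a default value".
/// Real Avro checks also compare field types; this sketch ignores them.
type Fields = HashMap<&'static str, bool>;

/// True if a reader using `reader` can decode data written with `writer`:
/// every reader field must exist in the writer schema or carry a default.
fn can_read(reader: &Fields, writer: &Fields) -> bool {
    reader
        .iter()
        .all(|(name, has_default)| writer.contains_key(name) || *has_default)
}

/// BACKWARD: the new schema can read data written with the old one.
fn backward(new: &Fields, old: &Fields) -> bool {
    can_read(new, old)
}

/// FORWARD: the old schema can read data written with the new one.
fn forward(new: &Fields, old: &Fields) -> bool {
    can_read(old, new)
}

/// FULL: both directions hold.
fn full(new: &Fields, old: &Fields) -> bool {
    backward(new, old) && forward(new, old)
}

/// Transitive variant: the check must hold against every earlier version.
fn backward_transitive(new: &Fields, history: &[Fields]) -> bool {
    history.iter().all(|old| backward(new, old))
}
```

Under this model, adding an optional `email` field with a default passes `full`, while adding a required `age` field without a default fails `backward` because old records carry no value for it.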

## Schema Formats

### Avro (Recommended)

```json
{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null}
    ]
}
```

### JSON Schema

```json
{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"}
    },
    "required": ["id", "name"]
}
```

> **Recursive compatibility:** JSON Schema compatibility checking recursively validates nested `properties`, `items` (array schemas), `additionalProperties`, and `enum` values — not just top-level fields. Incompatibilities are reported with dotted paths (e.g., `address.street`).

## Wire Format

The registry uses a standard wire format for encoded messages:

```
+--------+----------------+------------------+
| Magic  | Schema ID      | Avro Payload     |
| (1 B)  | (4 B BE)       | (variable)       |
+--------+----------------+------------------+
| 0x00   | [schema_id]    | [avro_bytes]     |
+--------+----------------+------------------+
```

This allows consumers to look up the schema by ID before deserializing.
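
As a minimal sketch of the header described above, the following helpers frame and unframe a payload using only the standard library. The names `encode_frame` and `decode_frame` are illustrative and not part of `rivven-schema`, and the schema ID is assumed to fit an unsigned 32-bit integer.

```rust
/// Magic byte that marks a schema-registry framed message.
const MAGIC_BYTE: u8 = 0x00;

/// Prepend the 5-byte header (magic byte + big-endian schema ID) to a payload.
/// Hypothetical helper for illustration only.
fn encode_frame(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    out.push(MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a framed message into (schema ID, payload).
/// Returns `None` if the message is too short or the magic byte is wrong.
fn decode_frame(message: &[u8]) -> Option<(u32, &[u8])> {
    if message.len() < 5 || message[0] != MAGIC_BYTE {
        return None;
    }
    let id = u32::from_be_bytes([message[1], message[2], message[3], message[4]]);
    Some((id, &message[5..]))
}
```

A consumer would call `decode_frame` first, fetch the schema for the returned ID from the registry, and only then hand the remaining bytes to its Avro decoder.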

## Feature Flags

| Feature | Default | Description |
|---------|---------|-------------|
| `server` | ✅ | HTTP server with industry-standard REST API |
| `cli` | ✅ | Command-line interface |
| `avro` | ✅ | Avro schema parsing and compatibility checking |
| `json-schema` | ✅ | JSON Schema support with validation |
| `protobuf` | ✅ | Protobuf schema parsing and compatibility checking |
| `external` | ❌ | External Schema Registry client |
| `glue` | ❌ | AWS Glue Schema Registry client |
| `jwt` | ❌ | JWT/OIDC token validation (HS256, RS256, ES256) |
| `cedar` | ❌ | Cedar policy-based authorization |
| `metrics` | ❌ | Prometheus metrics for monitoring |

> **Note**: For encoding/decoding data with Avro/Protobuf codecs, use `rivven-connect`.

## Advanced Features

### Schema Contexts (Multi-Tenancy)

Schema contexts provide namespace isolation for multi-tenant deployments:

```rust
use rivven_schema::{SchemaRegistry, SchemaContext, SchemaType, RegistryConfig};

// Create a tenant context
let tenant_ctx = SchemaContext::new("tenant-acme")
    .with_description("ACME Corp schemas");
registry.create_context(tenant_ctx)?;

// Register schema in context using qualified subject name
// Format: :.context:subject
let schema_id = registry.register(
    ":.tenant-acme:user-value",
    SchemaType::Avro,
    schema
).await?;

// List subjects in context
let subjects = registry.list_subjects_in_context("tenant-acme");
```
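
The qualified subject format `:.context:subject` can be taken apart with plain string handling. This is a sketch under the assumption that neither part may be empty and that anything else is treated as an unqualified subject. The function name `split_qualified_subject` is hypothetical.

```rust
/// Split a qualified subject of the form `:.context:subject` into
/// (context, subject). Names that do not match the format are returned
/// unchanged with no context.
fn split_qualified_subject(name: &str) -> (Option<&str>, &str) {
    if let Some(rest) = name.strip_prefix(":.") {
        if let Some((context, subject)) = rest.split_once(':') {
            if !context.is_empty() && !subject.is_empty() {
                return (Some(context), subject);
            }
        }
    }
    (None, name)
}
```

With this helper, `":.tenant-acme:user-value"` splits into the context `tenant-acme` and the subject `user-value`, while a plain `user-value` has no context.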

### Version States (Schema Lifecycle)

Manage schema version lifecycle with states:

```rust
use rivven_schema::{SchemaVersion, VersionState};

// Deprecate a version (warns clients)
registry.deprecate_version("user-value", SchemaVersion::new(1)).await?;

// Disable a version (blocks usage)
registry.disable_version("user-value", SchemaVersion::new(1)).await?;

// Re-enable a version
registry.enable_version("user-value", SchemaVersion::new(1)).await?;
```

| State | Description | Behavior |
|-------|-------------|----------|
| **Enabled** | Active, fully usable | Default state |
| **Deprecated** | Discouraged but usable | Returns warning |
| **Disabled** | Blocked from use | Returns 403 |
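
The behavior column can be sketched as a mapping from state to response. The enum and function below are hypothetical stand-ins, not the crate's `VersionState`. The `200` status for usable versions and the warning text are assumptions; only the `403` for disabled versions comes from the table.

```rust
/// Illustrative copy of the three lifecycle states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VersionStateSketch {
    Enabled,
    Deprecated,
    Disabled,
}

/// Outcome of fetching a version: (HTTP status, optional warning).
fn fetch_outcome(state: VersionStateSketch) -> (u16, Option<&'static str>) {
    match state {
        VersionStateSketch::Enabled => (200, None),
        // Still served, but the client is told to move on.
        VersionStateSketch::Deprecated => (200, Some("schema version is deprecated")),
        // Blocked from use.
        VersionStateSketch::Disabled => (403, None),
    }
}
```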

### Subject Recovery (Undelete)

Soft-deleted subjects can be recovered within a configurable retention period:

```rust
use rivven_schema::SchemaRegistry;

// Soft delete a subject (default)
let deleted_versions = registry.delete_subject("user-value", false).await?;
println!("Deleted versions: {:?}", deleted_versions);

// List deleted subjects available for recovery
let deleted = registry.list_deleted_subjects().await?;
for subject in &deleted {
    println!("Can recover: {}", subject);
}

// Recover a deleted subject
let restored_versions = registry.undelete_subject("user-value").await?;
println!("Restored versions: {:?}", restored_versions);

// Permanent delete (cannot be recovered)
registry.delete_subject("user-value", true).await?;
```

**Note:** Soft-deleted subjects are moved to a recoverable state. Permanent deletes cannot be undone.
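
The difference between the two delete modes can be modeled with two maps. This is a toy sketch that assumes version numbers are plain integers and does not model the retention period. `Subjects` and its methods are hypothetical and unrelated to the registry's internal storage.

```rust
use std::collections::HashMap;

/// Toy store: subjects are either active or soft-deleted.
#[derive(Default)]
struct Subjects {
    active: HashMap<String, Vec<u32>>,
    soft_deleted: HashMap<String, Vec<u32>>,
}

impl Subjects {
    /// Soft delete parks the versions for later recovery.
    /// Permanent delete drops them from both maps.
    fn delete(&mut self, subject: &str, permanent: bool) -> Option<Vec<u32>> {
        if permanent {
            let from_active = self.active.remove(subject);
            let from_deleted = self.soft_deleted.remove(subject);
            from_active.or(from_deleted)
        } else {
            let versions = self.active.remove(subject)?;
            self.soft_deleted.insert(subject.to_string(), versions.clone());
            Some(versions)
        }
    }

    /// Move a soft-deleted subject back to the active set.
    fn undelete(&mut self, subject: &str) -> Option<Vec<u32>> {
        let versions = self.soft_deleted.remove(subject)?;
        self.active.insert(subject.to_string(), versions.clone());
        Some(versions)
    }
}
```

After a permanent delete, `undelete` returns `None` because nothing was kept to restore.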

### Content Validation Rules

Enforce content rules beyond compatibility checking:

```rust
use rivven_schema::{SchemaType, ValidationRule, ValidationRuleType, ValidationLevel};

// Add a max size rule
registry.add_validation_rule(
    ValidationRule::new("max-size", ValidationRuleType::MaxSize, r#"{"max_bytes": 102400}"#)
        .with_level(ValidationLevel::Error)
);

// Validate before registering
let report = registry.validate_schema(SchemaType::Avro, "subject", schema)?;
if !report.is_valid() {
    println!("Errors: {:?}", report.error_messages());
}
```

Available rule types: `MaxSize`, `NamingConvention`, `FieldRequired`, `FieldType`, `Regex`, `JsonSchema`.

### Schema References (Cross-Schema Dependencies)

Schema references allow schemas to reference types defined in other schemas. This is essential for:
- Sharing common types (e.g., `Address`, `Money`) across multiple schemas
- Managing complex domain models with reusable building blocks
- Protobuf imports and JSON Schema `$ref` support

```rust
use rivven_schema::{SchemaRegistry, SchemaType, SchemaReference, SchemaVersion, RegistryConfig};

// Register a base schema first
let address_schema = r#"{
    "type": "record",
    "name": "Address",
    "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"}
    ]
}"#;
let address_id = registry.register("address-value", SchemaType::Avro, address_schema).await?;

// Register a schema that references Address
let user_schema = r#"{
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "address": {"$ref": "Address"}
    }
}"#;
let refs = vec![
    SchemaReference {
        name: "Address".to_string(),
        subject: "address-value".to_string(),
        version: 1,
    }
];
let user_id = registry.register_with_references(
    "user-value",
    SchemaType::Json,
    user_schema,
    refs
).await?;

// Find all schemas that reference a given schema
let referencing = registry.get_schemas_referencing("address-value", SchemaVersion::new(1)).await?;
```

**API Endpoints:**

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/subjects/{subject}/versions` | POST | Register schema with `references` array |
| `/subjects/{subject}/versions/{version}/referencedby` | GET | Get schemas referencing this version |

**Request body for registration with references:**
```json
{
    "schema": "{...}",
    "schemaType": "JSON",
    "references": [
        {"name": "Address", "subject": "address-value", "version": 1}
    ]
}
```
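
Note that the `schema` field carries the schema as an escaped JSON string, not as a nested object. The sketch below builds such a body with the standard library only. `json_escape` and `registration_body` are hypothetical helpers, and a real client would more likely use a JSON library.

```rust
/// Escape a string so it can be embedded inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Build a registration request body.
/// Each reference is a (name, subject, version) tuple.
fn registration_body(schema: &str, schema_type: &str, refs: &[(&str, &str, u32)]) -> String {
    let refs_json: Vec<String> = refs
        .iter()
        .map(|(name, subject, version)| {
            format!(
                r#"{{"name":"{}","subject":"{}","version":{}}}"#,
                json_escape(name),
                json_escape(subject),
                version
            )
        })
        .collect();
    format!(
        r#"{{"schema":"{}","schemaType":"{}","references":[{}]}}"#,
        json_escape(schema),
        json_escape(schema_type),
        refs_json.join(",")
    )
}
```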

### Prometheus Metrics

Enable monitoring with the `metrics` feature:

```bash
cargo build -p rivven-schema --features metrics
```

```rust
use rivven_schema::{SchemaRegistry, RegistryConfig, MetricsConfig};

let registry = SchemaRegistry::with_metrics(
    RegistryConfig::memory(),
    MetricsConfig::default()
).await?;
```

Metrics include: `schemas_registered_total`, `schemas_lookups_total`, `compatibility_checks_total`, `operation_duration_seconds`, etc.

## Standard Wire Format

### Avro Wire Format

When using Avro with Schema Registry, data is encoded with a 5-byte header:

```
+--------+----------------+------------------+
| Magic  | Schema ID      | Avro Payload     |
| (1 B)  | (4 B BE)       | (variable)       |
+--------+----------------+------------------+
| 0x00   | [schema_id]    | [avro_bytes]     |
+--------+----------------+------------------+
```

### Protobuf Wire Format

For Protobuf, the format includes a message index (varint):

```
+--------+----------------+-------------+------------------+
| Magic  | Schema ID      | Msg Index   | Protobuf Payload |
| (1 B)  | (4 B BE)       | (varint)    | (variable)       |
+--------+----------------+-------------+------------------+
| 0x00   | [schema_id]    | 0x00        | [proto_bytes]    |
+--------+----------------+-------------+------------------+
```

This format is compatible with standard producers/consumers using common serializers.
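
A reader for the Protobuf layout has to decode the varint before it knows where the payload starts. The sketch below follows the diagram literally and assumes a single unsigned varint after the schema ID. Serializers that write a list of indexes or use a signed encoding are not handled. `read_varint` and `decode_protobuf_frame` are hypothetical names.

```rust
/// Decode an unsigned LEB128 varint; returns (value, bytes consumed).
/// Gives up after 10 bytes, the maximum length for a 64-bit value.
fn read_varint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Split a framed Protobuf message into (schema ID, message index, payload).
fn decode_protobuf_frame(message: &[u8]) -> Option<(u32, u64, &[u8])> {
    if message.len() < 5 || message[0] != 0x00 {
        return None;
    }
    let id = u32::from_be_bytes([message[1], message[2], message[3], message[4]]);
    let (index, used) = read_varint(&message[5..])?;
    Some((id, index, &message[5 + used..]))
}
```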

## Best Practices

1. **Use Avro in Production**: Schema evolution with compatibility checking
2. **Subject Naming**: Use `{topic}-key` and `{topic}-value` convention
3. **Compatibility Level**: Start with `BACKWARD` for safe evolution
4. **Versioning**: Never delete schemas, only deprecate
5. **Deduplication**: Same schema content gets the same ID across subjects
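
Points 2 and 5 can be sketched together. The `subject_for` helper applies the naming convention, and `IdAllocator` shows the idea behind deduplication by keying IDs on schema content. Both are hypothetical. The sketch compares the exact schema text, whereas a real registry may normalize or fingerprint schemas first.

```rust
use std::collections::HashMap;

/// Build a subject name following the `{topic}-key` / `{topic}-value` convention.
fn subject_for(topic: &str, is_key: bool) -> String {
    format!("{}-{}", topic, if is_key { "key" } else { "value" })
}

/// Hands out one global ID per distinct schema text.
struct IdAllocator {
    next_id: u32,
    by_content: HashMap<String, u32>,
}

impl IdAllocator {
    fn new() -> Self {
        IdAllocator { next_id: 1, by_content: HashMap::new() }
    }

    /// Return the existing ID for known content, otherwise allocate a new one.
    /// The subject plays no role here, so identical content shares an ID.
    fn id_for(&mut self, schema: &str) -> u32 {
        if let Some(&id) = self.by_content.get(schema) {
            return id;
        }
        let id = self.next_id;
        self.next_id += 1;
        self.by_content.insert(schema.to_string(), id);
        id
    }
}
```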

## Integration with Rivven Connect

```rust
use rivven_connect::schema::{SchemaRegistryClient, SchemaRegistryConfig, SchemaType, Subject};

// Create registry client (multiple modes available)
let config = SchemaRegistryConfig::external("http://localhost:8081");
let registry = SchemaRegistryClient::from_config_async(&config, None).await?;

// Register a schema
let schema_id = registry.register(
    &Subject::value("users"),
    SchemaType::Avro,
    r#"{"type":"record","name":"User","fields":[...]}"#
).await?;
```

## Documentation

- [Schema Registry](https://rivven.hupe1980.github.io/rivven/docs/schema-registry)
- [Architecture](https://rivven.hupe1980.github.io/rivven/docs/architecture)

## License

Apache-2.0. See [LICENSE](../../LICENSE).