rivven-protocol 0.0.22

Wire protocol types for Rivven distributed event streaming
Documentation
# rivven-protocol

> Wire protocol types for the Rivven distributed event streaming platform.

## Overview

This crate defines canonical wire protocol types for client-server communication. Both `rivven-client` and `rivvend` depend on this crate to ensure wire compatibility.

## Features

| Category | Types |
|:---------|:------|
| **Requests** | Produce, Consume, CreateTopic, DeleteTopic, Metadata, etc. |
| **Responses** | ProduceResponse, ConsumeResponse, MetadataResponse, etc. |
| **Messages** | `MessageData` with key, value, headers, timestamp |
| **Metadata** | `BrokerInfo`, `TopicMetadata`, `PartitionMetadata` |

## Protocol Constants

| Constant | Value | Description |
|:---------|:------|:------------|
| `PROTOCOL_VERSION` | 1 | Current wire protocol version |
| `MAX_MESSAGE_SIZE` | 64 MB | Maximum serialized message size |

## Usage

```rust
use rivven_protocol::{Request, Response, WireFormat};

// Create a produce request
let request = Request::Publish {
    topic: "events".to_string(),
    partition: Some(0),
    key: None,
    value: bytes::Bytes::from_static(b"Hello, Rivven!"),
};

// Serialize with wire format byte prefix (recommended)
let wire_bytes = request.to_wire(WireFormat::Postcard)?;

// Deserialize with auto-detection
let (request, format) = Request::from_wire(&wire_bytes)?;
assert_eq!(format, WireFormat::Postcard);
```

## Wire Format

Messages use a **format-prefixed** wire protocol with correlation IDs:

```
┌─────────────────┬──────────────────┬──────────────────┬─────────────────────────┐
│ Length (4 bytes)│ Format (1 byte)  │ Correlation (4B) │ Payload (N bytes)       │
│ Big-endian u32  │ 0x00 = postcard  │ Big-endian u32   │ Serialized Request/Resp │
│                 │ 0x01 = protobuf  │                  │                         │
└─────────────────┴──────────────────┴──────────────────┴─────────────────────────┘
```

### Format Byte Values

| Byte | Format | Description |
|:-----|:-------|:------------|
| `0x00` | postcard | Rust-native, fastest (~50ns serialize) |
| `0x01` | protobuf | Cross-language compatible |

### Serialization Methods

| Method | Description |
|:-------|:------------|
| `to_wire(format, correlation_id)` | Serialize with format byte + correlation ID |
| `from_wire(data)` | Deserialize with auto-format detection, returns (msg, format, correlation_id) |
| `to_bytes()` | Postcard only, no format prefix (internal use) |
| `from_bytes(data)` | Postcard only, no format prefix (internal use) |

### Connection Handshake

Clients should send a `Handshake` request as the first message:

```rust
// Send version handshake
let handshake = Request::Handshake {
    protocol_version: PROTOCOL_VERSION,
    client_id: "my-app".into(),
};
let wire = handshake.to_wire(WireFormat::Postcard, 0)?;

// Server responds with compatibility info
// Response::HandshakeResult { server_version, compatible, message }
```

### Server Behavior

The server auto-detects the wire format from the first byte and responds in the same format the client used.

### Protobuf Feature

Enable protobuf serialization with the `protobuf` feature:

```toml
[dependencies]
rivven-protocol = { version = "0.0.22", features = ["protobuf"] }
```

```rust
use rivven_protocol::{Request, WireFormat};

// Serialize request as protobuf
let request = Request::Ping;
let wire_bytes = request.to_wire(WireFormat::Protobuf)?;

// First byte is 0x01 (protobuf)
assert_eq!(wire_bytes[0], 0x01);
```

## Cross-Language Support

### Architecture

The Rivven broker uses **postcard** for maximum performance with Rust clients. For other languages, the protocol is documented via **Protocol Buffers**:

```
┌─────────────────┐                    ┌─────────────────┐
│   Rust Client   │───── postcard ────►│                 │
│ (rivven-client) │◄──── postcard ─────│     rivvend     │
└─────────────────┘                    │    (broker)     │
                                       └─────────────────┘
```

### Multi-Language Clients

For Go, Java, Python, and other languages, use the proto file as the **wire format specification**:

| Format | Use Case | Performance |
|:-------|:---------|:------------|
| **postcard** | Rust clients (native) | Fastest (~50ns serialize) |
| **protobuf** | Reference spec for other languages | Fast (~200ns serialize) |

### Protobuf Schema

The proto file at [`proto/rivven.proto`](proto/rivven.proto) documents the wire format. Client implementers specify their own package names:

```protobuf
package rivven.protocol.v1;

message PublishRequest {
  string topic = 1;
  optional uint32 partition = 2;
  Record record = 3;
  optional uint64 leader_epoch = 4;  // Epoch fencing for partition leaders
}

message Record {
  bytes key = 1;
  bytes value = 2;
  repeated RecordHeader headers = 3;
  int64 timestamp = 4;
  bool has_key = 5;  // Distinguishes None key from empty key
}
```

### Generate Client Stubs

```bash
# Go
protoc --go_out=. --go_opt=Mrivven.proto=github.com/yourorg/rivven-go/protocol proto/rivven.proto

# Java
protoc --java_out=. proto/rivven.proto

# Python
protoc --python_out=. proto/rivven.proto

# TypeScript (ts-proto)
protoc --plugin=./node_modules/.bin/protoc-gen-ts_proto --ts_proto_out=. proto/rivven.proto
```

### Implementing a Client

To implement a client in another language:

1. Generate code from `proto/rivven.proto`
2. Connect to broker on port 9092
3. Use length-prefixed framing: `[4-byte length][1-byte format][payload]`
4. Use format byte `0x01` for protobuf
5. Serialize requests using protobuf
6. Parse responses using protobuf (server responds in same format)

### Proto Message Types

The protobuf schema defines 31+ message types covering all major protocol operations:

| Category | Message Types |
|:---------|:--------------|
| **Core** | `Request`, `Response`, `Record`, `ErrorCode` |
| **Publish/Consume** | `PublishRequest`, `BatchPublishRequest`, `ConsumeResponse` |
| **Metadata** | `GetMetadataResponse`, `GetClusterMetadataResponse`, `GetOffsetBoundsResponse` |
| **Consumer Groups** | `ListGroupsResponse`, `DescribeGroupResponse`, `DeleteGroupResponse`, `JoinGroupRequest/Result`, `SyncGroupRequest/Result`, `HeartbeatRequest/Result`, `LeaveGroupRequest/Result` |
| **Transactions** | `BeginTransactionResponse`, `CommitTransactionResponse`, `AbortTransactionResponse` |
| **Idempotent** | `InitProducerIdResponse`, `IdempotentPublishResponse` |
| **Admin** | `AlterTopicConfigRequest`, `DescribeTopicConfigsResponse`, `CreatePartitionsRequest`, `DeleteRecordsRequest` |
| **Time Queries** | `GetOffsetForTimestampRequest` / `Response` |

## Type Safety

Wire protocol conversions use validated and saturating casts to prevent silent truncation of untrusted wire data:

- **`safe_producer_epoch()`**: Validates `u32 → u16` conversion for producer epoch — returns `ProtocolError::InvalidFormat` on overflow
- **Saturating casts**: `max_messages` (`usize → u32`), `expires_in` (`u64 → u32`), and `port` (`u32 → u16`) use `try_from().unwrap_or(T::MAX)` for safe saturation
- **Credential redaction**: Custom `Debug` impl redacts `password` and `auth_bytes` fields in log output
- **Variant stability**: Enum discriminant order is tested to ensure backward compatibility across protocol versions
- **Node ID mapping**: Proto `ClusterMetadata` maps string node IDs (e.g., `"node-1"`) to deterministic `u32` values via FNV-1a hashing (`node_id_to_u32`) when `parse::<u32>()` fails — the same mapping is used for both `BrokerInfo.id` and partition `leader`/`replicas`/`isr` fields, ensuring consistency. FNV-1a is stable across Rust toolchain versions and platforms (unlike `DefaultHasher`).
- **Post-deserialization validation**: All `from_bytes()`, `from_wire()`, and `from_proto_bytes()` methods perform bounds validation on deserialized `Request` and `Response` fields (string lengths, list sizes, auth payload limits) immediately after parsing, rejecting malformed payloads with `ProtocolError` before any handler logic executes

## License

Apache-2.0. See [LICENSE](../../LICENSE).