rivven-protocol
Wire protocol types for the Rivven distributed event streaming platform.
Overview
This crate defines canonical wire protocol types for client-server communication. Both rivven-client and rivvend depend on this crate to ensure wire compatibility.
Features
| Category | Types |
|---|---|
| Requests | Produce, Consume, CreateTopic, DeleteTopic, Metadata, etc. |
| Responses | ProduceResponse, ConsumeResponse, MetadataResponse, etc. |
| Messages | MessageData with key, value, headers, timestamp |
| Metadata | BrokerInfo, TopicMetadata, PartitionMetadata |
Protocol Constants
| Constant | Value | Description |
|---|---|---|
| PROTOCOL_VERSION | 1 | Current wire protocol version |
| MAX_MESSAGE_SIZE | 64 MB | Maximum serialized message size |
Usage
use ;
// Create a produce request
let request = Publish ;
// Serialize with wire format byte prefix (recommended)
let wire_bytes = request.to_wire?;
// Deserialize with auto-detection
let = from_wire?;
assert_eq!;
Wire Format
Messages use a format-prefixed wire protocol with correlation IDs:
┌─────────────────┬──────────────────┬──────────────────┬─────────────────────────┐
│ Length (4 bytes)│ Format (1 byte) │ Correlation (4B) │ Payload (N bytes) │
│ Big-endian u32 │ 0x00 = postcard │ Big-endian u32 │ Serialized Request/Resp │
│ │ 0x01 = protobuf │ │ │
└─────────────────┴──────────────────┴──────────────────┴─────────────────────────┘
Format Byte Values
| Byte | Format | Description |
|---|---|---|
| 0x00 | postcard | Rust-native, fastest (~50ns serialize) |
| 0x01 | protobuf | Cross-language compatible |
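The frame layout and format bytes above can be sketched in plain Rust using only the standard library. This is an illustration, not the crate's implementation: `Frame`, `encode_frame`, and `decode_frame` are hypothetical names, and the sketch assumes the length field counts every byte that follows it (format byte, correlation ID, and payload), which the diagram does not state explicitly.

```rust
use std::convert::TryFrom;

/// Format bytes from the table above.
const FORMAT_POSTCARD: u8 = 0x00;
const FORMAT_PROTOBUF: u8 = 0x01;

/// Hypothetical decoded frame: the payload is left in its serialized form.
#[derive(Debug, PartialEq)]
struct Frame {
    format: u8,
    correlation_id: u32,
    payload: Vec<u8>,
}

/// Builds `[length][format][correlation_id][payload]`.
/// ASSUMPTION: `length` counts every byte after the length field itself.
fn encode_frame(format: u8, correlation_id: u32, payload: &[u8]) -> Option<Vec<u8>> {
    // Returns None if the body would not fit in a u32 length prefix.
    let body_len = u32::try_from(1 + 4 + payload.len()).ok()?;
    let mut out = Vec::with_capacity(4 + body_len as usize);
    out.extend_from_slice(&body_len.to_be_bytes()); // big-endian u32 length
    out.push(format);
    out.extend_from_slice(&correlation_id.to_be_bytes()); // big-endian u32
    out.extend_from_slice(payload);
    Some(out)
}

/// Parses one frame from the start of `buf`.
/// Returns None on truncation or an unknown format byte.
fn decode_frame(buf: &[u8]) -> Option<Frame> {
    let len = buf.get(0..4)?;
    let body_len = u32::from_be_bytes([len[0], len[1], len[2], len[3]]) as usize;
    let body = buf.get(4..4usize.checked_add(body_len)?)?;
    if body.len() < 5 {
        return None; // too short to hold format byte + correlation ID
    }
    let format = body[0];
    if format != FORMAT_POSTCARD && format != FORMAT_PROTOBUF {
        return None;
    }
    let correlation_id = u32::from_be_bytes([body[1], body[2], body[3], body[4]]);
    Some(Frame { format, correlation_id, payload: body[5..].to_vec() })
}
```

Rejecting unknown format bytes at the framing layer keeps the payload deserializer from ever seeing bytes in a format it was not built for.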
Serialization Methods
| Method | Description |
|---|---|
| to_wire(format, correlation_id) | Serialize with format byte + correlation ID |
| from_wire(data) | Deserialize with auto-format detection, returns (msg, format, correlation_id) |
| to_bytes() | Postcard only, no format prefix (internal use) |
| from_bytes(data) | Postcard only, no format prefix (internal use) |
Connection Handshake
Clients should send a Handshake request as the first message:
// Send version handshake
let handshake = Handshake ;
let wire = handshake.to_wire?;
// Server responds with compatibility info
// Response::HandshakeResult { server_version, compatible, message }
Server Behavior
The server auto-detects the wire format from the first byte and responds in the same format the client used.
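A minimal sketch of this detect-and-mirror behavior follows. The `WireFormat` enum and `mirror_format` function here are hypothetical stand-ins written for illustration; the crate's own types may differ. The sketch assumes the 4-byte length prefix has already been stripped, so the format byte is the first byte of the slice.

```rust
/// Hypothetical stand-in for the crate's format type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WireFormat {
    Postcard,
    Protobuf,
}

impl WireFormat {
    /// Maps the format byte to a variant; unknown bytes are rejected.
    fn from_byte(b: u8) -> Option<Self> {
        match b {
            0x00 => Some(WireFormat::Postcard),
            0x01 => Some(WireFormat::Protobuf),
            _ => None,
        }
    }

    fn as_byte(self) -> u8 {
        match self {
            WireFormat::Postcard => 0x00,
            WireFormat::Protobuf => 0x01,
        }
    }
}

/// Detects the request's format from its first byte and prefixes the
/// (already serialized) response payload with the same format byte.
/// ASSUMPTION: `request` starts at the format byte, after the length prefix.
fn mirror_format(request: &[u8], response_payload: &[u8]) -> Option<Vec<u8>> {
    let format = WireFormat::from_byte(*request.first()?)?;
    let mut out = Vec::with_capacity(1 + response_payload.len());
    out.push(format.as_byte());
    out.extend_from_slice(response_payload);
    Some(out)
}
```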
Protobuf Feature
Enable protobuf serialization with the protobuf feature:
[dependencies]
rivven-protocol = { version = "0.0.22", features = ["protobuf"] }
use ;
// Serialize request as protobuf
let request = Ping;
let wire_bytes = request.to_wire?;
// First byte is 0x01 (protobuf)
assert_eq!;
Cross-Language Support
Architecture
The Rivven broker uses postcard for maximum performance with Rust clients. For other languages, the protocol is documented via Protocol Buffers:
┌─────────────────┐ ┌─────────────────┐
│ Rust Client │───── postcard ────►│ │
│ (rivven-client) │◄──── postcard ─────│ rivvend │
└─────────────────┘ │ (broker) │
└─────────────────┘
Multi-Language Clients
For Go, Java, Python, and other languages, use the proto file as the wire format specification:
| Format | Use Case | Performance |
|---|---|---|
| postcard | Rust clients (native) | Fastest (~50ns serialize) |
| protobuf | Reference spec for other languages | Fast (~200ns serialize) |
Protobuf Schema
The proto file at proto/rivven.proto documents the wire format. Client implementers specify their own package names:
package rivven.protocol.v1;
message PublishRequest {
string topic = 1;
optional uint32 partition = 2;
Record record = 3;
optional uint64 leader_epoch = 4; // Epoch fencing for partition leaders
}
message Record {
bytes key = 1;
bytes value = 2;
repeated RecordHeader headers = 3;
int64 timestamp = 4;
bool has_key = 5; // Distinguishes None key from empty key
}
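The has_key flag exists because a protobuf bytes field cannot by itself distinguish an absent key from an empty one. As a rough sketch of how a client might map between an optional key and the (key, has_key) pair, assuming hypothetical helper names `encode_key` and `decode_key` that are not part of the crate:

```rust
/// Converts an optional key into the `(key, has_key)` pair carried by `Record`.
/// A missing key and an empty key both produce empty bytes; only the flag differs.
fn encode_key(key: Option<&[u8]>) -> (Vec<u8>, bool) {
    match key {
        Some(k) => (k.to_vec(), true),
        None => (Vec::new(), false),
    }
}

/// Reverses `encode_key`: the flag, not the byte length, decides presence.
fn decode_key(key: &[u8], has_key: bool) -> Option<Vec<u8>> {
    if has_key {
        Some(key.to_vec())
    } else {
        None
    }
}
```

With this mapping, a record published with an empty key round-trips as `Some(vec![])` rather than collapsing into `None`.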
Generate Client Stubs
# Go
# Java
# Python
# TypeScript (ts-proto)
Implementing a Client
To implement a client in another language:
- Generate code from proto/rivven.proto
- Connect to broker on port 9092
- Use length-prefixed framing: [4-byte length][1-byte format][payload]
- Use format byte 0x01 for protobuf
- Serialize requests using protobuf
- Parse responses using protobuf (server responds in same format)
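The framing steps above could look roughly like the following over any byte stream, shown in Rust for consistency with the rest of this README. Several points are assumptions: the frame carries the 4-byte correlation ID shown in the Wire Format diagram, the length field counts every byte after itself, 64 MB means 64 * 1024 * 1024 bytes, and that limit applies to the frame body. `write_frame` and `read_frame` are hypothetical names, and the payload is expected to be an already protobuf-encoded request.

```rust
use std::io::{self, Read, Write};

const FORMAT_PROTOBUF: u8 = 0x01;
/// ASSUMPTION: "64 MB" interpreted as 64 MiB and applied to the frame body.
const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;

/// Writes one protobuf-format frame: length, format byte, correlation ID, payload.
fn write_frame<W: Write>(w: &mut W, correlation_id: u32, payload: &[u8]) -> io::Result<()> {
    let body_len = 1 + 4 + payload.len();
    if body_len > MAX_MESSAGE_SIZE {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    w.write_all(&(body_len as u32).to_be_bytes())?;
    w.write_all(&[FORMAT_PROTOBUF])?;
    w.write_all(&correlation_id.to_be_bytes())?;
    w.write_all(payload)?;
    w.flush()
}

/// Reads one frame and returns (format byte, correlation ID, payload).
/// The length is checked before allocating so a hostile peer cannot
/// force an oversized buffer.
fn read_frame<R: Read>(r: &mut R) -> io::Result<(u8, u32, Vec<u8>)> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let body_len = u32::from_be_bytes(len_buf) as usize;
    if body_len < 5 || body_len > MAX_MESSAGE_SIZE {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad frame length"));
    }
    let mut body = vec![0u8; body_len];
    r.read_exact(&mut body)?;
    let correlation_id = u32::from_be_bytes([body[1], body[2], body[3], body[4]]);
    Ok((body[0], correlation_id, body[5..].to_vec()))
}
```

Both functions are generic over `Read` and `Write`, so the same code works with a `std::net::TcpStream` connected to the broker's port 9092 or with an in-memory buffer in tests.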
Proto Message Types
The protobuf schema defines 31+ message types covering all major protocol operations:
| Category | Message Types |
|---|---|
| Core | Request, Response, Record, ErrorCode |
| Publish/Consume | PublishRequest, BatchPublishRequest, ConsumeResponse |
| Metadata | GetMetadataResponse, GetClusterMetadataResponse, GetOffsetBoundsResponse |
| Consumer Groups | ListGroupsResponse, DescribeGroupResponse, DeleteGroupResponse, JoinGroupRequest/Result, SyncGroupRequest/Result, HeartbeatRequest/Result, LeaveGroupRequest/Result |
| Transactions | BeginTransactionResponse, CommitTransactionResponse, AbortTransactionResponse |
| Idempotent | InitProducerIdResponse, IdempotentPublishResponse |
| Admin | AlterTopicConfigRequest, DescribeTopicConfigsResponse, CreatePartitionsRequest, DeleteRecordsRequest |
| Time Queries | GetOffsetForTimestampRequest / Response |
Type Safety
Wire protocol conversions use validated and saturating casts to prevent silent truncation of untrusted wire data:
- safe_producer_epoch(): Validates u32 → u16 conversion for producer epoch; returns ProtocolError::InvalidFormat on overflow
- Saturating casts: max_messages (usize → u32), expires_in (u64 → u32), and port (u32 → u16) use try_from().unwrap_or(T::MAX) for safe saturation
- Credential redaction: Custom Debug impl redacts password and auth_bytes fields in log output
- Variant stability: Enum discriminant order is tested to ensure backward compatibility across protocol versions
- Node ID mapping: Proto ClusterMetadata maps string node IDs (e.g., "node-1") to deterministic u32 values via FNV-1a hashing (node_id_to_u32) when parse::<u32>() fails. The same mapping is used for both BrokerInfo.id and partition leader/replicas/isr fields, ensuring consistency. FNV-1a is stable across Rust toolchain versions and platforms (unlike DefaultHasher).
- Post-deserialization validation: All from_bytes(), from_wire(), and from_proto_bytes() methods perform bounds validation on deserialized Request and Response fields (string lengths, list sizes, auth payload limits) immediately after parsing, rejecting malformed payloads with ProtocolError before any handler logic executes
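The validated cast, the saturating casts, and the node ID mapping described above can be illustrated with a short standalone sketch. It is not the crate's source: `ProtocolError` here is a local stand-in with an assumed `String` payload, `saturating_port` and `saturating_max_messages` are hypothetical helper names, and the hash is assumed to be the 32-bit FNV-1a variant (the crate could equally derive its u32 from a wider hash).

```rust
use std::convert::TryFrom;

/// Local stand-in for the crate's error type; the payload type is an assumption.
#[derive(Debug, PartialEq)]
enum ProtocolError {
    InvalidFormat(String),
}

/// Validated cast: an out-of-range epoch is an error, never a truncation.
fn safe_producer_epoch(epoch: u32) -> Result<u16, ProtocolError> {
    u16::try_from(epoch).map_err(|_| {
        ProtocolError::InvalidFormat(format!("producer epoch {} exceeds u16", epoch))
    })
}

/// Saturating casts: out-of-range values clamp to the target type's maximum.
fn saturating_port(port: u32) -> u16 {
    u16::try_from(port).unwrap_or(u16::MAX)
}

fn saturating_max_messages(n: usize) -> u32 {
    u32::try_from(n).unwrap_or(u32::MAX)
}

/// Maps a node ID to a u32: numeric IDs parse directly, anything else is hashed.
/// ASSUMPTION: 32-bit FNV-1a with the standard offset basis and prime.
fn node_id_to_u32(node_id: &str) -> u32 {
    if let Ok(n) = node_id.parse::<u32>() {
        return n;
    }
    const OFFSET_BASIS: u32 = 0x811c_9dc5;
    const PRIME: u32 = 0x0100_0193;
    node_id
        .bytes()
        .fold(OFFSET_BASIS, |hash, byte| (hash ^ u32::from(byte)).wrapping_mul(PRIME))
}
```

Because the hash depends only on the ID's bytes, two brokers that see the same string node ID compute the same u32 regardless of toolchain or platform.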
License
Apache-2.0. See LICENSE.