pub enum Request {
Show 40 variants
Authenticate {
username: String,
password: String,
require_tls: bool,
},
SaslAuthenticate {
mechanism: Bytes,
auth_bytes: Bytes,
},
ScramClientFirst {
message: Bytes,
},
ScramClientFinal {
message: Bytes,
},
Publish {
topic: String,
partition: Option<u32>,
key: Option<Bytes>,
value: Bytes,
leader_epoch: Option<u64>,
},
Consume {
topic: String,
partition: u32,
offset: u64,
max_messages: u32,
isolation_level: Option<u8>,
max_wait_ms: Option<u64>,
},
CreateTopic {
name: String,
partitions: Option<u32>,
},
ListTopics,
DeleteTopic {
name: String,
},
CommitOffset {
consumer_group: String,
topic: String,
partition: u32,
offset: u64,
},
GetOffset {
consumer_group: String,
topic: String,
partition: u32,
},
GetMetadata {
topic: String,
},
GetClusterMetadata {
topics: Vec<String>,
},
Ping,
GetOffsetBounds {
topic: String,
partition: u32,
},
ListGroups,
DescribeGroup {
consumer_group: String,
},
DeleteGroup {
consumer_group: String,
},
GetOffsetForTimestamp {
topic: String,
partition: u32,
timestamp_ms: i64,
},
InitProducerId {
producer_id: Option<u64>,
},
IdempotentPublish {
topic: String,
partition: Option<u32>,
key: Option<Bytes>,
value: Bytes,
producer_id: u64,
producer_epoch: u16,
sequence: i32,
leader_epoch: Option<u64>,
},
BeginTransaction {
txn_id: String,
producer_id: u64,
producer_epoch: u16,
timeout_ms: Option<u64>,
},
AddPartitionsToTxn {
txn_id: String,
producer_id: u64,
producer_epoch: u16,
partitions: Vec<(String, u32)>,
},
TransactionalPublish {
txn_id: String,
topic: String,
partition: Option<u32>,
key: Option<Bytes>,
value: Bytes,
producer_id: u64,
producer_epoch: u16,
sequence: i32,
leader_epoch: Option<u64>,
},
AddOffsetsToTxn {
txn_id: String,
producer_id: u64,
producer_epoch: u16,
group_id: String,
offsets: Vec<(String, u32, i64)>,
},
CommitTransaction {
txn_id: String,
producer_id: u64,
producer_epoch: u16,
},
AbortTransaction {
txn_id: String,
producer_id: u64,
producer_epoch: u16,
},
DescribeQuotas {
entities: Vec<(String, Option<String>)>,
},
AlterQuotas {
alterations: Vec<QuotaAlteration>,
},
AlterTopicConfig {
topic: String,
configs: Vec<TopicConfigEntry>,
},
CreatePartitions {
topic: String,
new_partition_count: u32,
assignments: Vec<Vec<String>>,
},
DeleteRecords {
topic: String,
partition_offsets: Vec<(u32, u64)>,
},
DescribeTopicConfigs {
topics: Vec<String>,
},
Handshake {
protocol_version: u32,
client_id: String,
},
JoinGroup {
group_id: String,
member_id: String,
session_timeout_ms: u32,
rebalance_timeout_ms: u32,
protocol_type: String,
subscriptions: Vec<String>,
},
SyncGroup {
group_id: String,
generation_id: u32,
member_id: String,
assignments: SyncGroupAssignments,
},
Heartbeat {
group_id: String,
generation_id: u32,
member_id: String,
},
LeaveGroup {
group_id: String,
member_id: String,
},
PublishBatch {
topic: String,
partition: Option<u32>,
records: Vec<BatchRecord>,
leader_epoch: Option<u64>,
},
IdempotentPublishBatch {
topic: String,
partition: Option<u32>,
records: Vec<BatchRecord>,
producer_id: u64,
producer_epoch: u16,
base_sequence: i32,
leader_epoch: Option<u64>,
},
}Expand description
Protocol request messages
§Stability
WARNING: Variant order must remain stable for postcard serialization compatibility. Adding new variants should only be done at the end of the enum.
Variants§
Authenticate
Authenticate with username/password (SASL/PLAIN compatible).
§Security — transport encryption required
The password is sent in plaintext on the wire (SASL/PLAIN). This variant must only be used over a TLS-encrypted connection; otherwise the password is exposed to network observers.
§Deprecation
Prefer ScramClientFirst / ScramClientFinal (SCRAM-SHA-256
challenge-response) which never sends the password over the wire.
Fields
username: StringSaslAuthenticate
Authenticate with SASL bytes (for Kafka client compatibility)
ScramClientFirst
SCRAM-SHA-256: Client-first message
ScramClientFinal
SCRAM-SHA-256: Client-final message
Publish
Publish a message to a topic
Fields
Consume
Consume messages from a topic
Fields
CreateTopic
Create a new topic
ListTopics
List all topics
DeleteTopic
Delete a topic
CommitOffset
Commit consumer offset
GetOffset
Get consumer offset
GetMetadata
Get topic metadata
GetClusterMetadata
Get cluster metadata (all topics or specific ones)
Ping
Ping
GetOffsetBounds
Get offset bounds for a partition
ListGroups
List all consumer groups
DescribeGroup
Describe a consumer group (get all offsets)
DeleteGroup
Delete a consumer group
GetOffsetForTimestamp
Find offset for a timestamp
InitProducerId
Initialize idempotent producer (request producer ID and epoch)
Call this before sending idempotent produce requests. If reconnecting, provide the previous producer_id to bump epoch.
IdempotentPublish
Publish with idempotent semantics (exactly-once delivery)
Requires InitProducerId to have been called first.
Fields
BeginTransaction
Begin a new transaction
Fields
AddPartitionsToTxn
Add partitions to an active transaction
Fields
TransactionalPublish
Publish within a transaction (combines IdempotentPublish + transaction tracking)
Fields
AddOffsetsToTxn
Add consumer offsets to transaction (for exactly-once consume-transform-produce)
Fields
CommitTransaction
Commit a transaction
AbortTransaction
Abort a transaction
DescribeQuotas
Describe quotas for entities
Fields
AlterQuotas
Alter quotas for entities
Fields
alterations: Vec<QuotaAlteration>Quota alterations to apply Each item: (entity_type, entity_name, quota_key, quota_value) quota_key: “produce_bytes_rate”, “consume_bytes_rate”, “request_rate” quota_value: None to remove, Some(value) to set
AlterTopicConfig
Alter topic configuration
CreatePartitions
Create additional partitions for an existing topic
Fields
DeleteRecords
Delete records before a given offset (log truncation)
Fields
DescribeTopicConfigs
Describe topic configurations
Handshake
Protocol version handshake
Sent by the client as the first message after connecting. The server validates the protocol version and returns compatibility info.
Fields
JoinGroup
Join a consumer group. The coordinator assigns a member ID and, once all members have joined, elects a leader and triggers a rebalance. Returns the generation ID, member ID, and leader info.
Fields
session_timeout_ms: u32Session timeout in milliseconds — if no heartbeat is received within this period, the member is considered dead and a rebalance is triggered.
SyncGroup
Sync a consumer group — the leader sends partition assignments to the coordinator, and all members (including the leader) receive their individual assignments.
Fields
assignments: SyncGroupAssignmentsAssignments (only sent by leader; empty for followers).
Each entry is (member_id, Vec<(topic, Vec<partition>)>).
Heartbeat
Heartbeat — sent periodically by group members to keep their session alive. The response tells the member whether a rebalance is needed.
Fields
LeaveGroup
Leave a consumer group gracefully. Triggers an immediate rebalance for the remaining members.
PublishBatch
Publish a batch of records to a single (topic, partition) in one
wire message — eliminates per-record topic cloning and serialization
overhead compared to sending N individual Publish requests.
Fields
records: Vec<BatchRecord>IdempotentPublishBatch
Idempotent batch publish — combines Request::IdempotentPublish semantics
with the batching efficiency of Request::PublishBatch.
Records are assigned sequence numbers base_sequence..base_sequence + N - 1.
Implementations§
Source§impl Request
impl Request
Sourcepub fn to_bytes(&self) -> Result<Vec<u8>>
pub fn to_bytes(&self) -> Result<Vec<u8>>
Serialize request to bytes (postcard format, no format prefix)
For internal Rust-to-Rust communication where format is known.
Use to_wire() for wire transmission with format detection support.
Sourcepub fn from_bytes(data: &[u8]) -> Result<Self>
pub fn from_bytes(data: &[u8]) -> Result<Self>
Deserialize request from bytes (postcard format)
For internal Rust-to-Rust communication where format is known.
Use from_wire() for wire transmission with format detection support.
Sourcepub fn validate(&self) -> Result<()>
pub fn validate(&self) -> Result<()>
Post-deserialization bounds validation.
Prevents crafted payloads with oversized strings / vecs from being
accepted. Must be called after every from_bytes / from_wire.
Sourcepub fn to_wire(
&self,
format: WireFormat,
correlation_id: u32,
) -> Result<Vec<u8>>
pub fn to_wire( &self, format: WireFormat, correlation_id: u32, ) -> Result<Vec<u8>>
Serialize request with wire format prefix
Wire format: [format_byte][correlation_id (4 bytes BE)][payload]
- format_byte: 0x00 = postcard, 0x01 = protobuf
- correlation_id: 4-byte big-endian u32 for request-response matching
- payload: serialized message
§Correlation ID sizing
The wire protocol uses u32 for correlation IDs (4 bytes), which is
the Kafka-compatible choice and sufficient for client-server RPCs
(4 billion in-flight requests before wrap-around). The cluster-internal
protocol (rivven-cluster) uses u64 for its own RPC correlation to
avoid any wrap-around concern on high-throughput inter-node links.
The two namespaces are independent and never cross boundaries.
Note: Length prefix is NOT included (handled by transport layer)
§Errors
Returns ProtocolError::MessageTooLarge if the serialized message
exceeds MAX_MESSAGE_SIZE.