// rivven_protocol/messages.rs
1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Partition assignments for SyncGroup: `Vec<(member_id, Vec<(topic, Vec<partition>)>)>`
///
/// Sent by the group leader in [`Request::SyncGroup`]; one outer entry per
/// member, each carrying that member's per-topic partition lists.
pub type SyncGroupAssignments = Vec<(String, Vec<(String, Vec<u32>)>)>;
12
/// Quota alteration request
///
/// One entry in [`Request::AlterQuotas`]: sets or removes a single quota
/// value for one entity.
///
/// NOTE: field order is part of the postcard wire format — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaAlteration {
    /// Entity type: "user", "client-id", "consumer-group", "default"
    pub entity_type: String,
    /// Entity name (None for defaults)
    pub entity_name: Option<String>,
    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
    pub quota_key: String,
    /// Quota value (None to remove quota, Some to set)
    pub quota_value: Option<u64>,
}
25
/// Quota entry in describe response
///
/// One entity's effective quotas, returned in [`Response::QuotasDescribed`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaEntry {
    /// Entity type ("user", "client-id", "consumer-group", "default")
    pub entity_type: String,
    /// Entity name (None for defaults)
    pub entity_name: Option<String>,
    /// Quota values, keyed by quota key (e.g. "produce_bytes_rate")
    pub quotas: HashMap<String, u64>,
}
36
/// Topic configuration entry for AlterTopicConfig
///
/// One key/value change carried in [`Request::AlterTopicConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigEntry {
    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
    pub key: String,
    /// Configuration value (None to reset to default)
    pub value: Option<String>,
}
45
/// Topic configuration in describe response
///
/// One topic's full configuration map, returned in
/// [`Response::TopicConfigsDescribed`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigDescription {
    /// Topic name
    pub topic: String,
    /// Configuration entries, keyed by config key
    pub configs: HashMap<String, TopicConfigValue>,
}
54
/// Topic configuration value with metadata
///
/// The value half of [`TopicConfigDescription::configs`]; carries the
/// current value plus flags describing how it may be treated.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigValue {
    /// Current value
    pub value: String,
    /// Whether this is the default value
    pub is_default: bool,
    /// Whether this config is read-only
    pub is_read_only: bool,
    /// Whether this config is sensitive (e.g., passwords)
    pub is_sensitive: bool,
}
67
/// Delete records result for a partition
///
/// One per-partition outcome inside [`Response::RecordsDeleted`]; a failed
/// partition carries `error` while others may still succeed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteRecordsResult {
    /// Partition ID
    pub partition: u32,
    /// New low watermark (first available offset after deletion)
    pub low_watermark: u64,
    /// Error message if deletion failed for this partition
    pub error: Option<String>,
}
78
/// A single record within a [`Request::PublishBatch`].
///
/// Carries only the key and value — the topic, partition, and
/// leader_epoch are shared across the entire batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchRecord {
    /// Optional record key (None = keyless record)
    #[serde(with = "crate::serde_utils::option_bytes_serde")]
    pub key: Option<Bytes>,
    /// Record payload bytes
    #[serde(with = "crate::serde_utils::bytes_serde")]
    pub value: Bytes,
}
90
/// Protocol request messages
///
/// # Stability
///
/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
/// Adding new variants should only be done at the end of the enum.
///
/// Debug is hand-implemented below to redact credentials; keep new
/// secret-bearing variants in sync with that impl.
#[derive(Clone, Serialize, Deserialize)]
pub enum Request {
    /// Authenticate with username/password (SASL/PLAIN compatible).
    ///
    /// # Security — transport encryption required
    ///
    /// The password is sent in **plaintext** on the wire (SASL/PLAIN).
    /// This variant must only be used over a TLS-encrypted connection;
    /// otherwise the password is exposed to network observers.
    ///
    /// # Deprecation
    ///
    /// Prefer `ScramClientFirst` / `ScramClientFinal` (SCRAM-SHA-256
    /// challenge-response) which never sends the password over the wire.
    #[deprecated(
        note = "Use SCRAM-SHA-256 (ScramClientFirst/ScramClientFinal) instead of plaintext auth"
    )]
    Authenticate {
        /// Principal to authenticate as
        username: String,
        /// Plaintext password — see the security note on this variant
        password: String,
        /// When `true` the server **must** reject this request if the
        /// connection is not TLS-encrypted, preventing accidental
        /// credential exposure on cleartext transports.
        #[serde(default)]
        require_tls: bool,
    },

    /// Authenticate with SASL bytes (for Kafka client compatibility)
    SaslAuthenticate {
        /// SASL mechanism name bytes (e.g. "PLAIN")
        #[serde(with = "crate::serde_utils::bytes_serde")]
        mechanism: Bytes,
        /// Opaque mechanism-specific authentication payload
        #[serde(with = "crate::serde_utils::bytes_serde")]
        auth_bytes: Bytes,
    },

    /// SCRAM-SHA-256: Client-first message
    ScramClientFirst {
        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
    },

    /// SCRAM-SHA-256: Client-final message
    ScramClientFinal {
        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
    },

    /// Publish a message to a topic
    Publish {
        /// Destination topic
        topic: String,
        /// Target partition (None = broker chooses)
        partition: Option<u32>,
        /// Optional record key
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        /// Record payload
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
        /// Leader epoch for data-path fencing (§2.4).
        /// When set, the broker rejects the write if its current leader epoch
        /// for this partition is higher, preventing stale-leader writes.
        #[serde(default)]
        leader_epoch: Option<u64>,
    },

    /// Consume messages from a topic
    Consume {
        /// Topic to read from
        topic: String,
        /// Partition to read from
        partition: u32,
        /// Offset to start reading at
        offset: u64,
        /// Maximum number of messages to return
        max_messages: u32,
        /// Isolation level for transactional reads
        /// None = read_uncommitted (default, backward compatible)
        /// Some(0) = read_uncommitted
        /// Some(1) = read_committed (filters aborted transaction messages)
        #[serde(default)]
        isolation_level: Option<u8>,
        /// Long-polling: maximum time (ms) to wait for new data before returning
        /// an empty response. None or 0 = immediate (no waiting). Capped at 30 000 ms.
        #[serde(default)]
        max_wait_ms: Option<u64>,
    },

    /// Create a new topic
    CreateTopic {
        /// Topic name
        name: String,
        /// Partition count (None = server default)
        partitions: Option<u32>,
    },

    /// List all topics
    ListTopics,

    /// Delete a topic
    DeleteTopic { name: String },

    /// Commit consumer offset
    CommitOffset {
        /// Consumer group the offset belongs to
        consumer_group: String,
        /// Topic name
        topic: String,
        /// Partition ID
        partition: u32,
        /// Offset to record
        offset: u64,
    },

    /// Get consumer offset
    GetOffset {
        /// Consumer group to query
        consumer_group: String,
        /// Topic name
        topic: String,
        /// Partition ID
        partition: u32,
    },

    /// Get topic metadata
    GetMetadata { topic: String },

    /// Get cluster metadata (all topics or specific ones)
    GetClusterMetadata {
        /// Topics to get metadata for (empty = all topics)
        topics: Vec<String>,
    },

    /// Ping
    Ping,

    /// Get offset bounds for a partition
    GetOffsetBounds { topic: String, partition: u32 },

    /// List all consumer groups
    ListGroups,

    /// Describe a consumer group (get all offsets)
    DescribeGroup { consumer_group: String },

    /// Delete a consumer group
    DeleteGroup { consumer_group: String },

    /// Find offset for a timestamp
    GetOffsetForTimestamp {
        /// Topic name
        topic: String,
        /// Partition ID
        partition: u32,
        /// Timestamp in milliseconds since epoch
        timestamp_ms: i64,
    },

    // =========================================================================
    // Idempotent Producer
    // =========================================================================
    /// Initialize idempotent producer (request producer ID and epoch)
    ///
    /// Call this before sending idempotent produce requests.
    /// If reconnecting, provide the previous producer_id to bump epoch.
    InitProducerId {
        /// Previous producer ID (None for new producers)
        producer_id: Option<u64>,
    },

    /// Publish with idempotent semantics (exactly-once delivery)
    ///
    /// Requires InitProducerId to have been called first.
    IdempotentPublish {
        /// Destination topic
        topic: String,
        /// Target partition (None = broker chooses)
        partition: Option<u32>,
        /// Optional record key
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        /// Record payload
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
        /// Producer ID from InitProducerId response
        producer_id: u64,
        /// Producer epoch from InitProducerId response
        producer_epoch: u16,
        /// Sequence number (starts at 0, increments per partition)
        sequence: i32,
        /// Leader epoch for fencing stale writes (§2.4)
        #[serde(default)]
        leader_epoch: Option<u64>,
    },

    // =========================================================================
    // Native Transactions
    // =========================================================================
    /// Begin a new transaction
    BeginTransaction {
        /// Transaction ID (unique per producer)
        txn_id: String,
        /// Producer ID from InitProducerId
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Transaction timeout in milliseconds (optional, defaults to 60s)
        timeout_ms: Option<u64>,
    },

    /// Add partitions to an active transaction
    AddPartitionsToTxn {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Partitions to add (topic, partition pairs)
        partitions: Vec<(String, u32)>,
    },

    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
    TransactionalPublish {
        /// Transaction ID
        txn_id: String,
        /// Destination topic
        topic: String,
        /// Target partition (None = broker chooses)
        partition: Option<u32>,
        /// Optional record key
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        /// Record payload
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Sequence number
        sequence: i32,
        /// Leader epoch for fencing stale writes (§2.4)
        #[serde(default)]
        leader_epoch: Option<u64>,
    },

    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
    AddOffsetsToTxn {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Consumer group ID
        group_id: String,
        /// Offsets to commit (topic, partition, offset triples)
        offsets: Vec<(String, u32, i64)>,
    },

    /// Commit a transaction
    CommitTransaction {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
    },

    /// Abort a transaction
    AbortTransaction {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
    },

    // =========================================================================
    // Per-Principal Quotas (Kafka Parity)
    // =========================================================================
    /// Describe quotas for entities
    DescribeQuotas {
        /// Entities to describe (empty = all)
        /// Format: Vec<(entity_type, entity_name)>
        /// entity_type: "user", "client-id", "consumer-group", "default"
        /// entity_name: None for defaults, Some for specific entities
        entities: Vec<(String, Option<String>)>,
    },

    /// Alter quotas for entities
    AlterQuotas {
        /// Quota alterations to apply
        /// Each item: (entity_type, entity_name, quota_key, quota_value)
        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
        /// quota_value: None to remove, Some(value) to set
        alterations: Vec<QuotaAlteration>,
    },

    // =========================================================================
    // Admin API (Kafka Parity)
    // =========================================================================
    /// Alter topic configuration
    AlterTopicConfig {
        /// Topic name
        topic: String,
        /// Configuration changes to apply
        configs: Vec<TopicConfigEntry>,
    },

    /// Create additional partitions for an existing topic
    CreatePartitions {
        /// Topic name
        topic: String,
        /// New total partition count (must be > current count)
        new_partition_count: u32,
        /// Optional assignment of new partitions to brokers
        /// If empty, broker will auto-assign
        assignments: Vec<Vec<String>>,
    },

    /// Delete records before a given offset (log truncation)
    DeleteRecords {
        /// Topic name
        topic: String,
        /// Partition-offset pairs: delete all records before these offsets
        partition_offsets: Vec<(u32, u64)>,
    },

    /// Describe topic configurations
    DescribeTopicConfigs {
        /// Topics to describe (empty = all)
        topics: Vec<String>,
    },

    /// Protocol version handshake
    ///
    /// Sent by the client as the first message after connecting.
    /// The server validates the protocol version and returns compatibility info.
    Handshake {
        /// Client's protocol version (must match `PROTOCOL_VERSION`)
        protocol_version: u32,
        /// Optional client identifier for diagnostics
        client_id: String,
    },

    // =========================================================================
    // Consumer Group Coordination (Kafka-compatible protocol)
    // =========================================================================
    /// Join a consumer group. The coordinator assigns a member ID and,
    /// once all members have joined, elects a leader and triggers a
    /// rebalance. Returns the generation ID, member ID, and leader info.
    JoinGroup {
        /// Consumer group ID
        group_id: String,
        /// Member ID (empty string for first join; server assigns one)
        member_id: String,
        /// Session timeout in milliseconds — if no heartbeat is received
        /// within this period, the member is considered dead and a
        /// rebalance is triggered.
        session_timeout_ms: u32,
        /// Rebalance timeout in milliseconds — maximum time the
        /// coordinator waits for all members to join during a rebalance.
        rebalance_timeout_ms: u32,
        /// Protocol type (e.g. "consumer")
        protocol_type: String,
        /// Topics this member wants to consume
        subscriptions: Vec<String>,
    },

    /// Sync a consumer group — the leader sends partition assignments
    /// to the coordinator, and all members (including the leader)
    /// receive their individual assignments.
    SyncGroup {
        /// Consumer group ID
        group_id: String,
        /// Generation ID from JoinGroup response
        generation_id: u32,
        /// This member's ID
        member_id: String,
        /// Assignments (only sent by leader; empty for followers).
        /// Each entry is `(member_id, Vec<(topic, Vec<partition>)>)`.
        assignments: SyncGroupAssignments,
    },

    /// Heartbeat — sent periodically by group members to keep their
    /// session alive. The response tells the member whether a
    /// rebalance is needed.
    Heartbeat {
        /// Consumer group ID
        group_id: String,
        /// Generation ID
        generation_id: u32,
        /// This member's ID
        member_id: String,
    },

    /// Leave a consumer group gracefully. Triggers an immediate
    /// rebalance for the remaining members.
    LeaveGroup {
        /// Consumer group ID
        group_id: String,
        /// This member's ID
        member_id: String,
    },

    // =========================================================================
    // Batch Publish (hot-path optimization)
    // =========================================================================
    /// Publish a batch of records to a single (topic, partition) in one
    /// wire message — eliminates per-record topic cloning and serialization
    /// overhead compared to sending N individual `Publish` requests.
    PublishBatch {
        /// Destination topic (shared by all records in the batch)
        topic: String,
        /// Target partition (None = broker chooses)
        partition: Option<u32>,
        /// The records to append, in order
        records: Vec<BatchRecord>,
        /// Leader epoch for data-path fencing (§2.4).
        #[serde(default)]
        leader_epoch: Option<u64>,
    },

    /// Idempotent batch publish — combines [`Request::IdempotentPublish`] semantics
    /// with the batching efficiency of [`Request::PublishBatch`].
    ///
    /// Records are assigned sequence numbers `base_sequence..base_sequence + N - 1`.
    IdempotentPublishBatch {
        /// Destination topic (shared by all records in the batch)
        topic: String,
        /// Target partition (None = broker chooses)
        partition: Option<u32>,
        /// The records to append, in order
        records: Vec<BatchRecord>,
        /// Producer ID from InitProducerId response
        producer_id: u64,
        /// Producer epoch from InitProducerId response
        producer_epoch: u16,
        /// First sequence number in the batch
        base_sequence: i32,
        /// Leader epoch for fencing stale writes (§2.4)
        #[serde(default)]
        leader_epoch: Option<u64>,
    },
}
512
// Manual Debug impl to redact credentials from log output.
// `Request::Authenticate { password }` and SASL/SCRAM variants contain
// secrets that must never appear in debug logs.
//
// Conventions used throughout:
//   * secrets are replaced with the literal "[REDACTED]";
//   * large payloads (`value`, batch `records`) are summarized as lengths
//     to keep log lines small;
//   * the match is intentionally exhaustive (no `_` arm) so adding a new
//     `Request` variant forces a decision here about what is safe to log.
impl std::fmt::Debug for Request {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The variant itself is deprecated; allow it here so the Debug
            // impl can still cover it.
            #[allow(deprecated)]
            Self::Authenticate {
                username,
                require_tls,
                ..
            } => f
                .debug_struct("Authenticate")
                .field("username", username)
                // Never log the plaintext password.
                .field("password", &"[REDACTED]")
                .field("require_tls", require_tls)
                .finish(),
            Self::SaslAuthenticate { mechanism, .. } => f
                .debug_struct("SaslAuthenticate")
                .field("mechanism", mechanism)
                // auth_bytes may embed credentials (e.g. SASL/PLAIN).
                .field("auth_bytes", &"[REDACTED]")
                .finish(),
            Self::ScramClientFirst { .. } => f
                .debug_struct("ScramClientFirst")
                .field("message", &"[REDACTED]")
                .finish(),
            Self::ScramClientFinal { .. } => f
                .debug_struct("ScramClientFinal")
                .field("message", &"[REDACTED]")
                .finish(),
            Self::Publish {
                topic,
                partition,
                key,
                value,
                leader_epoch,
            } => f
                .debug_struct("Publish")
                .field("topic", topic)
                .field("partition", partition)
                .field("key", key)
                // Payload logged as length only, not content.
                .field("value_len", &value.len())
                .field("leader_epoch", leader_epoch)
                .finish(),
            Self::Consume {
                topic,
                partition,
                offset,
                max_messages,
                isolation_level,
                max_wait_ms,
            } => f
                .debug_struct("Consume")
                .field("topic", topic)
                .field("partition", partition)
                .field("offset", offset)
                .field("max_messages", max_messages)
                .field("isolation_level", isolation_level)
                .field("max_wait_ms", max_wait_ms)
                .finish(),
            Self::CreateTopic { name, partitions } => f
                .debug_struct("CreateTopic")
                .field("name", name)
                .field("partitions", partitions)
                .finish(),
            Self::ListTopics => write!(f, "ListTopics"),
            Self::DeleteTopic { name } => {
                f.debug_struct("DeleteTopic").field("name", name).finish()
            }
            Self::CommitOffset {
                consumer_group,
                topic,
                partition,
                offset,
            } => f
                .debug_struct("CommitOffset")
                .field("consumer_group", consumer_group)
                .field("topic", topic)
                .field("partition", partition)
                .field("offset", offset)
                .finish(),
            Self::GetOffset {
                consumer_group,
                topic,
                partition,
            } => f
                .debug_struct("GetOffset")
                .field("consumer_group", consumer_group)
                .field("topic", topic)
                .field("partition", partition)
                .finish(),
            Self::GetMetadata { topic } => {
                f.debug_struct("GetMetadata").field("topic", topic).finish()
            }
            Self::GetClusterMetadata { topics } => f
                .debug_struct("GetClusterMetadata")
                .field("topics", topics)
                .finish(),
            Self::Ping => write!(f, "Ping"),
            Self::GetOffsetBounds { topic, partition } => f
                .debug_struct("GetOffsetBounds")
                .field("topic", topic)
                .field("partition", partition)
                .finish(),
            Self::ListGroups => write!(f, "ListGroups"),
            Self::DescribeGroup { consumer_group } => f
                .debug_struct("DescribeGroup")
                .field("consumer_group", consumer_group)
                .finish(),
            Self::DeleteGroup { consumer_group } => f
                .debug_struct("DeleteGroup")
                .field("consumer_group", consumer_group)
                .finish(),
            Self::GetOffsetForTimestamp {
                topic,
                partition,
                timestamp_ms,
            } => f
                .debug_struct("GetOffsetForTimestamp")
                .field("topic", topic)
                .field("partition", partition)
                .field("timestamp_ms", timestamp_ms)
                .finish(),
            Self::InitProducerId { producer_id } => f
                .debug_struct("InitProducerId")
                .field("producer_id", producer_id)
                .finish(),
            // key/value intentionally omitted (`..`) — only routing and
            // sequencing metadata is useful in logs.
            Self::IdempotentPublish {
                topic,
                partition,
                producer_id,
                producer_epoch,
                sequence,
                ..
            } => f
                .debug_struct("IdempotentPublish")
                .field("topic", topic)
                .field("partition", partition)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("sequence", sequence)
                .finish(),
            Self::BeginTransaction {
                txn_id,
                producer_id,
                producer_epoch,
                timeout_ms,
            } => f
                .debug_struct("BeginTransaction")
                .field("txn_id", txn_id)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("timeout_ms", timeout_ms)
                .finish(),
            Self::AddPartitionsToTxn {
                txn_id,
                producer_id,
                producer_epoch,
                partitions,
            } => f
                .debug_struct("AddPartitionsToTxn")
                .field("txn_id", txn_id)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("partitions", partitions)
                .finish(),
            // key/value intentionally omitted, as with IdempotentPublish.
            Self::TransactionalPublish {
                txn_id,
                topic,
                partition,
                producer_id,
                producer_epoch,
                sequence,
                ..
            } => f
                .debug_struct("TransactionalPublish")
                .field("txn_id", txn_id)
                .field("topic", topic)
                .field("partition", partition)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("sequence", sequence)
                .finish(),
            Self::AddOffsetsToTxn {
                txn_id,
                producer_id,
                producer_epoch,
                group_id,
                offsets,
            } => f
                .debug_struct("AddOffsetsToTxn")
                .field("txn_id", txn_id)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("group_id", group_id)
                .field("offsets", offsets)
                .finish(),
            Self::CommitTransaction {
                txn_id,
                producer_id,
                producer_epoch,
            } => f
                .debug_struct("CommitTransaction")
                .field("txn_id", txn_id)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .finish(),
            Self::AbortTransaction {
                txn_id,
                producer_id,
                producer_epoch,
            } => f
                .debug_struct("AbortTransaction")
                .field("txn_id", txn_id)
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .finish(),
            Self::DescribeQuotas { entities } => f
                .debug_struct("DescribeQuotas")
                .field("entities", entities)
                .finish(),
            Self::AlterQuotas { alterations } => f
                .debug_struct("AlterQuotas")
                .field("alterations", alterations)
                .finish(),
            Self::AlterTopicConfig { topic, configs } => f
                .debug_struct("AlterTopicConfig")
                .field("topic", topic)
                .field("configs", configs)
                .finish(),
            Self::CreatePartitions {
                topic,
                new_partition_count,
                assignments,
            } => f
                .debug_struct("CreatePartitions")
                .field("topic", topic)
                .field("new_partition_count", new_partition_count)
                .field("assignments", assignments)
                .finish(),
            Self::DeleteRecords {
                topic,
                partition_offsets,
            } => f
                .debug_struct("DeleteRecords")
                .field("topic", topic)
                .field("partition_offsets", partition_offsets)
                .finish(),
            Self::DescribeTopicConfigs { topics } => f
                .debug_struct("DescribeTopicConfigs")
                .field("topics", topics)
                .finish(),
            Self::Handshake {
                protocol_version,
                client_id,
            } => f
                .debug_struct("Handshake")
                .field("protocol_version", protocol_version)
                .field("client_id", client_id)
                .finish(),
            // Timeouts/subscriptions omitted (`..`) — identity fields are
            // what matter when tracing group membership.
            Self::JoinGroup {
                group_id,
                member_id,
                ..
            } => f
                .debug_struct("JoinGroup")
                .field("group_id", group_id)
                .field("member_id", member_id)
                .finish(),
            // Assignments omitted (`..`) — they can be large.
            Self::SyncGroup {
                group_id,
                generation_id,
                member_id,
                ..
            } => f
                .debug_struct("SyncGroup")
                .field("group_id", group_id)
                .field("generation_id", generation_id)
                .field("member_id", member_id)
                .finish(),
            Self::Heartbeat {
                group_id,
                generation_id,
                member_id,
            } => f
                .debug_struct("Heartbeat")
                .field("group_id", group_id)
                .field("generation_id", generation_id)
                .field("member_id", member_id)
                .finish(),
            Self::LeaveGroup {
                group_id,
                member_id,
            } => f
                .debug_struct("LeaveGroup")
                .field("group_id", group_id)
                .field("member_id", member_id)
                .finish(),
            Self::PublishBatch {
                topic,
                partition,
                records,
                leader_epoch,
            } => f
                .debug_struct("PublishBatch")
                .field("topic", topic)
                .field("partition", partition)
                // Record payloads summarized as a count, not content.
                .field("record_count", &records.len())
                .field("leader_epoch", leader_epoch)
                .finish(),
            Self::IdempotentPublishBatch {
                topic,
                partition,
                records,
                producer_id,
                producer_epoch,
                base_sequence,
                leader_epoch,
            } => f
                .debug_struct("IdempotentPublishBatch")
                .field("topic", topic)
                .field("partition", partition)
                .field("record_count", &records.len())
                .field("producer_id", producer_id)
                .field("producer_epoch", producer_epoch)
                .field("base_sequence", base_sequence)
                .field("leader_epoch", leader_epoch)
                .finish(),
        }
    }
}
844
845/// Protocol response messages
846///
847/// # Stability
848///
849/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
850/// Adding new variants should only be done at the end of the enum.
851#[derive(Debug, Clone, Serialize, Deserialize)]
852pub enum Response {
853 /// Authentication successful
854 Authenticated {
855 /// Session token for subsequent requests
856 session_id: String,
857 /// Session timeout in seconds
858 expires_in: u64,
859 },
860
861 /// SCRAM-SHA-256: Server-first message (challenge)
862 ScramServerFirst {
863 /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
864 #[serde(with = "crate::serde_utils::bytes_serde")]
865 message: Bytes,
866 },
867
868 /// SCRAM-SHA-256: Server-final message (verification or error)
869 ScramServerFinal {
870 /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
871 #[serde(with = "crate::serde_utils::bytes_serde")]
872 message: Bytes,
873 /// Session ID (if authentication succeeded)
874 session_id: Option<String>,
875 /// Session timeout in seconds (if authentication succeeded)
876 expires_in: Option<u64>,
877 },
878
879 /// Success response with offset
880 Published { offset: u64, partition: u32 },
881
882 /// Messages response
883 Messages { messages: Vec<MessageData> },
884
885 /// Topic created
886 TopicCreated { name: String, partitions: u32 },
887
888 /// List of topics
889 Topics { topics: Vec<String> },
890
891 /// Topic deleted
892 TopicDeleted,
893
894 /// Offset committed
895 OffsetCommitted,
896
897 /// Offset response
898 Offset { offset: Option<u64> },
899
900 /// Metadata
901 Metadata { name: String, partitions: u32 },
902
903 /// Full cluster metadata for topic(s)
904 ClusterMetadata {
905 /// Controller node ID (Raft leader)
906 controller_id: Option<String>,
907 /// Broker/node list
908 brokers: Vec<BrokerInfo>,
909 /// Topic metadata
910 topics: Vec<TopicMetadata>,
911 },
912
913 /// Pong
914 Pong,
915
916 /// Offset bounds for a partition
917 OffsetBounds { earliest: u64, latest: u64 },
918
919 /// List of consumer groups
920 Groups { groups: Vec<String> },
921
922 /// Consumer group details with all offsets
923 GroupDescription {
924 consumer_group: String,
925 /// topic → partition → offset
926 offsets: HashMap<String, HashMap<u32, u64>>,
927 },
928
929 /// Consumer group deleted
930 GroupDeleted,
931
932 /// Offset for a timestamp
933 OffsetForTimestamp {
934 /// The first offset with timestamp >= the requested timestamp
935 /// None if no matching offset was found
936 offset: Option<u64>,
937 },
938
939 /// Error response
940 Error { message: String },
941
942 /// Success
943 Ok,
944
945 // =========================================================================
946 // Idempotent Producer
947 // =========================================================================
948 /// Producer ID initialized
949 ProducerIdInitialized {
950 /// Assigned or existing producer ID
951 producer_id: u64,
952 /// Current epoch (increments on reconnect)
953 producer_epoch: u16,
954 },
955
956 /// Idempotent publish result
957 IdempotentPublished {
958 /// Offset where message was written
959 offset: u64,
960 /// Partition the message was written to
961 partition: u32,
962 /// Whether this was a duplicate (message already existed)
963 duplicate: bool,
964 },
965
966 // =========================================================================
967 // Native Transactions
968 // =========================================================================
969 /// Transaction started successfully
970 TransactionStarted {
971 /// Transaction ID
972 txn_id: String,
973 },
974
975 /// Partitions added to transaction
976 PartitionsAddedToTxn {
977 /// Transaction ID
978 txn_id: String,
979 /// Number of partitions now in transaction
980 partition_count: usize,
981 },
982
983 /// Transactional publish result
984 TransactionalPublished {
985 /// Offset where message was written (pending commit)
986 offset: u64,
987 /// Partition the message was written to
988 partition: u32,
989 /// Sequence number accepted
990 sequence: i32,
991 },
992
993 /// Offsets added to transaction
994 OffsetsAddedToTxn {
995 /// Transaction ID
996 txn_id: String,
997 },
998
999 /// Transaction committed
1000 TransactionCommitted {
1001 /// Transaction ID
1002 txn_id: String,
1003 },
1004
1005 /// Transaction aborted
1006 TransactionAborted {
1007 /// Transaction ID
1008 txn_id: String,
1009 },
1010
1011 // =========================================================================
1012 // Per-Principal Quotas (Kafka Parity)
1013 // =========================================================================
1014 /// Quota descriptions
1015 QuotasDescribed {
1016 /// List of quota entries
1017 entries: Vec<QuotaEntry>,
1018 },
1019
1020 /// Quotas altered successfully
1021 QuotasAltered {
1022 /// Number of quota alterations applied
1023 altered_count: usize,
1024 },
1025
1026 /// Throttle response (returned when quota exceeded)
1027 Throttled {
1028 /// Time to wait before retrying (milliseconds)
1029 throttle_time_ms: u64,
1030 /// Quota type that was exceeded
1031 quota_type: String,
1032 /// Entity that exceeded quota
1033 entity: String,
1034 },
1035
1036 // =========================================================================
1037 // Admin API (Kafka Parity)
1038 // =========================================================================
1039 /// Topic configuration altered
1040 TopicConfigAltered {
1041 /// Topic name
1042 topic: String,
1043 /// Number of configurations changed
1044 changed_count: usize,
1045 },
1046
1047 /// Partitions created
1048 PartitionsCreated {
1049 /// Topic name
1050 topic: String,
1051 /// New total partition count
1052 new_partition_count: u32,
1053 },
1054
1055 /// Records deleted
1056 RecordsDeleted {
1057 /// Topic name
1058 topic: String,
1059 /// Results per partition
1060 results: Vec<DeleteRecordsResult>,
1061 },
1062
1063 /// Topic configurations described
1064 TopicConfigsDescribed {
1065 /// Configuration descriptions per topic
1066 configs: Vec<TopicConfigDescription>,
1067 },
1068
1069 /// Protocol version handshake response
1070 HandshakeResult {
1071 /// Server's protocol version
1072 server_version: u32,
1073 /// Whether the client version is compatible
1074 compatible: bool,
1075 /// Human-readable message (e.g. reason for incompatibility)
1076 message: String,
1077 },
1078
1079 // =========================================================================
1080 // Consumer Group Coordination
1081 // =========================================================================
1082 /// JoinGroup response — returned to each member after all members
1083 /// have joined (or the rebalance timeout expires).
1084 JoinGroupResult {
1085 /// Generation ID (monotonically increasing; bumped on each rebalance)
1086 generation_id: u32,
1087 /// Protocol type (e.g. "consumer")
1088 protocol_type: String,
1089 /// This member's assigned member ID
1090 member_id: String,
1091 /// The leader's member ID (if member_id == leader_id, this member is leader)
1092 leader_id: String,
1093 /// All members in the group (only populated for the leader)
1094 /// Each entry is (member_id, subscriptions)
1095 members: Vec<(String, Vec<String>)>,
1096 },
1097
1098 /// SyncGroup response — partition assignments for this member.
1099 SyncGroupResult {
1100 /// This member's partition assignments: `Vec<(topic, Vec<partition>)>`
1101 assignments: Vec<(String, Vec<u32>)>,
1102 },
1103
1104 /// Heartbeat response — signals whether the member should rejoin.
1105 HeartbeatResult {
1106 /// Error code: 0 = OK, 27 = REBALANCE_IN_PROGRESS (rejoin needed)
1107 error_code: i32,
1108 },
1109
1110 /// LeaveGroup response
1111 LeaveGroupResult,
1112
1113 // =========================================================================
1114 // Batch Publish Responses
1115 // =========================================================================
1116 /// Result of a [`Request::PublishBatch`].
1117 ///
1118 /// All records are written to contiguous offsets starting at `base_offset`.
1119 PublishedBatch {
1120 /// Offset of the first record in the batch
1121 base_offset: u64,
1122 /// Partition the records were written to
1123 partition: u32,
1124 /// Number of records successfully written
1125 record_count: u32,
1126 },
1127
1128 /// Result of a [`Request::IdempotentPublishBatch`].
1129 IdempotentPublishedBatch {
1130 /// Offset of the first record in the batch
1131 base_offset: u64,
1132 /// Partition the records were written to
1133 partition: u32,
1134 /// Number of records successfully written
1135 record_count: u32,
1136 /// Whether this was a duplicate batch
1137 duplicate: bool,
1138 },
1139}
1140
1141impl Request {
1142 /// Serialize request to bytes (postcard format, no format prefix)
1143 ///
1144 /// For internal Rust-to-Rust communication where format is known.
1145 /// Use `to_wire()` for wire transmission with format detection support.
1146 #[inline]
1147 pub fn to_bytes(&self) -> Result<Vec<u8>> {
1148 Ok(postcard::to_allocvec(self)?)
1149 }
1150
1151 /// Deserialize request from bytes (postcard format)
1152 ///
1153 /// For internal Rust-to-Rust communication where format is known.
1154 /// Use `from_wire()` for wire transmission with format detection support.
1155 #[inline]
1156 pub fn from_bytes(data: &[u8]) -> Result<Self> {
1157 let request: Self = postcard::from_bytes(data)?;
1158 request.validate()?;
1159 Ok(request)
1160 }
1161
1162 /// Post-deserialization bounds validation.
1163 ///
1164 /// Prevents crafted payloads with oversized strings / vecs from being
1165 /// accepted. Must be called after every `from_bytes` / `from_wire`.
1166 #[allow(deprecated)]
1167 pub fn validate(&self) -> Result<()> {
1168 use crate::{MAX_AUTH_PAYLOAD, MAX_LIST_LEN, MAX_NAME_LEN};
1169
1170 let check_name = |s: &str, field: &str| -> Result<()> {
1171 if s.len() > MAX_NAME_LEN {
1172 return Err(crate::ProtocolError::InvalidFormat(format!(
1173 "{field} exceeds max length ({} > {MAX_NAME_LEN})",
1174 s.len()
1175 )));
1176 }
1177 Ok(())
1178 };
1179 let check_list = |len: usize, field: &str| -> Result<()> {
1180 if len > MAX_LIST_LEN {
1181 return Err(crate::ProtocolError::InvalidFormat(format!(
1182 "{field} list exceeds max length ({len} > {MAX_LIST_LEN})"
1183 )));
1184 }
1185 Ok(())
1186 };
1187
1188 match self {
1189 Request::Authenticate {
1190 username, password, ..
1191 } => {
1192 check_name(username, "username")?;
1193 if password.len() > MAX_AUTH_PAYLOAD {
1194 return Err(crate::ProtocolError::InvalidFormat(
1195 "password exceeds max auth payload".into(),
1196 ));
1197 }
1198 }
1199 Request::SaslAuthenticate {
1200 mechanism,
1201 auth_bytes,
1202 } => {
1203 if mechanism.len() > MAX_NAME_LEN {
1204 return Err(crate::ProtocolError::InvalidFormat(
1205 "SASL mechanism name too long".into(),
1206 ));
1207 }
1208 if auth_bytes.len() > MAX_AUTH_PAYLOAD {
1209 return Err(crate::ProtocolError::InvalidFormat(
1210 "SASL auth bytes exceed max auth payload".into(),
1211 ));
1212 }
1213 }
1214 Request::ScramClientFirst { message } | Request::ScramClientFinal { message } => {
1215 if message.len() > MAX_AUTH_PAYLOAD {
1216 return Err(crate::ProtocolError::InvalidFormat(
1217 "SCRAM message exceeds max auth payload".into(),
1218 ));
1219 }
1220 }
1221 Request::Publish { topic, .. } => check_name(topic, "topic")?,
1222 Request::Consume { topic, .. } => check_name(topic, "topic")?,
1223 Request::CreateTopic { name, .. } => check_name(name, "topic")?,
1224 Request::DeleteTopic { name } => check_name(name, "topic")?,
1225 Request::CommitOffset {
1226 consumer_group,
1227 topic,
1228 ..
1229 } => {
1230 check_name(consumer_group, "consumer_group")?;
1231 check_name(topic, "topic")?;
1232 }
1233 Request::GetOffset {
1234 consumer_group,
1235 topic,
1236 ..
1237 } => {
1238 check_name(consumer_group, "consumer_group")?;
1239 check_name(topic, "topic")?;
1240 }
1241 Request::GetMetadata { topic } => check_name(topic, "topic")?,
1242 Request::GetClusterMetadata { topics, .. } => {
1243 check_list(topics.len(), "topics")?;
1244 for t in topics {
1245 check_name(t, "topic")?;
1246 }
1247 }
1248 Request::DescribeGroup { consumer_group } | Request::DeleteGroup { consumer_group } => {
1249 check_name(consumer_group, "consumer_group")?;
1250 }
1251 Request::GetOffsetForTimestamp { topic, .. } => check_name(topic, "topic")?,
1252 Request::InitProducerId { .. } => {}
1253 Request::IdempotentPublish { topic, .. } => check_name(topic, "topic")?,
1254 Request::BeginTransaction { txn_id, .. } => check_name(txn_id, "txn_id")?,
1255 Request::AddPartitionsToTxn {
1256 txn_id, partitions, ..
1257 } => {
1258 check_name(txn_id, "txn_id")?;
1259 check_list(partitions.len(), "partitions")?;
1260 }
1261 Request::TransactionalPublish { txn_id, topic, .. } => {
1262 check_name(txn_id, "txn_id")?;
1263 check_name(topic, "topic")?;
1264 }
1265 Request::AddOffsetsToTxn {
1266 txn_id, offsets, ..
1267 } => {
1268 check_name(txn_id, "txn_id")?;
1269 check_list(offsets.len(), "offsets")?;
1270 }
1271 Request::CommitTransaction { txn_id, .. }
1272 | Request::AbortTransaction { txn_id, .. } => {
1273 check_name(txn_id, "txn_id")?;
1274 }
1275 Request::DescribeQuotas { entities } => {
1276 check_list(entities.len(), "entities")?;
1277 }
1278 Request::AlterQuotas { alterations, .. } => {
1279 check_list(alterations.len(), "alterations")?;
1280 }
1281 Request::AlterTopicConfig { topic, configs, .. } => {
1282 check_name(topic, "topic")?;
1283 check_list(configs.len(), "configs")?;
1284 }
1285 Request::CreatePartitions {
1286 topic, assignments, ..
1287 } => {
1288 check_name(topic, "topic")?;
1289 check_list(assignments.len(), "assignments")?;
1290 }
1291 Request::DeleteRecords {
1292 topic,
1293 partition_offsets,
1294 } => {
1295 check_name(topic, "topic")?;
1296 check_list(partition_offsets.len(), "partition_offsets")?;
1297 }
1298 Request::DescribeTopicConfigs { topics } => {
1299 check_list(topics.len(), "topics")?;
1300 for t in topics {
1301 check_name(t, "topic")?;
1302 }
1303 }
1304 Request::Handshake { client_id, .. } => {
1305 check_name(client_id, "client_id")?;
1306 }
1307 Request::JoinGroup {
1308 group_id,
1309 member_id,
1310 protocol_type,
1311 subscriptions,
1312 ..
1313 } => {
1314 check_name(group_id, "group_id")?;
1315 check_name(member_id, "member_id")?;
1316 check_name(protocol_type, "protocol_type")?;
1317 check_list(subscriptions.len(), "subscriptions")?;
1318 for t in subscriptions {
1319 check_name(t, "subscription_topic")?;
1320 }
1321 }
1322 Request::SyncGroup {
1323 group_id,
1324 member_id,
1325 assignments,
1326 ..
1327 } => {
1328 check_name(group_id, "group_id")?;
1329 check_name(member_id, "member_id")?;
1330 check_list(assignments.len(), "assignments")?;
1331 for (mid, topic_parts) in assignments {
1332 check_name(mid, "assignment_member_id")?;
1333 check_list(topic_parts.len(), "topic_partitions")?;
1334 for (topic, parts) in topic_parts {
1335 check_name(topic, "assignment_topic")?;
1336 check_list(parts.len(), "partitions")?;
1337 }
1338 }
1339 }
1340 Request::Heartbeat {
1341 group_id,
1342 member_id,
1343 ..
1344 } => {
1345 check_name(group_id, "group_id")?;
1346 check_name(member_id, "member_id")?;
1347 }
1348 Request::LeaveGroup {
1349 group_id,
1350 member_id,
1351 } => {
1352 check_name(group_id, "group_id")?;
1353 check_name(member_id, "member_id")?;
1354 }
1355 Request::PublishBatch { topic, records, .. } => {
1356 check_name(topic, "topic")?;
1357 check_list(records.len(), "records")?;
1358 }
1359 Request::IdempotentPublishBatch { topic, records, .. } => {
1360 check_name(topic, "topic")?;
1361 check_list(records.len(), "records")?;
1362 }
1363 // Enum variants with no string/vec payload need no validation.
1364 _ => {}
1365 }
1366 Ok(())
1367 }
1368
1369 /// Serialize request with wire format prefix
1370 ///
1371 /// Wire format: `[format_byte][correlation_id (4 bytes BE)][payload]`
1372 /// - format_byte: 0x00 = postcard, 0x01 = protobuf
1373 /// - correlation_id: 4-byte big-endian u32 for request-response matching
1374 /// - payload: serialized message
1375 ///
1376 /// ## Correlation ID sizing
1377 ///
1378 /// The wire protocol uses `u32` for correlation IDs (4 bytes), which is
1379 /// the Kafka-compatible choice and sufficient for client-server RPCs
1380 /// (4 billion in-flight requests before wrap-around). The cluster-internal
1381 /// protocol (`rivven-cluster`) uses `u64` for its own RPC correlation to
1382 /// avoid any wrap-around concern on high-throughput inter-node links.
1383 /// The two namespaces are independent and never cross boundaries.
1384 ///
1385 /// Note: Length prefix is NOT included (handled by transport layer)
1386 ///
1387 /// # Errors
1388 ///
1389 /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1390 /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1391 #[inline]
1392 pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1393 let result = match format {
1394 crate::WireFormat::Postcard => {
1395 // Single allocation — serialize directly into the output
1396 // Vec via `postcard::to_extend` instead of double-allocating
1397 // (to_allocvec → intermediate Vec → copy into result Vec).
1398 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + 128);
1399 result.push(format.as_byte());
1400 result.extend_from_slice(&correlation_id.to_be_bytes());
1401 postcard::to_extend(self, result)?
1402 }
1403 crate::WireFormat::Protobuf => {
1404 // Protobuf requires the `protobuf` feature
1405 #[cfg(feature = "protobuf")]
1406 {
1407 let payload = self.to_proto_bytes()?;
1408 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1409 result.push(format.as_byte());
1410 result.extend_from_slice(&correlation_id.to_be_bytes());
1411 result.extend_from_slice(&payload);
1412 result
1413 }
1414 #[cfg(not(feature = "protobuf"))]
1415 {
1416 return Err(crate::ProtocolError::Serialization(
1417 "Protobuf support requires the 'protobuf' feature".into(),
1418 ));
1419 }
1420 }
1421 };
1422
1423 // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1424 if result.len() > crate::MAX_MESSAGE_SIZE {
1425 return Err(crate::ProtocolError::MessageTooLarge(
1426 result.len(),
1427 crate::MAX_MESSAGE_SIZE,
1428 ));
1429 }
1430
1431 Ok(result)
1432 }
1433
1434 /// Deserialize request with format auto-detection
1435 ///
1436 /// Detects format from first byte, reads correlation_id, and deserializes accordingly.
1437 /// Returns the deserialized request, the detected format, and the correlation_id.
1438 #[inline]
1439 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1440 if data.len() < crate::WIRE_HEADER_SIZE {
1441 return Err(crate::ProtocolError::Serialization(
1442 "Wire data too short (need format byte + correlation_id)".into(),
1443 ));
1444 }
1445
1446 let format_byte = data[0];
1447 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1448 crate::ProtocolError::Serialization(format!(
1449 "Unknown wire format: 0x{:02x}",
1450 format_byte
1451 ))
1452 })?;
1453
1454 let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1455 let payload = &data[crate::WIRE_HEADER_SIZE..];
1456
1457 match format {
1458 crate::WireFormat::Postcard => {
1459 let request: Self = postcard::from_bytes(payload)?;
1460 request.validate()?;
1461 Ok((request, format, correlation_id))
1462 }
1463 crate::WireFormat::Protobuf => {
1464 #[cfg(feature = "protobuf")]
1465 {
1466 let request = Self::from_proto_bytes(payload)?;
1467 Ok((request, format, correlation_id))
1468 }
1469 #[cfg(not(feature = "protobuf"))]
1470 {
1471 Err(crate::ProtocolError::Serialization(
1472 "Protobuf support requires the 'protobuf' feature".into(),
1473 ))
1474 }
1475 }
1476 }
1477 }
1478}
1479
1480impl Response {
1481 /// Serialize response to bytes (postcard format, no format prefix)
1482 #[inline]
1483 pub fn to_bytes(&self) -> Result<Vec<u8>> {
1484 Ok(postcard::to_allocvec(self)?)
1485 }
1486
1487 /// Deserialize response from bytes (postcard format)
1488 #[inline]
1489 pub fn from_bytes(data: &[u8]) -> Result<Self> {
1490 let response: Self = postcard::from_bytes(data)?;
1491 response.validate()?;
1492 Ok(response)
1493 }
1494
1495 /// Post-deserialization bounds validation for responses.
1496 pub fn validate(&self) -> Result<()> {
1497 use crate::{MAX_LIST_LEN, MAX_MESSAGES_PER_RESPONSE, MAX_NAME_LEN};
1498
1499 match self {
1500 Response::Messages { messages } => {
1501 if messages.len() > MAX_MESSAGES_PER_RESPONSE {
1502 return Err(crate::ProtocolError::InvalidFormat(format!(
1503 "messages batch size {} exceeds max {MAX_MESSAGES_PER_RESPONSE}",
1504 messages.len()
1505 )));
1506 }
1507 }
1508 Response::Topics { topics } | Response::Groups { groups: topics } => {
1509 if topics.len() > MAX_LIST_LEN {
1510 return Err(crate::ProtocolError::InvalidFormat(format!(
1511 "list length {} exceeds max {MAX_LIST_LEN}",
1512 topics.len()
1513 )));
1514 }
1515 for t in topics {
1516 if t.len() > MAX_NAME_LEN {
1517 return Err(crate::ProtocolError::InvalidFormat(format!(
1518 "name length {} exceeds max {MAX_NAME_LEN}",
1519 t.len()
1520 )));
1521 }
1522 }
1523 }
1524 Response::ClusterMetadata {
1525 brokers, topics, ..
1526 } => {
1527 if brokers.len() > MAX_LIST_LEN {
1528 return Err(crate::ProtocolError::InvalidFormat(
1529 "brokers list too large".into(),
1530 ));
1531 }
1532 if topics.len() > MAX_LIST_LEN {
1533 return Err(crate::ProtocolError::InvalidFormat(
1534 "topics list too large".into(),
1535 ));
1536 }
1537 }
1538 Response::RecordsDeleted { results, .. } => {
1539 if results.len() > MAX_LIST_LEN {
1540 return Err(crate::ProtocolError::InvalidFormat(
1541 "delete results list too large".into(),
1542 ));
1543 }
1544 }
1545 Response::Error { message } => {
1546 if message.len() > crate::MAX_MESSAGE_SIZE {
1547 return Err(crate::ProtocolError::InvalidFormat(
1548 "error message exceeds max message size".into(),
1549 ));
1550 }
1551 }
1552 _ => {}
1553 }
1554 Ok(())
1555 }
1556
1557 /// Serialize response with wire format prefix
1558 ///
1559 /// # Errors
1560 ///
1561 /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1562 /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1563 #[inline]
1564 pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1565 let result = match format {
1566 crate::WireFormat::Postcard => {
1567 // Estimate payload size to avoid reallocations.
1568 // For Messages responses, use message count × estimated per-message
1569 // size (offset 8 + partition 4 + value ~256 + timestamp 8 + overhead ~24 ≈ 300 bytes).
1570 // For other variants the default 128-byte hint is usually sufficient.
1571 let size_hint = match self {
1572 Response::Messages { messages } => messages.len().saturating_mul(300).max(128),
1573 _ => 128,
1574 };
1575 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + size_hint);
1576 result.push(format.as_byte());
1577 result.extend_from_slice(&correlation_id.to_be_bytes());
1578 postcard::to_extend(self, result)?
1579 }
1580 crate::WireFormat::Protobuf => {
1581 #[cfg(feature = "protobuf")]
1582 {
1583 let payload = self.to_proto_bytes()?;
1584 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1585 result.push(format.as_byte());
1586 result.extend_from_slice(&correlation_id.to_be_bytes());
1587 result.extend_from_slice(&payload);
1588 result
1589 }
1590 #[cfg(not(feature = "protobuf"))]
1591 {
1592 return Err(crate::ProtocolError::Serialization(
1593 "Protobuf support requires the 'protobuf' feature".into(),
1594 ));
1595 }
1596 }
1597 };
1598
1599 // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1600 if result.len() > crate::MAX_MESSAGE_SIZE {
1601 return Err(crate::ProtocolError::MessageTooLarge(
1602 result.len(),
1603 crate::MAX_MESSAGE_SIZE,
1604 ));
1605 }
1606
1607 Ok(result)
1608 }
1609
1610 /// Deserialize response with format auto-detection
1611 #[inline]
1612 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1613 if data.len() < crate::WIRE_HEADER_SIZE {
1614 return Err(crate::ProtocolError::Serialization(
1615 "Wire data too short (need format byte + correlation_id)".into(),
1616 ));
1617 }
1618
1619 let format_byte = data[0];
1620 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1621 crate::ProtocolError::Serialization(format!(
1622 "Unknown wire format: 0x{:02x}",
1623 format_byte
1624 ))
1625 })?;
1626
1627 let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1628 let payload = &data[crate::WIRE_HEADER_SIZE..];
1629
1630 match format {
1631 crate::WireFormat::Postcard => {
1632 let response: Self = postcard::from_bytes(payload)?;
1633 response.validate()?;
1634 Ok((response, format, correlation_id))
1635 }
1636 crate::WireFormat::Protobuf => {
1637 #[cfg(feature = "protobuf")]
1638 {
1639 let response = Self::from_proto_bytes(payload)?;
1640 Ok((response, format, correlation_id))
1641 }
1642 #[cfg(not(feature = "protobuf"))]
1643 {
1644 Err(crate::ProtocolError::Serialization(
1645 "Protobuf support requires the 'protobuf' feature".into(),
1646 ))
1647 }
1648 }
1649 }
1650 }
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655 use super::*;
1656
1657 #[test]
1658 #[allow(deprecated)]
1659 fn test_request_roundtrip() {
1660 let requests = vec![
1661 Request::Ping,
1662 Request::ListTopics,
1663 Request::CreateTopic {
1664 name: "test".to_string(),
1665 partitions: Some(4),
1666 },
1667 Request::Authenticate {
1668 username: "admin".to_string(),
1669 password: "secret".to_string(),
1670 require_tls: true,
1671 },
1672 ];
1673
1674 for req in requests {
1675 let bytes = req.to_bytes().unwrap();
1676 let decoded = Request::from_bytes(&bytes).unwrap();
1677 // Can't directly compare due to Debug, but serialization should succeed
1678 assert!(!bytes.is_empty());
1679 let _ = decoded; // Use decoded
1680 }
1681 }
1682
1683 #[test]
1684 fn test_response_roundtrip() {
1685 let responses = vec![
1686 Response::Pong,
1687 Response::Ok,
1688 Response::Topics {
1689 topics: vec!["a".to_string(), "b".to_string()],
1690 },
1691 Response::Error {
1692 message: "test error".to_string(),
1693 },
1694 ];
1695
1696 for resp in responses {
1697 let bytes = resp.to_bytes().unwrap();
1698 let decoded = Response::from_bytes(&bytes).unwrap();
1699 assert!(!bytes.is_empty());
1700 let _ = decoded;
1701 }
1702 }
1703
1704 #[test]
1705 fn test_request_wire_roundtrip() {
1706 let request = Request::Ping;
1707
1708 // Serialize with format prefix and correlation_id
1709 let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 42).unwrap();
1710
1711 // First byte should be format identifier
1712 assert_eq!(wire_bytes[0], 0x00); // Postcard format
1713
1714 // Deserialize with auto-detection
1715 let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1716 assert_eq!(format, crate::WireFormat::Postcard);
1717 assert_eq!(correlation_id, 42);
1718 assert!(matches!(decoded, Request::Ping));
1719 }
1720
1721 #[test]
1722 fn test_response_wire_roundtrip() {
1723 let response = Response::Pong;
1724
1725 // Serialize with format prefix and correlation_id
1726 let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 99).unwrap();
1727
1728 // First byte should be format identifier
1729 assert_eq!(wire_bytes[0], 0x00); // Postcard format
1730
1731 // Deserialize with auto-detection
1732 let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1733 assert_eq!(format, crate::WireFormat::Postcard);
1734 assert_eq!(correlation_id, 99);
1735 assert!(matches!(decoded, Response::Pong));
1736 }
1737
1738 #[test]
1739 fn test_wire_format_empty_data() {
1740 let result = Request::from_wire(&[]);
1741 assert!(result.is_err());
1742 }
1743
1744 #[test]
1745 fn test_wire_format_complex_request() {
1746 use bytes::Bytes;
1747
1748 let request = Request::Publish {
1749 topic: "test-topic".to_string(),
1750 partition: Some(3),
1751 key: Some(Bytes::from("my-key")),
1752 value: Bytes::from("hello world"),
1753 leader_epoch: None,
1754 };
1755
1756 let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 1).unwrap();
1757 assert_eq!(wire_bytes[0], 0x00);
1758
1759 let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1760 assert_eq!(format, crate::WireFormat::Postcard);
1761 assert_eq!(correlation_id, 1);
1762
1763 // Verify the decoded request matches
1764 if let Request::Publish {
1765 topic, partition, ..
1766 } = decoded
1767 {
1768 assert_eq!(topic, "test-topic");
1769 assert_eq!(partition, Some(3));
1770 } else {
1771 panic!("Expected Publish request");
1772 }
1773 }
1774
1775 #[test]
1776 fn test_wire_format_complex_response() {
1777 let response = Response::Published {
1778 offset: 12345,
1779 partition: 7,
1780 };
1781
1782 let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 2).unwrap();
1783 assert_eq!(wire_bytes[0], 0x00);
1784
1785 let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1786 assert_eq!(format, crate::WireFormat::Postcard);
1787 assert_eq!(correlation_id, 2);
1788
1789 if let Response::Published { offset, partition } = decoded {
1790 assert_eq!(offset, 12345);
1791 assert_eq!(partition, 7);
1792 } else {
1793 panic!("Expected Published response");
1794 }
1795 }
1796
    /// Snapshot test: postcard serializes enum variants by ordinal position.
    /// If someone reorders variants in Request or Response, this test fails
    /// because the byte prefix (discriminant) will change, breaking wire compat.
    ///
    /// Exhaustive — every Request variant is pinned.
    #[test]
    #[allow(deprecated)]
    fn test_postcard_wire_stability_request_discriminants() {
        use bytes::Bytes;

        // Serialize a representative of each variant and check the leading
        // discriminant byte(s) haven't shifted. Postcard uses varint encoding
        // for enum discriminants.
        // NOTE: all current discriminants are < 128, so each fits a single
        // varint byte and checking bytes[0] alone is sufficient.
        let test_cases: Vec<(Request, u8)> = vec![
            // variant 0: Authenticate
            (
                Request::Authenticate {
                    username: "u".into(),
                    password: "p".into(),
                    require_tls: false,
                },
                0,
            ),
            // variant 1: SaslAuthenticate
            (
                Request::SaslAuthenticate {
                    mechanism: Bytes::from("PLAIN"),
                    auth_bytes: Bytes::from("data"),
                },
                1,
            ),
            // variant 2: ScramClientFirst
            (
                Request::ScramClientFirst {
                    message: Bytes::from("n,,n=user,r=nonce"),
                },
                2,
            ),
            // variant 3: ScramClientFinal
            (
                Request::ScramClientFinal {
                    message: Bytes::from("c=bind,r=nonce,p=proof"),
                },
                3,
            ),
            // variant 4: Publish
            (
                Request::Publish {
                    topic: "t".into(),
                    partition: None,
                    key: None,
                    value: Bytes::from("v"),
                    leader_epoch: None,
                },
                4,
            ),
            // variant 5: Consume
            (
                Request::Consume {
                    topic: "t".into(),
                    partition: 0,
                    offset: 0,
                    max_messages: 1,
                    isolation_level: None,
                    max_wait_ms: None,
                },
                5,
            ),
            // variant 6: CreateTopic
            (
                Request::CreateTopic {
                    name: "t".into(),
                    partitions: None,
                },
                6,
            ),
            // variant 7: ListTopics
            (Request::ListTopics, 7),
            // variant 8: DeleteTopic
            (Request::DeleteTopic { name: "t".into() }, 8),
            // variant 9: CommitOffset
            (
                Request::CommitOffset {
                    consumer_group: "g".into(),
                    topic: "t".into(),
                    partition: 0,
                    offset: 0,
                },
                9,
            ),
            // variant 10: GetOffset
            (
                Request::GetOffset {
                    consumer_group: "g".into(),
                    topic: "t".into(),
                    partition: 0,
                },
                10,
            ),
            // variant 11: GetMetadata
            (Request::GetMetadata { topic: "t".into() }, 11),
            // variant 12: GetClusterMetadata
            (Request::GetClusterMetadata { topics: vec![] }, 12),
            // variant 13: Ping
            (Request::Ping, 13),
            // variant 14: GetOffsetBounds
            (
                Request::GetOffsetBounds {
                    topic: "t".into(),
                    partition: 0,
                },
                14,
            ),
            // variant 15: ListGroups
            (Request::ListGroups, 15),
            // variant 16: DescribeGroup
            (
                Request::DescribeGroup {
                    consumer_group: "g".into(),
                },
                16,
            ),
            // variant 17: DeleteGroup
            (
                Request::DeleteGroup {
                    consumer_group: "g".into(),
                },
                17,
            ),
            // variant 18: GetOffsetForTimestamp
            (
                Request::GetOffsetForTimestamp {
                    topic: "t".into(),
                    partition: 0,
                    timestamp_ms: 0,
                },
                18,
            ),
            // variant 19: InitProducerId
            (Request::InitProducerId { producer_id: None }, 19),
            // variant 20: IdempotentPublish
            (
                Request::IdempotentPublish {
                    topic: "t".into(),
                    partition: None,
                    key: None,
                    value: Bytes::from("v"),
                    producer_id: 1,
                    producer_epoch: 0,
                    sequence: 0,
                    leader_epoch: None,
                },
                20,
            ),
            // variant 21: BeginTransaction
            (
                Request::BeginTransaction {
                    txn_id: "tx".into(),
                    producer_id: 1,
                    producer_epoch: 0,
                    timeout_ms: None,
                },
                21,
            ),
            // variant 22: AddPartitionsToTxn
            (
                Request::AddPartitionsToTxn {
                    txn_id: "tx".into(),
                    producer_id: 1,
                    producer_epoch: 0,
                    partitions: vec![],
                },
                22,
            ),
            // variant 23: TransactionalPublish
            (
                Request::TransactionalPublish {
                    txn_id: "tx".into(),
                    topic: "t".into(),
                    partition: None,
                    key: None,
                    value: Bytes::from("v"),
                    producer_id: 1,
                    producer_epoch: 0,
                    sequence: 0,
                    leader_epoch: None,
                },
                23,
            ),
            // variant 24: AddOffsetsToTxn
            (
                Request::AddOffsetsToTxn {
                    txn_id: "tx".into(),
                    producer_id: 1,
                    producer_epoch: 0,
                    group_id: "g".into(),
                    offsets: vec![],
                },
                24,
            ),
            // variant 25: CommitTransaction
            (
                Request::CommitTransaction {
                    txn_id: "tx".into(),
                    producer_id: 1,
                    producer_epoch: 0,
                },
                25,
            ),
            // variant 26: AbortTransaction
            (
                Request::AbortTransaction {
                    txn_id: "tx".into(),
                    producer_id: 1,
                    producer_epoch: 0,
                },
                26,
            ),
            // variant 27: DescribeQuotas
            (Request::DescribeQuotas { entities: vec![] }, 27),
            // variant 28: AlterQuotas
            (
                Request::AlterQuotas {
                    alterations: vec![],
                },
                28,
            ),
            // variant 29: AlterTopicConfig
            (
                Request::AlterTopicConfig {
                    topic: "t".into(),
                    configs: vec![],
                },
                29,
            ),
            // variant 30: CreatePartitions
            (
                Request::CreatePartitions {
                    topic: "t".into(),
                    new_partition_count: 2,
                    assignments: vec![],
                },
                30,
            ),
            // variant 31: DeleteRecords
            (
                Request::DeleteRecords {
                    topic: "t".into(),
                    partition_offsets: vec![],
                },
                31,
            ),
            // variant 32: DescribeTopicConfigs
            (Request::DescribeTopicConfigs { topics: vec![] }, 32),
            // variant 33: Handshake
            (
                Request::Handshake {
                    protocol_version: crate::PROTOCOL_VERSION,
                    client_id: "test".into(),
                },
                33,
            ),
            // variant 34: JoinGroup
            (
                Request::JoinGroup {
                    group_id: "g".into(),
                    member_id: String::new(),
                    session_timeout_ms: 10000,
                    rebalance_timeout_ms: 30000,
                    protocol_type: "consumer".into(),
                    subscriptions: vec![],
                },
                34,
            ),
            // variant 35: SyncGroup
            (
                Request::SyncGroup {
                    group_id: "g".into(),
                    generation_id: 1,
                    member_id: "m".into(),
                    assignments: vec![],
                },
                35,
            ),
            // variant 36: Heartbeat
            (
                Request::Heartbeat {
                    group_id: "g".into(),
                    generation_id: 1,
                    member_id: "m".into(),
                },
                36,
            ),
            // variant 37: LeaveGroup
            (
                Request::LeaveGroup {
                    group_id: "g".into(),
                    member_id: "m".into(),
                },
                37,
            ),
            // variant 38: PublishBatch
            (
                Request::PublishBatch {
                    topic: "t".into(),
                    partition: Some(0),
                    records: vec![],
                    leader_epoch: None,
                },
                38,
            ),
            // variant 39: IdempotentPublishBatch
            (
                Request::IdempotentPublishBatch {
                    topic: "t".into(),
                    partition: Some(0),
                    records: vec![],
                    producer_id: 1,
                    producer_epoch: 0,
                    base_sequence: 0,
                    leader_epoch: None,
                },
                39,
            ),
        ];

        for (request, expected_discriminant) in test_cases {
            let bytes = request.to_bytes().unwrap();
            assert_eq!(
                bytes[0], expected_discriminant,
                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
                request
            );
        }
    }
2132
    /// Exhaustive — every Response variant is pinned.
    ///
    /// Postcard encodes an enum discriminant as a varint at the start of the
    /// payload; all discriminants here are < 128, so the tag is exactly one
    /// byte and `bytes[0]` is the complete wire discriminant.
    ///
    /// Append-only: when a new Response variant is added (at the END of the
    /// enum), add a matching entry at the end of this table.
    #[test]
    fn test_postcard_wire_stability_response_discriminants() {
        use bytes::Bytes;

        // (instance, expected wire discriminant) — one entry per variant,
        // in declaration order.
        let test_cases: Vec<(Response, u8)> = vec![
            // variant 0: Authenticated
            (
                Response::Authenticated {
                    session_id: String::new(),
                    expires_in: 0,
                },
                0,
            ),
            // variant 1: ScramServerFirst
            (
                Response::ScramServerFirst {
                    message: Bytes::from("r=nonce,s=salt,i=4096"),
                },
                1,
            ),
            // variant 2: ScramServerFinal
            (
                Response::ScramServerFinal {
                    message: Bytes::from("v=verifier"),
                    session_id: None,
                    expires_in: None,
                },
                2,
            ),
            // variant 3: Published
            (
                Response::Published {
                    offset: 0,
                    partition: 0,
                },
                3,
            ),
            // variant 4: Messages
            (Response::Messages { messages: vec![] }, 4),
            // variant 5: TopicCreated
            (
                Response::TopicCreated {
                    name: "t".into(),
                    partitions: 1,
                },
                5,
            ),
            // variant 6: Topics
            (Response::Topics { topics: vec![] }, 6),
            // variant 7: TopicDeleted
            (Response::TopicDeleted, 7),
            // variant 8: OffsetCommitted
            (Response::OffsetCommitted, 8),
            // variant 9: Offset
            (Response::Offset { offset: None }, 9),
            // variant 10: Metadata
            (
                Response::Metadata {
                    name: "t".into(),
                    partitions: 1,
                },
                10,
            ),
            // variant 11: ClusterMetadata
            (
                Response::ClusterMetadata {
                    controller_id: None,
                    brokers: vec![],
                    topics: vec![],
                },
                11,
            ),
            // variant 12: Pong
            (Response::Pong, 12),
            // variant 13: OffsetBounds
            (
                Response::OffsetBounds {
                    earliest: 0,
                    latest: 0,
                },
                13,
            ),
            // variant 14: Groups
            (Response::Groups { groups: vec![] }, 14),
            // variant 15: GroupDescription
            (
                Response::GroupDescription {
                    consumer_group: "g".into(),
                    offsets: HashMap::new(),
                },
                15,
            ),
            // variant 16: GroupDeleted
            (Response::GroupDeleted, 16),
            // variant 17: OffsetForTimestamp
            (Response::OffsetForTimestamp { offset: None }, 17),
            // variant 18: Error
            (
                Response::Error {
                    message: "e".into(),
                },
                18,
            ),
            // variant 19: Ok
            (Response::Ok, 19),
            // variant 20: ProducerIdInitialized
            (
                Response::ProducerIdInitialized {
                    producer_id: 1,
                    producer_epoch: 0,
                },
                20,
            ),
            // variant 21: IdempotentPublished
            (
                Response::IdempotentPublished {
                    offset: 0,
                    partition: 0,
                    duplicate: false,
                },
                21,
            ),
            // variant 22: TransactionStarted
            (
                Response::TransactionStarted {
                    txn_id: "tx".into(),
                },
                22,
            ),
            // variant 23: PartitionsAddedToTxn
            (
                Response::PartitionsAddedToTxn {
                    txn_id: "tx".into(),
                    partition_count: 0,
                },
                23,
            ),
            // variant 24: TransactionalPublished
            (
                Response::TransactionalPublished {
                    offset: 0,
                    partition: 0,
                    sequence: 0,
                },
                24,
            ),
            // variant 25: OffsetsAddedToTxn
            (
                Response::OffsetsAddedToTxn {
                    txn_id: "tx".into(),
                },
                25,
            ),
            // variant 26: TransactionCommitted
            (
                Response::TransactionCommitted {
                    txn_id: "tx".into(),
                },
                26,
            ),
            // variant 27: TransactionAborted
            (
                Response::TransactionAborted {
                    txn_id: "tx".into(),
                },
                27,
            ),
            // variant 28: QuotasDescribed
            (Response::QuotasDescribed { entries: vec![] }, 28),
            // variant 29: QuotasAltered
            (Response::QuotasAltered { altered_count: 0 }, 29),
            // variant 30: Throttled
            (
                Response::Throttled {
                    throttle_time_ms: 0,
                    quota_type: "produce_bytes_rate".into(),
                    entity: "user".into(),
                },
                30,
            ),
            // variant 31: TopicConfigAltered
            (
                Response::TopicConfigAltered {
                    topic: "t".into(),
                    changed_count: 0,
                },
                31,
            ),
            // variant 32: PartitionsCreated
            (
                Response::PartitionsCreated {
                    topic: "t".into(),
                    new_partition_count: 2,
                },
                32,
            ),
            // variant 33: RecordsDeleted
            (
                Response::RecordsDeleted {
                    topic: "t".into(),
                    results: vec![],
                },
                33,
            ),
            // variant 34: TopicConfigsDescribed
            (Response::TopicConfigsDescribed { configs: vec![] }, 34),
            // variant 35: HandshakeResult
            (
                Response::HandshakeResult {
                    server_version: crate::PROTOCOL_VERSION,
                    compatible: true,
                    message: String::new(),
                },
                35,
            ),
            // variant 36: JoinGroupResult
            (
                Response::JoinGroupResult {
                    generation_id: 1,
                    protocol_type: "consumer".into(),
                    member_id: "m".into(),
                    leader_id: "l".into(),
                    members: vec![],
                },
                36,
            ),
            // variant 37: SyncGroupResult
            (
                Response::SyncGroupResult {
                    assignments: vec![],
                },
                37,
            ),
            // variant 38: HeartbeatResult
            (Response::HeartbeatResult { error_code: 0 }, 38),
            // variant 39: LeaveGroupResult
            (Response::LeaveGroupResult, 39),
            // variant 40: PublishedBatch
            (
                Response::PublishedBatch {
                    base_offset: 0,
                    partition: 0,
                    record_count: 0,
                },
                40,
            ),
            // variant 41: IdempotentPublishedBatch
            (
                Response::IdempotentPublishedBatch {
                    base_offset: 0,
                    partition: 0,
                    record_count: 0,
                    duplicate: false,
                },
                41,
            ),
        ];

        // Serialize each instance and check the leading tag byte.
        for (response, expected_discriminant) in test_cases {
            let bytes = response.to_bytes().unwrap();
            assert_eq!(
                bytes[0], expected_discriminant,
                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
                response
            );
        }
    }
2401
2402 /// Verify that to_wire rejects oversized messages.
2403 #[test]
2404 fn test_to_wire_rejects_oversized_request() {
2405 use bytes::Bytes;
2406 // Create a request with a payload larger than MAX_MESSAGE_SIZE
2407 let huge_value = vec![0u8; crate::MAX_MESSAGE_SIZE + 1];
2408 let request = Request::Publish {
2409 topic: "t".into(),
2410 partition: None,
2411 key: None,
2412 value: Bytes::from(huge_value),
2413 leader_epoch: None,
2414 };
2415 let result = request.to_wire(crate::WireFormat::Postcard, 0);
2416 assert!(
2417 matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
2418 "Expected MessageTooLarge error for oversized request"
2419 );
2420 }
2421
2422 #[test]
2423 fn test_to_wire_rejects_oversized_response() {
2424 let huge_messages = vec![MessageData {
2425 offset: 0,
2426 partition: 0,
2427 timestamp: 0,
2428 key: None,
2429 value: bytes::Bytes::from(vec![0u8; crate::MAX_MESSAGE_SIZE + 1]),
2430 headers: vec![],
2431 }];
2432 let response = Response::Messages {
2433 messages: huge_messages,
2434 };
2435 let result = response.to_wire(crate::WireFormat::Postcard, 0);
2436 assert!(
2437 matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
2438 "Expected MessageTooLarge error for oversized response"
2439 );
2440 }
2441
    /// Guard against accidental reordering/removal of enum variants.
    ///
    /// Postcard uses discriminant indices as wire tags — reordering or deleting
    /// a variant silently breaks all existing clients. This test serialises
    /// an instance of the first 19 Request variants (discriminants 0..=18)
    /// plus the final variant (LeaveGroup = 37) as a sentinel, and asserts
    /// the leading discriminant byte matches the expected index. All tags
    /// here are < 128, so the postcard varint tag is exactly one byte.
    ///
    /// NOTE(review): variants 19–36 are not pinned by this test — presumably
    /// they are covered by the exhaustive wire-stability tests above; confirm,
    /// or extend this list. Adding new variants at the END is safe — just
    /// extend the list. Removing or reordering variants will make this test
    /// fail.
    #[test]
    #[allow(deprecated)]
    fn request_discriminant_stability() {
        // Each tuple: (expected discriminant, instance of the variant)
        let cases: Vec<(u8, Request)> = vec![
            (
                0,
                Request::Authenticate {
                    username: String::new(),
                    password: String::new(),
                    require_tls: false,
                },
            ),
            (
                1,
                Request::SaslAuthenticate {
                    mechanism: Bytes::new(),
                    auth_bytes: Bytes::new(),
                },
            ),
            (
                2,
                Request::ScramClientFirst {
                    message: Bytes::new(),
                },
            ),
            (
                3,
                Request::ScramClientFinal {
                    message: Bytes::new(),
                },
            ),
            (
                4,
                Request::Publish {
                    topic: String::new(),
                    partition: None,
                    key: None,
                    value: Bytes::new(),
                    leader_epoch: None,
                },
            ),
            (
                5,
                Request::Consume {
                    topic: String::new(),
                    partition: 0,
                    offset: 0,
                    max_messages: 1,
                    isolation_level: None,
                    max_wait_ms: None,
                },
            ),
            (
                6,
                Request::CreateTopic {
                    name: String::new(),
                    partitions: None,
                },
            ),
            (7, Request::ListTopics),
            (
                8,
                Request::DeleteTopic {
                    name: String::new(),
                },
            ),
            (
                9,
                Request::CommitOffset {
                    consumer_group: String::new(),
                    topic: String::new(),
                    partition: 0,
                    offset: 0,
                },
            ),
            (
                10,
                Request::GetOffset {
                    consumer_group: String::new(),
                    topic: String::new(),
                    partition: 0,
                },
            ),
            (
                11,
                Request::GetMetadata {
                    topic: String::new(),
                },
            ),
            (12, Request::GetClusterMetadata { topics: Vec::new() }),
            (13, Request::Ping),
            (
                14,
                Request::GetOffsetBounds {
                    topic: String::new(),
                    partition: 0,
                },
            ),
            (15, Request::ListGroups),
            (
                16,
                Request::DescribeGroup {
                    consumer_group: String::new(),
                },
            ),
            (
                17,
                Request::DeleteGroup {
                    consumer_group: String::new(),
                },
            ),
            (
                18,
                Request::GetOffsetForTimestamp {
                    topic: String::new(),
                    partition: 0,
                    timestamp_ms: 0,
                },
            ),
        ];

        // Serialize each instance and compare the leading tag byte.
        for (expected_disc, req) in &cases {
            let bytes = req.to_bytes().unwrap();
            assert_eq!(
                bytes[0], *expected_disc,
                "Request variant at discriminant {expected_disc} moved! This breaks wire compatibility."
            );
        }

        // Last variant (LeaveGroup = 37) sentinel
        let last = Request::LeaveGroup {
            group_id: String::new(),
            member_id: String::new(),
        };
        let last_bytes = last.to_bytes().unwrap();
        assert_eq!(
            last_bytes[0], 37,
            "LeaveGroup must remain discriminant 37 (last variant)"
        );
    }
2591}