Skip to main content

rivven_protocol/
messages.rs

1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Partition assignments for SyncGroup: `Vec<(member_id, Vec<(topic, Vec<partition>)>)>`
11pub type SyncGroupAssignments = Vec<(String, Vec<(String, Vec<u32>)>)>;
12
13/// Quota alteration request
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct QuotaAlteration {
16    /// Entity type: "user", "client-id", "consumer-group", "default"
17    pub entity_type: String,
18    /// Entity name (None for defaults)
19    pub entity_name: Option<String>,
20    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
21    pub quota_key: String,
22    /// Quota value (None to remove quota, Some to set)
23    pub quota_value: Option<u64>,
24}
25
26/// Quota entry in describe response
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct QuotaEntry {
29    /// Entity type
30    pub entity_type: String,
31    /// Entity name (None for defaults)
32    pub entity_name: Option<String>,
33    /// Quota values
34    pub quotas: HashMap<String, u64>,
35}
36
37/// Topic configuration entry for AlterTopicConfig
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct TopicConfigEntry {
40    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
41    pub key: String,
42    /// Configuration value (None to reset to default)
43    pub value: Option<String>,
44}
45
46/// Topic configuration in describe response
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct TopicConfigDescription {
49    /// Topic name
50    pub topic: String,
51    /// Configuration entries
52    pub configs: HashMap<String, TopicConfigValue>,
53}
54
55/// Topic configuration value with metadata
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct TopicConfigValue {
58    /// Current value
59    pub value: String,
60    /// Whether this is the default value
61    pub is_default: bool,
62    /// Whether this config is read-only
63    pub is_read_only: bool,
64    /// Whether this config is sensitive (e.g., passwords)
65    pub is_sensitive: bool,
66}
67
68/// Delete records result for a partition
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct DeleteRecordsResult {
71    /// Partition ID
72    pub partition: u32,
73    /// New low watermark (first available offset after deletion)
74    pub low_watermark: u64,
75    /// Error message if deletion failed for this partition
76    pub error: Option<String>,
77}
78
79/// A single record within a [`Request::PublishBatch`].
80///
81/// Carries only the key and value — the topic, partition, and
82/// leader_epoch are shared across the entire batch.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct BatchRecord {
85    #[serde(with = "crate::serde_utils::option_bytes_serde")]
86    pub key: Option<Bytes>,
87    #[serde(with = "crate::serde_utils::bytes_serde")]
88    pub value: Bytes,
89}
90
91/// Protocol request messages
92///
93/// # Stability
94///
95/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
96/// Adding new variants should only be done at the end of the enum.
97#[derive(Clone, Serialize, Deserialize)]
98pub enum Request {
99    /// Authenticate with username/password (SASL/PLAIN compatible).
100    ///
101    /// # Security — transport encryption required
102    ///
103    /// The password is sent in **plaintext** on the wire (SASL/PLAIN).
104    /// This variant must only be used over a TLS-encrypted connection;
105    /// otherwise the password is exposed to network observers.
106    ///
107    /// # Deprecation
108    ///
109    /// Prefer `ScramClientFirst` / `ScramClientFinal` (SCRAM-SHA-256
110    /// challenge-response) which never sends the password over the wire.
111    #[deprecated(
112        note = "Use SCRAM-SHA-256 (ScramClientFirst/ScramClientFinal) instead of plaintext auth"
113    )]
114    Authenticate {
115        username: String,
116        password: String,
117        /// When `true` the server **must** reject this request if the
118        /// connection is not TLS-encrypted, preventing accidental
119        /// credential exposure on cleartext transports.
120        #[serde(default)]
121        require_tls: bool,
122    },
123
124    /// Authenticate with SASL bytes (for Kafka client compatibility)
125    SaslAuthenticate {
126        #[serde(with = "crate::serde_utils::bytes_serde")]
127        mechanism: Bytes,
128        #[serde(with = "crate::serde_utils::bytes_serde")]
129        auth_bytes: Bytes,
130    },
131
132    /// SCRAM-SHA-256: Client-first message
133    ScramClientFirst {
134        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
135        #[serde(with = "crate::serde_utils::bytes_serde")]
136        message: Bytes,
137    },
138
139    /// SCRAM-SHA-256: Client-final message
140    ScramClientFinal {
141        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
142        #[serde(with = "crate::serde_utils::bytes_serde")]
143        message: Bytes,
144    },
145
146    /// Publish a message to a topic
147    Publish {
148        topic: String,
149        partition: Option<u32>,
150        #[serde(with = "crate::serde_utils::option_bytes_serde")]
151        key: Option<Bytes>,
152        #[serde(with = "crate::serde_utils::bytes_serde")]
153        value: Bytes,
154        /// Leader epoch for data-path fencing (§2.4).
155        /// When set, the broker rejects the write if its current leader epoch
156        /// for this partition is higher, preventing stale-leader writes.
157        #[serde(default)]
158        leader_epoch: Option<u64>,
159    },
160
161    /// Consume messages from a topic
162    Consume {
163        topic: String,
164        partition: u32,
165        offset: u64,
166        max_messages: u32,
167        /// Isolation level for transactional reads
168        /// None = read_uncommitted (default, backward compatible)
169        /// Some(0) = read_uncommitted
170        /// Some(1) = read_committed (filters aborted transaction messages)
171        #[serde(default)]
172        isolation_level: Option<u8>,
173        /// Long-polling: maximum time (ms) to wait for new data before returning
174        /// an empty response. None or 0 = immediate (no waiting). Capped at 30 000 ms.
175        #[serde(default)]
176        max_wait_ms: Option<u64>,
177    },
178
179    /// Create a new topic
180    CreateTopic {
181        name: String,
182        partitions: Option<u32>,
183    },
184
185    /// List all topics
186    ListTopics,
187
188    /// Delete a topic
189    DeleteTopic { name: String },
190
191    /// Commit consumer offset
192    CommitOffset {
193        consumer_group: String,
194        topic: String,
195        partition: u32,
196        offset: u64,
197    },
198
199    /// Get consumer offset
200    GetOffset {
201        consumer_group: String,
202        topic: String,
203        partition: u32,
204    },
205
206    /// Get topic metadata
207    GetMetadata { topic: String },
208
209    /// Get cluster metadata (all topics or specific ones)
210    GetClusterMetadata {
211        /// Topics to get metadata for (empty = all topics)
212        topics: Vec<String>,
213    },
214
215    /// Ping
216    Ping,
217
218    /// Get offset bounds for a partition
219    GetOffsetBounds { topic: String, partition: u32 },
220
221    /// List all consumer groups
222    ListGroups,
223
224    /// Describe a consumer group (get all offsets)
225    DescribeGroup { consumer_group: String },
226
227    /// Delete a consumer group
228    DeleteGroup { consumer_group: String },
229
230    /// Find offset for a timestamp
231    GetOffsetForTimestamp {
232        topic: String,
233        partition: u32,
234        /// Timestamp in milliseconds since epoch
235        timestamp_ms: i64,
236    },
237
238    // =========================================================================
239    // Idempotent Producer
240    // =========================================================================
241    /// Initialize idempotent producer (request producer ID and epoch)
242    ///
243    /// Call this before sending idempotent produce requests.
244    /// If reconnecting, provide the previous producer_id to bump epoch.
245    InitProducerId {
246        /// Previous producer ID (None for new producers)
247        producer_id: Option<u64>,
248    },
249
250    /// Publish with idempotent semantics (exactly-once delivery)
251    ///
252    /// Requires InitProducerId to have been called first.
253    IdempotentPublish {
254        topic: String,
255        partition: Option<u32>,
256        #[serde(with = "crate::serde_utils::option_bytes_serde")]
257        key: Option<Bytes>,
258        #[serde(with = "crate::serde_utils::bytes_serde")]
259        value: Bytes,
260        /// Producer ID from InitProducerId response
261        producer_id: u64,
262        /// Producer epoch from InitProducerId response
263        producer_epoch: u16,
264        /// Sequence number (starts at 0, increments per partition)
265        sequence: i32,
266        /// Leader epoch for fencing stale writes (§2.4)
267        #[serde(default)]
268        leader_epoch: Option<u64>,
269    },
270
271    // =========================================================================
272    // Native Transactions
273    // =========================================================================
274    /// Begin a new transaction
275    BeginTransaction {
276        /// Transaction ID (unique per producer)
277        txn_id: String,
278        /// Producer ID from InitProducerId
279        producer_id: u64,
280        /// Producer epoch
281        producer_epoch: u16,
282        /// Transaction timeout in milliseconds (optional, defaults to 60s)
283        timeout_ms: Option<u64>,
284    },
285
286    /// Add partitions to an active transaction
287    AddPartitionsToTxn {
288        /// Transaction ID
289        txn_id: String,
290        /// Producer ID
291        producer_id: u64,
292        /// Producer epoch
293        producer_epoch: u16,
294        /// Partitions to add (topic, partition pairs)
295        partitions: Vec<(String, u32)>,
296    },
297
298    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
299    TransactionalPublish {
300        /// Transaction ID
301        txn_id: String,
302        topic: String,
303        partition: Option<u32>,
304        #[serde(with = "crate::serde_utils::option_bytes_serde")]
305        key: Option<Bytes>,
306        #[serde(with = "crate::serde_utils::bytes_serde")]
307        value: Bytes,
308        /// Producer ID
309        producer_id: u64,
310        /// Producer epoch
311        producer_epoch: u16,
312        /// Sequence number
313        sequence: i32,
314        /// Leader epoch for fencing stale writes (§2.4)
315        #[serde(default)]
316        leader_epoch: Option<u64>,
317    },
318
319    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
320    AddOffsetsToTxn {
321        /// Transaction ID
322        txn_id: String,
323        /// Producer ID
324        producer_id: u64,
325        /// Producer epoch
326        producer_epoch: u16,
327        /// Consumer group ID
328        group_id: String,
329        /// Offsets to commit (topic, partition, offset triples)
330        offsets: Vec<(String, u32, i64)>,
331    },
332
333    /// Commit a transaction
334    CommitTransaction {
335        /// Transaction ID
336        txn_id: String,
337        /// Producer ID
338        producer_id: u64,
339        /// Producer epoch
340        producer_epoch: u16,
341    },
342
343    /// Abort a transaction
344    AbortTransaction {
345        /// Transaction ID
346        txn_id: String,
347        /// Producer ID
348        producer_id: u64,
349        /// Producer epoch
350        producer_epoch: u16,
351    },
352
353    // =========================================================================
354    // Per-Principal Quotas (Kafka Parity)
355    // =========================================================================
356    /// Describe quotas for entities
357    DescribeQuotas {
358        /// Entities to describe (empty = all)
359        /// Format: Vec<(entity_type, entity_name)>
360        /// entity_type: "user", "client-id", "consumer-group", "default"
361        /// entity_name: None for defaults, Some for specific entities
362        entities: Vec<(String, Option<String>)>,
363    },
364
365    /// Alter quotas for entities
366    AlterQuotas {
367        /// Quota alterations to apply
368        /// Each item: (entity_type, entity_name, quota_key, quota_value)
369        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
370        /// quota_value: None to remove, Some(value) to set
371        alterations: Vec<QuotaAlteration>,
372    },
373
374    // =========================================================================
375    // Admin API (Kafka Parity)
376    // =========================================================================
377    /// Alter topic configuration
378    AlterTopicConfig {
379        /// Topic name
380        topic: String,
381        /// Configuration changes to apply
382        configs: Vec<TopicConfigEntry>,
383    },
384
385    /// Create additional partitions for an existing topic
386    CreatePartitions {
387        /// Topic name
388        topic: String,
389        /// New total partition count (must be > current count)
390        new_partition_count: u32,
391        /// Optional assignment of new partitions to brokers
392        /// If empty, broker will auto-assign
393        assignments: Vec<Vec<String>>,
394    },
395
396    /// Delete records before a given offset (log truncation)
397    DeleteRecords {
398        /// Topic name
399        topic: String,
400        /// Partition-offset pairs: delete all records before these offsets
401        partition_offsets: Vec<(u32, u64)>,
402    },
403
404    /// Describe topic configurations
405    DescribeTopicConfigs {
406        /// Topics to describe (empty = all)
407        topics: Vec<String>,
408    },
409
410    /// Protocol version handshake
411    ///
412    /// Sent by the client as the first message after connecting.
413    /// The server validates the protocol version and returns compatibility info.
414    Handshake {
415        /// Client's protocol version (must match `PROTOCOL_VERSION`)
416        protocol_version: u32,
417        /// Optional client identifier for diagnostics
418        client_id: String,
419    },
420
421    // =========================================================================
422    // Consumer Group Coordination (Kafka-compatible protocol)
423    // =========================================================================
424    /// Join a consumer group. The coordinator assigns a member ID and,
425    /// once all members have joined, elects a leader and triggers a
426    /// rebalance. Returns the generation ID, member ID, and leader info.
427    JoinGroup {
428        /// Consumer group ID
429        group_id: String,
430        /// Member ID (empty string for first join; server assigns one)
431        member_id: String,
432        /// Session timeout in milliseconds — if no heartbeat is received
433        /// within this period, the member is considered dead and a
434        /// rebalance is triggered.
435        session_timeout_ms: u32,
436        /// Rebalance timeout in milliseconds — maximum time the
437        /// coordinator waits for all members to join during a rebalance.
438        rebalance_timeout_ms: u32,
439        /// Protocol type (e.g. "consumer")
440        protocol_type: String,
441        /// Topics this member wants to consume
442        subscriptions: Vec<String>,
443    },
444
445    /// Sync a consumer group — the leader sends partition assignments
446    /// to the coordinator, and all members (including the leader)
447    /// receive their individual assignments.
448    SyncGroup {
449        /// Consumer group ID
450        group_id: String,
451        /// Generation ID from JoinGroup response
452        generation_id: u32,
453        /// This member's ID
454        member_id: String,
455        /// Assignments (only sent by leader; empty for followers).
456        /// Each entry is `(member_id, Vec<(topic, Vec<partition>)>)`.
457        assignments: SyncGroupAssignments,
458    },
459
460    /// Heartbeat — sent periodically by group members to keep their
461    /// session alive. The response tells the member whether a
462    /// rebalance is needed.
463    Heartbeat {
464        /// Consumer group ID
465        group_id: String,
466        /// Generation ID
467        generation_id: u32,
468        /// This member's ID
469        member_id: String,
470    },
471
472    /// Leave a consumer group gracefully. Triggers an immediate
473    /// rebalance for the remaining members.
474    LeaveGroup {
475        /// Consumer group ID
476        group_id: String,
477        /// This member's ID
478        member_id: String,
479    },
480
481    // =========================================================================
482    // Batch Publish (hot-path optimization)
483    // =========================================================================
484    /// Publish a batch of records to a single (topic, partition) in one
485    /// wire message — eliminates per-record topic cloning and serialization
486    /// overhead compared to sending N individual `Publish` requests.
487    PublishBatch {
488        topic: String,
489        partition: Option<u32>,
490        records: Vec<BatchRecord>,
491        /// Leader epoch for data-path fencing (§2.4).
492        #[serde(default)]
493        leader_epoch: Option<u64>,
494    },
495
496    /// Idempotent batch publish — combines [`Request::IdempotentPublish`] semantics
497    /// with the batching efficiency of [`Request::PublishBatch`].
498    ///
499    /// Records are assigned sequence numbers `base_sequence..base_sequence + N - 1`.
500    IdempotentPublishBatch {
501        topic: String,
502        partition: Option<u32>,
503        records: Vec<BatchRecord>,
504        producer_id: u64,
505        producer_epoch: u16,
506        /// First sequence number in the batch
507        base_sequence: i32,
508        #[serde(default)]
509        leader_epoch: Option<u64>,
510    },
511}
512
513// Manual Debug impl to redact credentials from log output.
514// `Request::Authenticate { password }` and SASL/SCRAM variants contain
515// secrets that must never appear in debug logs.
516impl std::fmt::Debug for Request {
517    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
518        match self {
519            #[allow(deprecated)]
520            Self::Authenticate {
521                username,
522                require_tls,
523                ..
524            } => f
525                .debug_struct("Authenticate")
526                .field("username", username)
527                .field("password", &"[REDACTED]")
528                .field("require_tls", require_tls)
529                .finish(),
530            Self::SaslAuthenticate { mechanism, .. } => f
531                .debug_struct("SaslAuthenticate")
532                .field("mechanism", mechanism)
533                .field("auth_bytes", &"[REDACTED]")
534                .finish(),
535            Self::ScramClientFirst { .. } => f
536                .debug_struct("ScramClientFirst")
537                .field("message", &"[REDACTED]")
538                .finish(),
539            Self::ScramClientFinal { .. } => f
540                .debug_struct("ScramClientFinal")
541                .field("message", &"[REDACTED]")
542                .finish(),
543            Self::Publish {
544                topic,
545                partition,
546                key,
547                value,
548                leader_epoch,
549            } => f
550                .debug_struct("Publish")
551                .field("topic", topic)
552                .field("partition", partition)
553                .field("key", key)
554                .field("value_len", &value.len())
555                .field("leader_epoch", leader_epoch)
556                .finish(),
557            Self::Consume {
558                topic,
559                partition,
560                offset,
561                max_messages,
562                isolation_level,
563                max_wait_ms,
564            } => f
565                .debug_struct("Consume")
566                .field("topic", topic)
567                .field("partition", partition)
568                .field("offset", offset)
569                .field("max_messages", max_messages)
570                .field("isolation_level", isolation_level)
571                .field("max_wait_ms", max_wait_ms)
572                .finish(),
573            Self::CreateTopic { name, partitions } => f
574                .debug_struct("CreateTopic")
575                .field("name", name)
576                .field("partitions", partitions)
577                .finish(),
578            Self::ListTopics => write!(f, "ListTopics"),
579            Self::DeleteTopic { name } => {
580                f.debug_struct("DeleteTopic").field("name", name).finish()
581            }
582            Self::CommitOffset {
583                consumer_group,
584                topic,
585                partition,
586                offset,
587            } => f
588                .debug_struct("CommitOffset")
589                .field("consumer_group", consumer_group)
590                .field("topic", topic)
591                .field("partition", partition)
592                .field("offset", offset)
593                .finish(),
594            Self::GetOffset {
595                consumer_group,
596                topic,
597                partition,
598            } => f
599                .debug_struct("GetOffset")
600                .field("consumer_group", consumer_group)
601                .field("topic", topic)
602                .field("partition", partition)
603                .finish(),
604            Self::GetMetadata { topic } => {
605                f.debug_struct("GetMetadata").field("topic", topic).finish()
606            }
607            Self::GetClusterMetadata { topics } => f
608                .debug_struct("GetClusterMetadata")
609                .field("topics", topics)
610                .finish(),
611            Self::Ping => write!(f, "Ping"),
612            Self::GetOffsetBounds { topic, partition } => f
613                .debug_struct("GetOffsetBounds")
614                .field("topic", topic)
615                .field("partition", partition)
616                .finish(),
617            Self::ListGroups => write!(f, "ListGroups"),
618            Self::DescribeGroup { consumer_group } => f
619                .debug_struct("DescribeGroup")
620                .field("consumer_group", consumer_group)
621                .finish(),
622            Self::DeleteGroup { consumer_group } => f
623                .debug_struct("DeleteGroup")
624                .field("consumer_group", consumer_group)
625                .finish(),
626            Self::GetOffsetForTimestamp {
627                topic,
628                partition,
629                timestamp_ms,
630            } => f
631                .debug_struct("GetOffsetForTimestamp")
632                .field("topic", topic)
633                .field("partition", partition)
634                .field("timestamp_ms", timestamp_ms)
635                .finish(),
636            Self::InitProducerId { producer_id } => f
637                .debug_struct("InitProducerId")
638                .field("producer_id", producer_id)
639                .finish(),
640            Self::IdempotentPublish {
641                topic,
642                partition,
643                producer_id,
644                producer_epoch,
645                sequence,
646                ..
647            } => f
648                .debug_struct("IdempotentPublish")
649                .field("topic", topic)
650                .field("partition", partition)
651                .field("producer_id", producer_id)
652                .field("producer_epoch", producer_epoch)
653                .field("sequence", sequence)
654                .finish(),
655            Self::BeginTransaction {
656                txn_id,
657                producer_id,
658                producer_epoch,
659                timeout_ms,
660            } => f
661                .debug_struct("BeginTransaction")
662                .field("txn_id", txn_id)
663                .field("producer_id", producer_id)
664                .field("producer_epoch", producer_epoch)
665                .field("timeout_ms", timeout_ms)
666                .finish(),
667            Self::AddPartitionsToTxn {
668                txn_id,
669                producer_id,
670                producer_epoch,
671                partitions,
672            } => f
673                .debug_struct("AddPartitionsToTxn")
674                .field("txn_id", txn_id)
675                .field("producer_id", producer_id)
676                .field("producer_epoch", producer_epoch)
677                .field("partitions", partitions)
678                .finish(),
679            Self::TransactionalPublish {
680                txn_id,
681                topic,
682                partition,
683                producer_id,
684                producer_epoch,
685                sequence,
686                ..
687            } => f
688                .debug_struct("TransactionalPublish")
689                .field("txn_id", txn_id)
690                .field("topic", topic)
691                .field("partition", partition)
692                .field("producer_id", producer_id)
693                .field("producer_epoch", producer_epoch)
694                .field("sequence", sequence)
695                .finish(),
696            Self::AddOffsetsToTxn {
697                txn_id,
698                producer_id,
699                producer_epoch,
700                group_id,
701                offsets,
702            } => f
703                .debug_struct("AddOffsetsToTxn")
704                .field("txn_id", txn_id)
705                .field("producer_id", producer_id)
706                .field("producer_epoch", producer_epoch)
707                .field("group_id", group_id)
708                .field("offsets", offsets)
709                .finish(),
710            Self::CommitTransaction {
711                txn_id,
712                producer_id,
713                producer_epoch,
714            } => f
715                .debug_struct("CommitTransaction")
716                .field("txn_id", txn_id)
717                .field("producer_id", producer_id)
718                .field("producer_epoch", producer_epoch)
719                .finish(),
720            Self::AbortTransaction {
721                txn_id,
722                producer_id,
723                producer_epoch,
724            } => f
725                .debug_struct("AbortTransaction")
726                .field("txn_id", txn_id)
727                .field("producer_id", producer_id)
728                .field("producer_epoch", producer_epoch)
729                .finish(),
730            Self::DescribeQuotas { entities } => f
731                .debug_struct("DescribeQuotas")
732                .field("entities", entities)
733                .finish(),
734            Self::AlterQuotas { alterations } => f
735                .debug_struct("AlterQuotas")
736                .field("alterations", alterations)
737                .finish(),
738            Self::AlterTopicConfig { topic, configs } => f
739                .debug_struct("AlterTopicConfig")
740                .field("topic", topic)
741                .field("configs", configs)
742                .finish(),
743            Self::CreatePartitions {
744                topic,
745                new_partition_count,
746                assignments,
747            } => f
748                .debug_struct("CreatePartitions")
749                .field("topic", topic)
750                .field("new_partition_count", new_partition_count)
751                .field("assignments", assignments)
752                .finish(),
753            Self::DeleteRecords {
754                topic,
755                partition_offsets,
756            } => f
757                .debug_struct("DeleteRecords")
758                .field("topic", topic)
759                .field("partition_offsets", partition_offsets)
760                .finish(),
761            Self::DescribeTopicConfigs { topics } => f
762                .debug_struct("DescribeTopicConfigs")
763                .field("topics", topics)
764                .finish(),
765            Self::Handshake {
766                protocol_version,
767                client_id,
768            } => f
769                .debug_struct("Handshake")
770                .field("protocol_version", protocol_version)
771                .field("client_id", client_id)
772                .finish(),
773            Self::JoinGroup {
774                group_id,
775                member_id,
776                ..
777            } => f
778                .debug_struct("JoinGroup")
779                .field("group_id", group_id)
780                .field("member_id", member_id)
781                .finish(),
782            Self::SyncGroup {
783                group_id,
784                generation_id,
785                member_id,
786                ..
787            } => f
788                .debug_struct("SyncGroup")
789                .field("group_id", group_id)
790                .field("generation_id", generation_id)
791                .field("member_id", member_id)
792                .finish(),
793            Self::Heartbeat {
794                group_id,
795                generation_id,
796                member_id,
797            } => f
798                .debug_struct("Heartbeat")
799                .field("group_id", group_id)
800                .field("generation_id", generation_id)
801                .field("member_id", member_id)
802                .finish(),
803            Self::LeaveGroup {
804                group_id,
805                member_id,
806            } => f
807                .debug_struct("LeaveGroup")
808                .field("group_id", group_id)
809                .field("member_id", member_id)
810                .finish(),
811            Self::PublishBatch {
812                topic,
813                partition,
814                records,
815                leader_epoch,
816            } => f
817                .debug_struct("PublishBatch")
818                .field("topic", topic)
819                .field("partition", partition)
820                .field("record_count", &records.len())
821                .field("leader_epoch", leader_epoch)
822                .finish(),
823            Self::IdempotentPublishBatch {
824                topic,
825                partition,
826                records,
827                producer_id,
828                producer_epoch,
829                base_sequence,
830                leader_epoch,
831            } => f
832                .debug_struct("IdempotentPublishBatch")
833                .field("topic", topic)
834                .field("partition", partition)
835                .field("record_count", &records.len())
836                .field("producer_id", producer_id)
837                .field("producer_epoch", producer_epoch)
838                .field("base_sequence", base_sequence)
839                .field("leader_epoch", leader_epoch)
840                .finish(),
841        }
842    }
843}
844
845/// Protocol response messages
846///
847/// # Stability
848///
849/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
850/// Adding new variants should only be done at the end of the enum.
851#[derive(Debug, Clone, Serialize, Deserialize)]
852pub enum Response {
853    /// Authentication successful
854    Authenticated {
855        /// Session token for subsequent requests
856        session_id: String,
857        /// Session timeout in seconds
858        expires_in: u64,
859    },
860
861    /// SCRAM-SHA-256: Server-first message (challenge)
862    ScramServerFirst {
863        /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
864        #[serde(with = "crate::serde_utils::bytes_serde")]
865        message: Bytes,
866    },
867
868    /// SCRAM-SHA-256: Server-final message (verification or error)
869    ScramServerFinal {
870        /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
871        #[serde(with = "crate::serde_utils::bytes_serde")]
872        message: Bytes,
873        /// Session ID (if authentication succeeded)
874        session_id: Option<String>,
875        /// Session timeout in seconds (if authentication succeeded)
876        expires_in: Option<u64>,
877    },
878
879    /// Success response with offset
880    Published { offset: u64, partition: u32 },
881
882    /// Messages response
883    Messages { messages: Vec<MessageData> },
884
885    /// Topic created
886    TopicCreated { name: String, partitions: u32 },
887
888    /// List of topics
889    Topics { topics: Vec<String> },
890
891    /// Topic deleted
892    TopicDeleted,
893
894    /// Offset committed
895    OffsetCommitted,
896
897    /// Offset response
898    Offset { offset: Option<u64> },
899
900    /// Metadata
901    Metadata { name: String, partitions: u32 },
902
903    /// Full cluster metadata for topic(s)
904    ClusterMetadata {
905        /// Controller node ID (Raft leader)
906        controller_id: Option<String>,
907        /// Broker/node list
908        brokers: Vec<BrokerInfo>,
909        /// Topic metadata
910        topics: Vec<TopicMetadata>,
911    },
912
913    /// Pong
914    Pong,
915
916    /// Offset bounds for a partition
917    OffsetBounds { earliest: u64, latest: u64 },
918
919    /// List of consumer groups
920    Groups { groups: Vec<String> },
921
922    /// Consumer group details with all offsets
923    GroupDescription {
924        consumer_group: String,
925        /// topic → partition → offset
926        offsets: HashMap<String, HashMap<u32, u64>>,
927    },
928
929    /// Consumer group deleted
930    GroupDeleted,
931
932    /// Offset for a timestamp
933    OffsetForTimestamp {
934        /// The first offset with timestamp >= the requested timestamp
935        /// None if no matching offset was found
936        offset: Option<u64>,
937    },
938
939    /// Error response
940    Error { message: String },
941
942    /// Success
943    Ok,
944
945    // =========================================================================
946    // Idempotent Producer
947    // =========================================================================
948    /// Producer ID initialized
949    ProducerIdInitialized {
950        /// Assigned or existing producer ID
951        producer_id: u64,
952        /// Current epoch (increments on reconnect)
953        producer_epoch: u16,
954    },
955
956    /// Idempotent publish result
957    IdempotentPublished {
958        /// Offset where message was written
959        offset: u64,
960        /// Partition the message was written to
961        partition: u32,
962        /// Whether this was a duplicate (message already existed)
963        duplicate: bool,
964    },
965
966    // =========================================================================
967    // Native Transactions
968    // =========================================================================
969    /// Transaction started successfully
970    TransactionStarted {
971        /// Transaction ID
972        txn_id: String,
973    },
974
975    /// Partitions added to transaction
976    PartitionsAddedToTxn {
977        /// Transaction ID
978        txn_id: String,
979        /// Number of partitions now in transaction
980        partition_count: usize,
981    },
982
983    /// Transactional publish result
984    TransactionalPublished {
985        /// Offset where message was written (pending commit)
986        offset: u64,
987        /// Partition the message was written to
988        partition: u32,
989        /// Sequence number accepted
990        sequence: i32,
991    },
992
993    /// Offsets added to transaction
994    OffsetsAddedToTxn {
995        /// Transaction ID
996        txn_id: String,
997    },
998
999    /// Transaction committed
1000    TransactionCommitted {
1001        /// Transaction ID
1002        txn_id: String,
1003    },
1004
1005    /// Transaction aborted
1006    TransactionAborted {
1007        /// Transaction ID
1008        txn_id: String,
1009    },
1010
1011    // =========================================================================
1012    // Per-Principal Quotas (Kafka Parity)
1013    // =========================================================================
1014    /// Quota descriptions
1015    QuotasDescribed {
1016        /// List of quota entries
1017        entries: Vec<QuotaEntry>,
1018    },
1019
1020    /// Quotas altered successfully
1021    QuotasAltered {
1022        /// Number of quota alterations applied
1023        altered_count: usize,
1024    },
1025
1026    /// Throttle response (returned when quota exceeded)
1027    Throttled {
1028        /// Time to wait before retrying (milliseconds)
1029        throttle_time_ms: u64,
1030        /// Quota type that was exceeded
1031        quota_type: String,
1032        /// Entity that exceeded quota
1033        entity: String,
1034    },
1035
1036    // =========================================================================
1037    // Admin API (Kafka Parity)
1038    // =========================================================================
1039    /// Topic configuration altered
1040    TopicConfigAltered {
1041        /// Topic name
1042        topic: String,
1043        /// Number of configurations changed
1044        changed_count: usize,
1045    },
1046
1047    /// Partitions created
1048    PartitionsCreated {
1049        /// Topic name
1050        topic: String,
1051        /// New total partition count
1052        new_partition_count: u32,
1053    },
1054
1055    /// Records deleted
1056    RecordsDeleted {
1057        /// Topic name
1058        topic: String,
1059        /// Results per partition
1060        results: Vec<DeleteRecordsResult>,
1061    },
1062
1063    /// Topic configurations described
1064    TopicConfigsDescribed {
1065        /// Configuration descriptions per topic
1066        configs: Vec<TopicConfigDescription>,
1067    },
1068
1069    /// Protocol version handshake response
1070    HandshakeResult {
1071        /// Server's protocol version
1072        server_version: u32,
1073        /// Whether the client version is compatible
1074        compatible: bool,
1075        /// Human-readable message (e.g. reason for incompatibility)
1076        message: String,
1077    },
1078
1079    // =========================================================================
1080    // Consumer Group Coordination
1081    // =========================================================================
1082    /// JoinGroup response — returned to each member after all members
1083    /// have joined (or the rebalance timeout expires).
1084    JoinGroupResult {
1085        /// Generation ID (monotonically increasing; bumped on each rebalance)
1086        generation_id: u32,
1087        /// Protocol type (e.g. "consumer")
1088        protocol_type: String,
1089        /// This member's assigned member ID
1090        member_id: String,
1091        /// The leader's member ID (if member_id == leader_id, this member is leader)
1092        leader_id: String,
1093        /// All members in the group (only populated for the leader)
1094        /// Each entry is (member_id, subscriptions)
1095        members: Vec<(String, Vec<String>)>,
1096    },
1097
1098    /// SyncGroup response — partition assignments for this member.
1099    SyncGroupResult {
1100        /// This member's partition assignments: `Vec<(topic, Vec<partition>)>`
1101        assignments: Vec<(String, Vec<u32>)>,
1102    },
1103
1104    /// Heartbeat response — signals whether the member should rejoin.
1105    HeartbeatResult {
1106        /// Error code: 0 = OK, 27 = REBALANCE_IN_PROGRESS (rejoin needed)
1107        error_code: i32,
1108    },
1109
1110    /// LeaveGroup response
1111    LeaveGroupResult,
1112
1113    // =========================================================================
1114    // Batch Publish Responses
1115    // =========================================================================
1116    /// Result of a [`Request::PublishBatch`].
1117    ///
1118    /// All records are written to contiguous offsets starting at `base_offset`.
1119    PublishedBatch {
1120        /// Offset of the first record in the batch
1121        base_offset: u64,
1122        /// Partition the records were written to
1123        partition: u32,
1124        /// Number of records successfully written
1125        record_count: u32,
1126    },
1127
1128    /// Result of a [`Request::IdempotentPublishBatch`].
1129    IdempotentPublishedBatch {
1130        /// Offset of the first record in the batch
1131        base_offset: u64,
1132        /// Partition the records were written to
1133        partition: u32,
1134        /// Number of records successfully written
1135        record_count: u32,
1136        /// Whether this was a duplicate batch
1137        duplicate: bool,
1138    },
1139}
1140
1141impl Request {
1142    /// Serialize request to bytes (postcard format, no format prefix)
1143    ///
1144    /// For internal Rust-to-Rust communication where format is known.
1145    /// Use `to_wire()` for wire transmission with format detection support.
1146    #[inline]
1147    pub fn to_bytes(&self) -> Result<Vec<u8>> {
1148        Ok(postcard::to_allocvec(self)?)
1149    }
1150
1151    /// Deserialize request from bytes (postcard format)
1152    ///
1153    /// For internal Rust-to-Rust communication where format is known.
1154    /// Use `from_wire()` for wire transmission with format detection support.
1155    #[inline]
1156    pub fn from_bytes(data: &[u8]) -> Result<Self> {
1157        let request: Self = postcard::from_bytes(data)?;
1158        request.validate()?;
1159        Ok(request)
1160    }
1161
1162    /// Post-deserialization bounds validation.
1163    ///
1164    /// Prevents crafted payloads with oversized strings / vecs from being
1165    /// accepted. Must be called after every `from_bytes` / `from_wire`.
1166    #[allow(deprecated)]
1167    pub fn validate(&self) -> Result<()> {
1168        use crate::{MAX_AUTH_PAYLOAD, MAX_LIST_LEN, MAX_NAME_LEN};
1169
1170        let check_name = |s: &str, field: &str| -> Result<()> {
1171            if s.len() > MAX_NAME_LEN {
1172                return Err(crate::ProtocolError::InvalidFormat(format!(
1173                    "{field} exceeds max length ({} > {MAX_NAME_LEN})",
1174                    s.len()
1175                )));
1176            }
1177            Ok(())
1178        };
1179        let check_list = |len: usize, field: &str| -> Result<()> {
1180            if len > MAX_LIST_LEN {
1181                return Err(crate::ProtocolError::InvalidFormat(format!(
1182                    "{field} list exceeds max length ({len} > {MAX_LIST_LEN})"
1183                )));
1184            }
1185            Ok(())
1186        };
1187
1188        match self {
1189            Request::Authenticate {
1190                username, password, ..
1191            } => {
1192                check_name(username, "username")?;
1193                if password.len() > MAX_AUTH_PAYLOAD {
1194                    return Err(crate::ProtocolError::InvalidFormat(
1195                        "password exceeds max auth payload".into(),
1196                    ));
1197                }
1198            }
1199            Request::SaslAuthenticate {
1200                mechanism,
1201                auth_bytes,
1202            } => {
1203                if mechanism.len() > MAX_NAME_LEN {
1204                    return Err(crate::ProtocolError::InvalidFormat(
1205                        "SASL mechanism name too long".into(),
1206                    ));
1207                }
1208                if auth_bytes.len() > MAX_AUTH_PAYLOAD {
1209                    return Err(crate::ProtocolError::InvalidFormat(
1210                        "SASL auth bytes exceed max auth payload".into(),
1211                    ));
1212                }
1213            }
1214            Request::ScramClientFirst { message } | Request::ScramClientFinal { message } => {
1215                if message.len() > MAX_AUTH_PAYLOAD {
1216                    return Err(crate::ProtocolError::InvalidFormat(
1217                        "SCRAM message exceeds max auth payload".into(),
1218                    ));
1219                }
1220            }
1221            Request::Publish { topic, .. } => check_name(topic, "topic")?,
1222            Request::Consume { topic, .. } => check_name(topic, "topic")?,
1223            Request::CreateTopic { name, .. } => check_name(name, "topic")?,
1224            Request::DeleteTopic { name } => check_name(name, "topic")?,
1225            Request::CommitOffset {
1226                consumer_group,
1227                topic,
1228                ..
1229            } => {
1230                check_name(consumer_group, "consumer_group")?;
1231                check_name(topic, "topic")?;
1232            }
1233            Request::GetOffset {
1234                consumer_group,
1235                topic,
1236                ..
1237            } => {
1238                check_name(consumer_group, "consumer_group")?;
1239                check_name(topic, "topic")?;
1240            }
1241            Request::GetMetadata { topic } => check_name(topic, "topic")?,
1242            Request::GetClusterMetadata { topics, .. } => {
1243                check_list(topics.len(), "topics")?;
1244                for t in topics {
1245                    check_name(t, "topic")?;
1246                }
1247            }
1248            Request::DescribeGroup { consumer_group } | Request::DeleteGroup { consumer_group } => {
1249                check_name(consumer_group, "consumer_group")?;
1250            }
1251            Request::GetOffsetForTimestamp { topic, .. } => check_name(topic, "topic")?,
1252            Request::InitProducerId { .. } => {}
1253            Request::IdempotentPublish { topic, .. } => check_name(topic, "topic")?,
1254            Request::BeginTransaction { txn_id, .. } => check_name(txn_id, "txn_id")?,
1255            Request::AddPartitionsToTxn {
1256                txn_id, partitions, ..
1257            } => {
1258                check_name(txn_id, "txn_id")?;
1259                check_list(partitions.len(), "partitions")?;
1260            }
1261            Request::TransactionalPublish { txn_id, topic, .. } => {
1262                check_name(txn_id, "txn_id")?;
1263                check_name(topic, "topic")?;
1264            }
1265            Request::AddOffsetsToTxn {
1266                txn_id, offsets, ..
1267            } => {
1268                check_name(txn_id, "txn_id")?;
1269                check_list(offsets.len(), "offsets")?;
1270            }
1271            Request::CommitTransaction { txn_id, .. }
1272            | Request::AbortTransaction { txn_id, .. } => {
1273                check_name(txn_id, "txn_id")?;
1274            }
1275            Request::DescribeQuotas { entities } => {
1276                check_list(entities.len(), "entities")?;
1277            }
1278            Request::AlterQuotas { alterations, .. } => {
1279                check_list(alterations.len(), "alterations")?;
1280            }
1281            Request::AlterTopicConfig { topic, configs, .. } => {
1282                check_name(topic, "topic")?;
1283                check_list(configs.len(), "configs")?;
1284            }
1285            Request::CreatePartitions {
1286                topic, assignments, ..
1287            } => {
1288                check_name(topic, "topic")?;
1289                check_list(assignments.len(), "assignments")?;
1290            }
1291            Request::DeleteRecords {
1292                topic,
1293                partition_offsets,
1294            } => {
1295                check_name(topic, "topic")?;
1296                check_list(partition_offsets.len(), "partition_offsets")?;
1297            }
1298            Request::DescribeTopicConfigs { topics } => {
1299                check_list(topics.len(), "topics")?;
1300                for t in topics {
1301                    check_name(t, "topic")?;
1302                }
1303            }
1304            Request::Handshake { client_id, .. } => {
1305                check_name(client_id, "client_id")?;
1306            }
1307            Request::JoinGroup {
1308                group_id,
1309                member_id,
1310                protocol_type,
1311                subscriptions,
1312                ..
1313            } => {
1314                check_name(group_id, "group_id")?;
1315                check_name(member_id, "member_id")?;
1316                check_name(protocol_type, "protocol_type")?;
1317                check_list(subscriptions.len(), "subscriptions")?;
1318                for t in subscriptions {
1319                    check_name(t, "subscription_topic")?;
1320                }
1321            }
1322            Request::SyncGroup {
1323                group_id,
1324                member_id,
1325                assignments,
1326                ..
1327            } => {
1328                check_name(group_id, "group_id")?;
1329                check_name(member_id, "member_id")?;
1330                check_list(assignments.len(), "assignments")?;
1331                for (mid, topic_parts) in assignments {
1332                    check_name(mid, "assignment_member_id")?;
1333                    check_list(topic_parts.len(), "topic_partitions")?;
1334                    for (topic, parts) in topic_parts {
1335                        check_name(topic, "assignment_topic")?;
1336                        check_list(parts.len(), "partitions")?;
1337                    }
1338                }
1339            }
1340            Request::Heartbeat {
1341                group_id,
1342                member_id,
1343                ..
1344            } => {
1345                check_name(group_id, "group_id")?;
1346                check_name(member_id, "member_id")?;
1347            }
1348            Request::LeaveGroup {
1349                group_id,
1350                member_id,
1351            } => {
1352                check_name(group_id, "group_id")?;
1353                check_name(member_id, "member_id")?;
1354            }
1355            Request::PublishBatch { topic, records, .. } => {
1356                check_name(topic, "topic")?;
1357                check_list(records.len(), "records")?;
1358            }
1359            Request::IdempotentPublishBatch { topic, records, .. } => {
1360                check_name(topic, "topic")?;
1361                check_list(records.len(), "records")?;
1362            }
1363            // Enum variants with no string/vec payload need no validation.
1364            _ => {}
1365        }
1366        Ok(())
1367    }
1368
1369    /// Serialize request with wire format prefix
1370    ///
1371    /// Wire format: `[format_byte][correlation_id (4 bytes BE)][payload]`
1372    /// - format_byte: 0x00 = postcard, 0x01 = protobuf
1373    /// - correlation_id: 4-byte big-endian u32 for request-response matching
1374    /// - payload: serialized message
1375    ///
1376    /// ## Correlation ID sizing
1377    ///
1378    /// The wire protocol uses `u32` for correlation IDs (4 bytes), which is
1379    /// the Kafka-compatible choice and sufficient for client-server RPCs
1380    /// (4 billion in-flight requests before wrap-around). The cluster-internal
1381    /// protocol (`rivven-cluster`) uses `u64` for its own RPC correlation to
1382    /// avoid any wrap-around concern on high-throughput inter-node links.
1383    /// The two namespaces are independent and never cross boundaries.
1384    ///
1385    /// Note: Length prefix is NOT included (handled by transport layer)
1386    ///
1387    /// # Errors
1388    ///
1389    /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1390    /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1391    #[inline]
1392    pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1393        let result = match format {
1394            crate::WireFormat::Postcard => {
1395                // Single allocation — serialize directly into the output
1396                // Vec via `postcard::to_extend` instead of double-allocating
1397                // (to_allocvec → intermediate Vec → copy into result Vec).
1398                let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + 128);
1399                result.push(format.as_byte());
1400                result.extend_from_slice(&correlation_id.to_be_bytes());
1401                postcard::to_extend(self, result)?
1402            }
1403            crate::WireFormat::Protobuf => {
1404                // Protobuf requires the `protobuf` feature
1405                #[cfg(feature = "protobuf")]
1406                {
1407                    let payload = self.to_proto_bytes()?;
1408                    let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1409                    result.push(format.as_byte());
1410                    result.extend_from_slice(&correlation_id.to_be_bytes());
1411                    result.extend_from_slice(&payload);
1412                    result
1413                }
1414                #[cfg(not(feature = "protobuf"))]
1415                {
1416                    return Err(crate::ProtocolError::Serialization(
1417                        "Protobuf support requires the 'protobuf' feature".into(),
1418                    ));
1419                }
1420            }
1421        };
1422
1423        // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1424        if result.len() > crate::MAX_MESSAGE_SIZE {
1425            return Err(crate::ProtocolError::MessageTooLarge(
1426                result.len(),
1427                crate::MAX_MESSAGE_SIZE,
1428            ));
1429        }
1430
1431        Ok(result)
1432    }
1433
1434    /// Deserialize request with format auto-detection
1435    ///
1436    /// Detects format from first byte, reads correlation_id, and deserializes accordingly.
1437    /// Returns the deserialized request, the detected format, and the correlation_id.
1438    #[inline]
1439    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1440        if data.len() < crate::WIRE_HEADER_SIZE {
1441            return Err(crate::ProtocolError::Serialization(
1442                "Wire data too short (need format byte + correlation_id)".into(),
1443            ));
1444        }
1445
1446        let format_byte = data[0];
1447        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1448            crate::ProtocolError::Serialization(format!(
1449                "Unknown wire format: 0x{:02x}",
1450                format_byte
1451            ))
1452        })?;
1453
1454        let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1455        let payload = &data[crate::WIRE_HEADER_SIZE..];
1456
1457        match format {
1458            crate::WireFormat::Postcard => {
1459                let request: Self = postcard::from_bytes(payload)?;
1460                request.validate()?;
1461                Ok((request, format, correlation_id))
1462            }
1463            crate::WireFormat::Protobuf => {
1464                #[cfg(feature = "protobuf")]
1465                {
1466                    let request = Self::from_proto_bytes(payload)?;
1467                    Ok((request, format, correlation_id))
1468                }
1469                #[cfg(not(feature = "protobuf"))]
1470                {
1471                    Err(crate::ProtocolError::Serialization(
1472                        "Protobuf support requires the 'protobuf' feature".into(),
1473                    ))
1474                }
1475            }
1476        }
1477    }
1478}
1479
1480impl Response {
1481    /// Serialize response to bytes (postcard format, no format prefix)
1482    #[inline]
1483    pub fn to_bytes(&self) -> Result<Vec<u8>> {
1484        Ok(postcard::to_allocvec(self)?)
1485    }
1486
1487    /// Deserialize response from bytes (postcard format)
1488    #[inline]
1489    pub fn from_bytes(data: &[u8]) -> Result<Self> {
1490        let response: Self = postcard::from_bytes(data)?;
1491        response.validate()?;
1492        Ok(response)
1493    }
1494
1495    /// Post-deserialization bounds validation for responses.
1496    pub fn validate(&self) -> Result<()> {
1497        use crate::{MAX_LIST_LEN, MAX_MESSAGES_PER_RESPONSE, MAX_NAME_LEN};
1498
1499        match self {
1500            Response::Messages { messages } => {
1501                if messages.len() > MAX_MESSAGES_PER_RESPONSE {
1502                    return Err(crate::ProtocolError::InvalidFormat(format!(
1503                        "messages batch size {} exceeds max {MAX_MESSAGES_PER_RESPONSE}",
1504                        messages.len()
1505                    )));
1506                }
1507            }
1508            Response::Topics { topics } | Response::Groups { groups: topics } => {
1509                if topics.len() > MAX_LIST_LEN {
1510                    return Err(crate::ProtocolError::InvalidFormat(format!(
1511                        "list length {} exceeds max {MAX_LIST_LEN}",
1512                        topics.len()
1513                    )));
1514                }
1515                for t in topics {
1516                    if t.len() > MAX_NAME_LEN {
1517                        return Err(crate::ProtocolError::InvalidFormat(format!(
1518                            "name length {} exceeds max {MAX_NAME_LEN}",
1519                            t.len()
1520                        )));
1521                    }
1522                }
1523            }
1524            Response::ClusterMetadata {
1525                brokers, topics, ..
1526            } => {
1527                if brokers.len() > MAX_LIST_LEN {
1528                    return Err(crate::ProtocolError::InvalidFormat(
1529                        "brokers list too large".into(),
1530                    ));
1531                }
1532                if topics.len() > MAX_LIST_LEN {
1533                    return Err(crate::ProtocolError::InvalidFormat(
1534                        "topics list too large".into(),
1535                    ));
1536                }
1537            }
1538            Response::RecordsDeleted { results, .. } => {
1539                if results.len() > MAX_LIST_LEN {
1540                    return Err(crate::ProtocolError::InvalidFormat(
1541                        "delete results list too large".into(),
1542                    ));
1543                }
1544            }
1545            Response::Error { message } => {
1546                if message.len() > crate::MAX_MESSAGE_SIZE {
1547                    return Err(crate::ProtocolError::InvalidFormat(
1548                        "error message exceeds max message size".into(),
1549                    ));
1550                }
1551            }
1552            _ => {}
1553        }
1554        Ok(())
1555    }
1556
1557    /// Serialize response with wire format prefix
1558    ///
1559    /// # Errors
1560    ///
1561    /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1562    /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1563    #[inline]
1564    pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1565        let result = match format {
1566            crate::WireFormat::Postcard => {
1567                // Estimate payload size to avoid reallocations.
1568                // For Messages responses, use message count × estimated per-message
1569                // size (offset 8 + partition 4 + value ~256 + timestamp 8 + overhead ~24 ≈ 300 bytes).
1570                // For other variants the default 128-byte hint is usually sufficient.
1571                let size_hint = match self {
1572                    Response::Messages { messages } => messages.len().saturating_mul(300).max(128),
1573                    _ => 128,
1574                };
1575                let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + size_hint);
1576                result.push(format.as_byte());
1577                result.extend_from_slice(&correlation_id.to_be_bytes());
1578                postcard::to_extend(self, result)?
1579            }
1580            crate::WireFormat::Protobuf => {
1581                #[cfg(feature = "protobuf")]
1582                {
1583                    let payload = self.to_proto_bytes()?;
1584                    let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1585                    result.push(format.as_byte());
1586                    result.extend_from_slice(&correlation_id.to_be_bytes());
1587                    result.extend_from_slice(&payload);
1588                    result
1589                }
1590                #[cfg(not(feature = "protobuf"))]
1591                {
1592                    return Err(crate::ProtocolError::Serialization(
1593                        "Protobuf support requires the 'protobuf' feature".into(),
1594                    ));
1595                }
1596            }
1597        };
1598
1599        // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1600        if result.len() > crate::MAX_MESSAGE_SIZE {
1601            return Err(crate::ProtocolError::MessageTooLarge(
1602                result.len(),
1603                crate::MAX_MESSAGE_SIZE,
1604            ));
1605        }
1606
1607        Ok(result)
1608    }
1609
1610    /// Deserialize response with format auto-detection
1611    #[inline]
1612    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1613        if data.len() < crate::WIRE_HEADER_SIZE {
1614            return Err(crate::ProtocolError::Serialization(
1615                "Wire data too short (need format byte + correlation_id)".into(),
1616            ));
1617        }
1618
1619        let format_byte = data[0];
1620        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1621            crate::ProtocolError::Serialization(format!(
1622                "Unknown wire format: 0x{:02x}",
1623                format_byte
1624            ))
1625        })?;
1626
1627        let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1628        let payload = &data[crate::WIRE_HEADER_SIZE..];
1629
1630        match format {
1631            crate::WireFormat::Postcard => {
1632                let response: Self = postcard::from_bytes(payload)?;
1633                response.validate()?;
1634                Ok((response, format, correlation_id))
1635            }
1636            crate::WireFormat::Protobuf => {
1637                #[cfg(feature = "protobuf")]
1638                {
1639                    let response = Self::from_proto_bytes(payload)?;
1640                    Ok((response, format, correlation_id))
1641                }
1642                #[cfg(not(feature = "protobuf"))]
1643                {
1644                    Err(crate::ProtocolError::Serialization(
1645                        "Protobuf support requires the 'protobuf' feature".into(),
1646                    ))
1647                }
1648            }
1649        }
1650    }
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655    use super::*;
1656
1657    #[test]
1658    #[allow(deprecated)]
1659    fn test_request_roundtrip() {
1660        let requests = vec![
1661            Request::Ping,
1662            Request::ListTopics,
1663            Request::CreateTopic {
1664                name: "test".to_string(),
1665                partitions: Some(4),
1666            },
1667            Request::Authenticate {
1668                username: "admin".to_string(),
1669                password: "secret".to_string(),
1670                require_tls: true,
1671            },
1672        ];
1673
1674        for req in requests {
1675            let bytes = req.to_bytes().unwrap();
1676            let decoded = Request::from_bytes(&bytes).unwrap();
1677            // Can't directly compare due to Debug, but serialization should succeed
1678            assert!(!bytes.is_empty());
1679            let _ = decoded; // Use decoded
1680        }
1681    }
1682
1683    #[test]
1684    fn test_response_roundtrip() {
1685        let responses = vec![
1686            Response::Pong,
1687            Response::Ok,
1688            Response::Topics {
1689                topics: vec!["a".to_string(), "b".to_string()],
1690            },
1691            Response::Error {
1692                message: "test error".to_string(),
1693            },
1694        ];
1695
1696        for resp in responses {
1697            let bytes = resp.to_bytes().unwrap();
1698            let decoded = Response::from_bytes(&bytes).unwrap();
1699            assert!(!bytes.is_empty());
1700            let _ = decoded;
1701        }
1702    }
1703
1704    #[test]
1705    fn test_request_wire_roundtrip() {
1706        let request = Request::Ping;
1707
1708        // Serialize with format prefix and correlation_id
1709        let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 42).unwrap();
1710
1711        // First byte should be format identifier
1712        assert_eq!(wire_bytes[0], 0x00); // Postcard format
1713
1714        // Deserialize with auto-detection
1715        let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1716        assert_eq!(format, crate::WireFormat::Postcard);
1717        assert_eq!(correlation_id, 42);
1718        assert!(matches!(decoded, Request::Ping));
1719    }
1720
1721    #[test]
1722    fn test_response_wire_roundtrip() {
1723        let response = Response::Pong;
1724
1725        // Serialize with format prefix and correlation_id
1726        let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 99).unwrap();
1727
1728        // First byte should be format identifier
1729        assert_eq!(wire_bytes[0], 0x00); // Postcard format
1730
1731        // Deserialize with auto-detection
1732        let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1733        assert_eq!(format, crate::WireFormat::Postcard);
1734        assert_eq!(correlation_id, 99);
1735        assert!(matches!(decoded, Response::Pong));
1736    }
1737
1738    #[test]
1739    fn test_wire_format_empty_data() {
1740        let result = Request::from_wire(&[]);
1741        assert!(result.is_err());
1742    }
1743
1744    #[test]
1745    fn test_wire_format_complex_request() {
1746        use bytes::Bytes;
1747
1748        let request = Request::Publish {
1749            topic: "test-topic".to_string(),
1750            partition: Some(3),
1751            key: Some(Bytes::from("my-key")),
1752            value: Bytes::from("hello world"),
1753            leader_epoch: None,
1754        };
1755
1756        let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 1).unwrap();
1757        assert_eq!(wire_bytes[0], 0x00);
1758
1759        let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1760        assert_eq!(format, crate::WireFormat::Postcard);
1761        assert_eq!(correlation_id, 1);
1762
1763        // Verify the decoded request matches
1764        if let Request::Publish {
1765            topic, partition, ..
1766        } = decoded
1767        {
1768            assert_eq!(topic, "test-topic");
1769            assert_eq!(partition, Some(3));
1770        } else {
1771            panic!("Expected Publish request");
1772        }
1773    }
1774
1775    #[test]
1776    fn test_wire_format_complex_response() {
1777        let response = Response::Published {
1778            offset: 12345,
1779            partition: 7,
1780        };
1781
1782        let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 2).unwrap();
1783        assert_eq!(wire_bytes[0], 0x00);
1784
1785        let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1786        assert_eq!(format, crate::WireFormat::Postcard);
1787        assert_eq!(correlation_id, 2);
1788
1789        if let Response::Published { offset, partition } = decoded {
1790            assert_eq!(offset, 12345);
1791            assert_eq!(partition, 7);
1792        } else {
1793            panic!("Expected Published response");
1794        }
1795    }
1796
1797    /// Snapshot test: postcard serializes enum variants by ordinal position.
1798    /// If someone reorders variants in Request or Response, this test fails
1799    /// because the byte prefix (discriminant) will change, breaking wire compat.
1800    ///
1801    /// Exhaustive — every Request variant is pinned.
1802    #[test]
1803    #[allow(deprecated)]
1804    fn test_postcard_wire_stability_request_discriminants() {
1805        use bytes::Bytes;
1806
1807        // Serialize a representative of each variant and check the leading
1808        // discriminant byte(s) haven't shifted. Postcard uses varint encoding
1809        // for enum discriminants.
1810        let test_cases: Vec<(Request, u8)> = vec![
1811            // variant 0: Authenticate
1812            (
1813                Request::Authenticate {
1814                    username: "u".into(),
1815                    password: "p".into(),
1816                    require_tls: false,
1817                },
1818                0,
1819            ),
1820            // variant 1: SaslAuthenticate
1821            (
1822                Request::SaslAuthenticate {
1823                    mechanism: Bytes::from("PLAIN"),
1824                    auth_bytes: Bytes::from("data"),
1825                },
1826                1,
1827            ),
1828            // variant 2: ScramClientFirst
1829            (
1830                Request::ScramClientFirst {
1831                    message: Bytes::from("n,,n=user,r=nonce"),
1832                },
1833                2,
1834            ),
1835            // variant 3: ScramClientFinal
1836            (
1837                Request::ScramClientFinal {
1838                    message: Bytes::from("c=bind,r=nonce,p=proof"),
1839                },
1840                3,
1841            ),
1842            // variant 4: Publish
1843            (
1844                Request::Publish {
1845                    topic: "t".into(),
1846                    partition: None,
1847                    key: None,
1848                    value: Bytes::from("v"),
1849                    leader_epoch: None,
1850                },
1851                4,
1852            ),
1853            // variant 5: Consume
1854            (
1855                Request::Consume {
1856                    topic: "t".into(),
1857                    partition: 0,
1858                    offset: 0,
1859                    max_messages: 1,
1860                    isolation_level: None,
1861                    max_wait_ms: None,
1862                },
1863                5,
1864            ),
1865            // variant 6: CreateTopic
1866            (
1867                Request::CreateTopic {
1868                    name: "t".into(),
1869                    partitions: None,
1870                },
1871                6,
1872            ),
1873            // variant 7: ListTopics
1874            (Request::ListTopics, 7),
1875            // variant 8: DeleteTopic
1876            (Request::DeleteTopic { name: "t".into() }, 8),
1877            // variant 9: CommitOffset
1878            (
1879                Request::CommitOffset {
1880                    consumer_group: "g".into(),
1881                    topic: "t".into(),
1882                    partition: 0,
1883                    offset: 0,
1884                },
1885                9,
1886            ),
1887            // variant 10: GetOffset
1888            (
1889                Request::GetOffset {
1890                    consumer_group: "g".into(),
1891                    topic: "t".into(),
1892                    partition: 0,
1893                },
1894                10,
1895            ),
1896            // variant 11: GetMetadata
1897            (Request::GetMetadata { topic: "t".into() }, 11),
1898            // variant 12: GetClusterMetadata
1899            (Request::GetClusterMetadata { topics: vec![] }, 12),
1900            // variant 13: Ping
1901            (Request::Ping, 13),
1902            // variant 14: GetOffsetBounds
1903            (
1904                Request::GetOffsetBounds {
1905                    topic: "t".into(),
1906                    partition: 0,
1907                },
1908                14,
1909            ),
1910            // variant 15: ListGroups
1911            (Request::ListGroups, 15),
1912            // variant 16: DescribeGroup
1913            (
1914                Request::DescribeGroup {
1915                    consumer_group: "g".into(),
1916                },
1917                16,
1918            ),
1919            // variant 17: DeleteGroup
1920            (
1921                Request::DeleteGroup {
1922                    consumer_group: "g".into(),
1923                },
1924                17,
1925            ),
1926            // variant 18: GetOffsetForTimestamp
1927            (
1928                Request::GetOffsetForTimestamp {
1929                    topic: "t".into(),
1930                    partition: 0,
1931                    timestamp_ms: 0,
1932                },
1933                18,
1934            ),
1935            // variant 19: InitProducerId
1936            (Request::InitProducerId { producer_id: None }, 19),
1937            // variant 20: IdempotentPublish
1938            (
1939                Request::IdempotentPublish {
1940                    topic: "t".into(),
1941                    partition: None,
1942                    key: None,
1943                    value: Bytes::from("v"),
1944                    producer_id: 1,
1945                    producer_epoch: 0,
1946                    sequence: 0,
1947                    leader_epoch: None,
1948                },
1949                20,
1950            ),
1951            // variant 21: BeginTransaction
1952            (
1953                Request::BeginTransaction {
1954                    txn_id: "tx".into(),
1955                    producer_id: 1,
1956                    producer_epoch: 0,
1957                    timeout_ms: None,
1958                },
1959                21,
1960            ),
1961            // variant 22: AddPartitionsToTxn
1962            (
1963                Request::AddPartitionsToTxn {
1964                    txn_id: "tx".into(),
1965                    producer_id: 1,
1966                    producer_epoch: 0,
1967                    partitions: vec![],
1968                },
1969                22,
1970            ),
1971            // variant 23: TransactionalPublish
1972            (
1973                Request::TransactionalPublish {
1974                    txn_id: "tx".into(),
1975                    topic: "t".into(),
1976                    partition: None,
1977                    key: None,
1978                    value: Bytes::from("v"),
1979                    producer_id: 1,
1980                    producer_epoch: 0,
1981                    sequence: 0,
1982                    leader_epoch: None,
1983                },
1984                23,
1985            ),
1986            // variant 24: AddOffsetsToTxn
1987            (
1988                Request::AddOffsetsToTxn {
1989                    txn_id: "tx".into(),
1990                    producer_id: 1,
1991                    producer_epoch: 0,
1992                    group_id: "g".into(),
1993                    offsets: vec![],
1994                },
1995                24,
1996            ),
1997            // variant 25: CommitTransaction
1998            (
1999                Request::CommitTransaction {
2000                    txn_id: "tx".into(),
2001                    producer_id: 1,
2002                    producer_epoch: 0,
2003                },
2004                25,
2005            ),
2006            // variant 26: AbortTransaction
2007            (
2008                Request::AbortTransaction {
2009                    txn_id: "tx".into(),
2010                    producer_id: 1,
2011                    producer_epoch: 0,
2012                },
2013                26,
2014            ),
2015            // variant 27: DescribeQuotas
2016            (Request::DescribeQuotas { entities: vec![] }, 27),
2017            // variant 28: AlterQuotas
2018            (
2019                Request::AlterQuotas {
2020                    alterations: vec![],
2021                },
2022                28,
2023            ),
2024            // variant 29: AlterTopicConfig
2025            (
2026                Request::AlterTopicConfig {
2027                    topic: "t".into(),
2028                    configs: vec![],
2029                },
2030                29,
2031            ),
2032            // variant 30: CreatePartitions
2033            (
2034                Request::CreatePartitions {
2035                    topic: "t".into(),
2036                    new_partition_count: 2,
2037                    assignments: vec![],
2038                },
2039                30,
2040            ),
2041            // variant 31: DeleteRecords
2042            (
2043                Request::DeleteRecords {
2044                    topic: "t".into(),
2045                    partition_offsets: vec![],
2046                },
2047                31,
2048            ),
2049            // variant 32: DescribeTopicConfigs
2050            (Request::DescribeTopicConfigs { topics: vec![] }, 32),
2051            // variant 33: Handshake
2052            (
2053                Request::Handshake {
2054                    protocol_version: crate::PROTOCOL_VERSION,
2055                    client_id: "test".into(),
2056                },
2057                33,
2058            ),
2059            // variant 34: JoinGroup
2060            (
2061                Request::JoinGroup {
2062                    group_id: "g".into(),
2063                    member_id: String::new(),
2064                    session_timeout_ms: 10000,
2065                    rebalance_timeout_ms: 30000,
2066                    protocol_type: "consumer".into(),
2067                    subscriptions: vec![],
2068                },
2069                34,
2070            ),
2071            // variant 35: SyncGroup
2072            (
2073                Request::SyncGroup {
2074                    group_id: "g".into(),
2075                    generation_id: 1,
2076                    member_id: "m".into(),
2077                    assignments: vec![],
2078                },
2079                35,
2080            ),
2081            // variant 36: Heartbeat
2082            (
2083                Request::Heartbeat {
2084                    group_id: "g".into(),
2085                    generation_id: 1,
2086                    member_id: "m".into(),
2087                },
2088                36,
2089            ),
2090            // variant 37: LeaveGroup
2091            (
2092                Request::LeaveGroup {
2093                    group_id: "g".into(),
2094                    member_id: "m".into(),
2095                },
2096                37,
2097            ),
2098            // variant 38: PublishBatch
2099            (
2100                Request::PublishBatch {
2101                    topic: "t".into(),
2102                    partition: Some(0),
2103                    records: vec![],
2104                    leader_epoch: None,
2105                },
2106                38,
2107            ),
2108            // variant 39: IdempotentPublishBatch
2109            (
2110                Request::IdempotentPublishBatch {
2111                    topic: "t".into(),
2112                    partition: Some(0),
2113                    records: vec![],
2114                    producer_id: 1,
2115                    producer_epoch: 0,
2116                    base_sequence: 0,
2117                    leader_epoch: None,
2118                },
2119                39,
2120            ),
2121        ];
2122
2123        for (request, expected_discriminant) in test_cases {
2124            let bytes = request.to_bytes().unwrap();
2125            assert_eq!(
2126                bytes[0], expected_discriminant,
2127                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
2128                request
2129            );
2130        }
2131    }
2132
2133    /// Exhaustive — every Response variant is pinned.
2134    #[test]
2135    fn test_postcard_wire_stability_response_discriminants() {
2136        use bytes::Bytes;
2137
2138        let test_cases: Vec<(Response, u8)> = vec![
2139            // variant 0: Authenticated
2140            (
2141                Response::Authenticated {
2142                    session_id: String::new(),
2143                    expires_in: 0,
2144                },
2145                0,
2146            ),
2147            // variant 1: ScramServerFirst
2148            (
2149                Response::ScramServerFirst {
2150                    message: Bytes::from("r=nonce,s=salt,i=4096"),
2151                },
2152                1,
2153            ),
2154            // variant 2: ScramServerFinal
2155            (
2156                Response::ScramServerFinal {
2157                    message: Bytes::from("v=verifier"),
2158                    session_id: None,
2159                    expires_in: None,
2160                },
2161                2,
2162            ),
2163            // variant 3: Published
2164            (
2165                Response::Published {
2166                    offset: 0,
2167                    partition: 0,
2168                },
2169                3,
2170            ),
2171            // variant 4: Messages
2172            (Response::Messages { messages: vec![] }, 4),
2173            // variant 5: TopicCreated
2174            (
2175                Response::TopicCreated {
2176                    name: "t".into(),
2177                    partitions: 1,
2178                },
2179                5,
2180            ),
2181            // variant 6: Topics
2182            (Response::Topics { topics: vec![] }, 6),
2183            // variant 7: TopicDeleted
2184            (Response::TopicDeleted, 7),
2185            // variant 8: OffsetCommitted
2186            (Response::OffsetCommitted, 8),
2187            // variant 9: Offset
2188            (Response::Offset { offset: None }, 9),
2189            // variant 10: Metadata
2190            (
2191                Response::Metadata {
2192                    name: "t".into(),
2193                    partitions: 1,
2194                },
2195                10,
2196            ),
2197            // variant 11: ClusterMetadata
2198            (
2199                Response::ClusterMetadata {
2200                    controller_id: None,
2201                    brokers: vec![],
2202                    topics: vec![],
2203                },
2204                11,
2205            ),
2206            // variant 12: Pong
2207            (Response::Pong, 12),
2208            // variant 13: OffsetBounds
2209            (
2210                Response::OffsetBounds {
2211                    earliest: 0,
2212                    latest: 0,
2213                },
2214                13,
2215            ),
2216            // variant 14: Groups
2217            (Response::Groups { groups: vec![] }, 14),
2218            // variant 15: GroupDescription
2219            (
2220                Response::GroupDescription {
2221                    consumer_group: "g".into(),
2222                    offsets: HashMap::new(),
2223                },
2224                15,
2225            ),
2226            // variant 16: GroupDeleted
2227            (Response::GroupDeleted, 16),
2228            // variant 17: OffsetForTimestamp
2229            (Response::OffsetForTimestamp { offset: None }, 17),
2230            // variant 18: Error
2231            (
2232                Response::Error {
2233                    message: "e".into(),
2234                },
2235                18,
2236            ),
2237            // variant 19: Ok
2238            (Response::Ok, 19),
2239            // variant 20: ProducerIdInitialized
2240            (
2241                Response::ProducerIdInitialized {
2242                    producer_id: 1,
2243                    producer_epoch: 0,
2244                },
2245                20,
2246            ),
2247            // variant 21: IdempotentPublished
2248            (
2249                Response::IdempotentPublished {
2250                    offset: 0,
2251                    partition: 0,
2252                    duplicate: false,
2253                },
2254                21,
2255            ),
2256            // variant 22: TransactionStarted
2257            (
2258                Response::TransactionStarted {
2259                    txn_id: "tx".into(),
2260                },
2261                22,
2262            ),
2263            // variant 23: PartitionsAddedToTxn
2264            (
2265                Response::PartitionsAddedToTxn {
2266                    txn_id: "tx".into(),
2267                    partition_count: 0,
2268                },
2269                23,
2270            ),
2271            // variant 24: TransactionalPublished
2272            (
2273                Response::TransactionalPublished {
2274                    offset: 0,
2275                    partition: 0,
2276                    sequence: 0,
2277                },
2278                24,
2279            ),
2280            // variant 25: OffsetsAddedToTxn
2281            (
2282                Response::OffsetsAddedToTxn {
2283                    txn_id: "tx".into(),
2284                },
2285                25,
2286            ),
2287            // variant 26: TransactionCommitted
2288            (
2289                Response::TransactionCommitted {
2290                    txn_id: "tx".into(),
2291                },
2292                26,
2293            ),
2294            // variant 27: TransactionAborted
2295            (
2296                Response::TransactionAborted {
2297                    txn_id: "tx".into(),
2298                },
2299                27,
2300            ),
2301            // variant 28: QuotasDescribed
2302            (Response::QuotasDescribed { entries: vec![] }, 28),
2303            // variant 29: QuotasAltered
2304            (Response::QuotasAltered { altered_count: 0 }, 29),
2305            // variant 30: Throttled
2306            (
2307                Response::Throttled {
2308                    throttle_time_ms: 0,
2309                    quota_type: "produce_bytes_rate".into(),
2310                    entity: "user".into(),
2311                },
2312                30,
2313            ),
2314            // variant 31: TopicConfigAltered
2315            (
2316                Response::TopicConfigAltered {
2317                    topic: "t".into(),
2318                    changed_count: 0,
2319                },
2320                31,
2321            ),
2322            // variant 32: PartitionsCreated
2323            (
2324                Response::PartitionsCreated {
2325                    topic: "t".into(),
2326                    new_partition_count: 2,
2327                },
2328                32,
2329            ),
2330            // variant 33: RecordsDeleted
2331            (
2332                Response::RecordsDeleted {
2333                    topic: "t".into(),
2334                    results: vec![],
2335                },
2336                33,
2337            ),
2338            // variant 34: TopicConfigsDescribed
2339            (Response::TopicConfigsDescribed { configs: vec![] }, 34),
2340            // variant 35: HandshakeResult
2341            (
2342                Response::HandshakeResult {
2343                    server_version: crate::PROTOCOL_VERSION,
2344                    compatible: true,
2345                    message: String::new(),
2346                },
2347                35,
2348            ),
2349            // variant 36: JoinGroupResult
2350            (
2351                Response::JoinGroupResult {
2352                    generation_id: 1,
2353                    protocol_type: "consumer".into(),
2354                    member_id: "m".into(),
2355                    leader_id: "l".into(),
2356                    members: vec![],
2357                },
2358                36,
2359            ),
2360            // variant 37: SyncGroupResult
2361            (
2362                Response::SyncGroupResult {
2363                    assignments: vec![],
2364                },
2365                37,
2366            ),
2367            // variant 38: HeartbeatResult
2368            (Response::HeartbeatResult { error_code: 0 }, 38),
2369            // variant 39: LeaveGroupResult
2370            (Response::LeaveGroupResult, 39),
2371            // variant 40: PublishedBatch
2372            (
2373                Response::PublishedBatch {
2374                    base_offset: 0,
2375                    partition: 0,
2376                    record_count: 0,
2377                },
2378                40,
2379            ),
2380            // variant 41: IdempotentPublishedBatch
2381            (
2382                Response::IdempotentPublishedBatch {
2383                    base_offset: 0,
2384                    partition: 0,
2385                    record_count: 0,
2386                    duplicate: false,
2387                },
2388                41,
2389            ),
2390        ];
2391
2392        for (response, expected_discriminant) in test_cases {
2393            let bytes = response.to_bytes().unwrap();
2394            assert_eq!(
2395                bytes[0], expected_discriminant,
2396                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
2397                response
2398            );
2399        }
2400    }
2401
2402    /// Verify that to_wire rejects oversized messages.
2403    #[test]
2404    fn test_to_wire_rejects_oversized_request() {
2405        use bytes::Bytes;
2406        // Create a request with a payload larger than MAX_MESSAGE_SIZE
2407        let huge_value = vec![0u8; crate::MAX_MESSAGE_SIZE + 1];
2408        let request = Request::Publish {
2409            topic: "t".into(),
2410            partition: None,
2411            key: None,
2412            value: Bytes::from(huge_value),
2413            leader_epoch: None,
2414        };
2415        let result = request.to_wire(crate::WireFormat::Postcard, 0);
2416        assert!(
2417            matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
2418            "Expected MessageTooLarge error for oversized request"
2419        );
2420    }
2421
2422    #[test]
2423    fn test_to_wire_rejects_oversized_response() {
2424        let huge_messages = vec![MessageData {
2425            offset: 0,
2426            partition: 0,
2427            timestamp: 0,
2428            key: None,
2429            value: bytes::Bytes::from(vec![0u8; crate::MAX_MESSAGE_SIZE + 1]),
2430            headers: vec![],
2431        }];
2432        let response = Response::Messages {
2433            messages: huge_messages,
2434        };
2435        let result = response.to_wire(crate::WireFormat::Postcard, 0);
2436        assert!(
2437            matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
2438            "Expected MessageTooLarge error for oversized response"
2439        );
2440    }
2441
2442    /// Guard against accidental reordering/removal of enum variants.
2443    ///
2444    /// Postcard uses discriminant indices as wire tags — reordering or deleting
2445    /// a variant silently breaks all existing clients. This test serialises
2446    /// one instance of every variant and asserts the discriminant byte matches
2447    /// the expected index. Adding new variants at the END is safe — just
2448    /// extend the list. Removing or reordering variants will make this test
2449    /// fail.
2450    #[test]
2451    #[allow(deprecated)]
2452    fn request_discriminant_stability() {
2453        // Each tuple: (expected discriminant, instance of the variant)
2454        let cases: Vec<(u8, Request)> = vec![
2455            (
2456                0,
2457                Request::Authenticate {
2458                    username: String::new(),
2459                    password: String::new(),
2460                    require_tls: false,
2461                },
2462            ),
2463            (
2464                1,
2465                Request::SaslAuthenticate {
2466                    mechanism: Bytes::new(),
2467                    auth_bytes: Bytes::new(),
2468                },
2469            ),
2470            (
2471                2,
2472                Request::ScramClientFirst {
2473                    message: Bytes::new(),
2474                },
2475            ),
2476            (
2477                3,
2478                Request::ScramClientFinal {
2479                    message: Bytes::new(),
2480                },
2481            ),
2482            (
2483                4,
2484                Request::Publish {
2485                    topic: String::new(),
2486                    partition: None,
2487                    key: None,
2488                    value: Bytes::new(),
2489                    leader_epoch: None,
2490                },
2491            ),
2492            (
2493                5,
2494                Request::Consume {
2495                    topic: String::new(),
2496                    partition: 0,
2497                    offset: 0,
2498                    max_messages: 1,
2499                    isolation_level: None,
2500                    max_wait_ms: None,
2501                },
2502            ),
2503            (
2504                6,
2505                Request::CreateTopic {
2506                    name: String::new(),
2507                    partitions: None,
2508                },
2509            ),
2510            (7, Request::ListTopics),
2511            (
2512                8,
2513                Request::DeleteTopic {
2514                    name: String::new(),
2515                },
2516            ),
2517            (
2518                9,
2519                Request::CommitOffset {
2520                    consumer_group: String::new(),
2521                    topic: String::new(),
2522                    partition: 0,
2523                    offset: 0,
2524                },
2525            ),
2526            (
2527                10,
2528                Request::GetOffset {
2529                    consumer_group: String::new(),
2530                    topic: String::new(),
2531                    partition: 0,
2532                },
2533            ),
2534            (
2535                11,
2536                Request::GetMetadata {
2537                    topic: String::new(),
2538                },
2539            ),
2540            (12, Request::GetClusterMetadata { topics: Vec::new() }),
2541            (13, Request::Ping),
2542            (
2543                14,
2544                Request::GetOffsetBounds {
2545                    topic: String::new(),
2546                    partition: 0,
2547                },
2548            ),
2549            (15, Request::ListGroups),
2550            (
2551                16,
2552                Request::DescribeGroup {
2553                    consumer_group: String::new(),
2554                },
2555            ),
2556            (
2557                17,
2558                Request::DeleteGroup {
2559                    consumer_group: String::new(),
2560                },
2561            ),
2562            (
2563                18,
2564                Request::GetOffsetForTimestamp {
2565                    topic: String::new(),
2566                    partition: 0,
2567                    timestamp_ms: 0,
2568                },
2569            ),
2570        ];
2571
2572        for (expected_disc, req) in &cases {
2573            let bytes = req.to_bytes().unwrap();
2574            assert_eq!(
2575                bytes[0], *expected_disc,
2576                "Request variant at discriminant {expected_disc} moved! This breaks wire compatibility."
2577            );
2578        }
2579
2580        // Last variant (LeaveGroup = 37) sentinel
2581        let last = Request::LeaveGroup {
2582            group_id: String::new(),
2583            member_id: String::new(),
2584        };
2585        let last_bytes = last.to_bytes().unwrap();
2586        assert_eq!(
2587            last_bytes[0], 37,
2588            "LeaveGroup must remain discriminant 37 (last variant)"
2589        );
2590    }
2591}