Skip to main content

rivven_protocol/
messages.rs

1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Quota alteration request
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QuotaAlteration {
13    /// Entity type: "user", "client-id", "consumer-group", "default"
14    pub entity_type: String,
15    /// Entity name (None for defaults)
16    pub entity_name: Option<String>,
17    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
18    pub quota_key: String,
19    /// Quota value (None to remove quota, Some to set)
20    pub quota_value: Option<u64>,
21}
22
23/// Quota entry in describe response
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct QuotaEntry {
26    /// Entity type
27    pub entity_type: String,
28    /// Entity name (None for defaults)
29    pub entity_name: Option<String>,
30    /// Quota values
31    pub quotas: HashMap<String, u64>,
32}
33
34/// Topic configuration entry for AlterTopicConfig
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TopicConfigEntry {
37    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
38    pub key: String,
39    /// Configuration value (None to reset to default)
40    pub value: Option<String>,
41}
42
43/// Topic configuration in describe response
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TopicConfigDescription {
46    /// Topic name
47    pub topic: String,
48    /// Configuration entries
49    pub configs: HashMap<String, TopicConfigValue>,
50}
51
52/// Topic configuration value with metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct TopicConfigValue {
55    /// Current value
56    pub value: String,
57    /// Whether this is the default value
58    pub is_default: bool,
59    /// Whether this config is read-only
60    pub is_read_only: bool,
61    /// Whether this config is sensitive (e.g., passwords)
62    pub is_sensitive: bool,
63}
64
65/// Delete records result for a partition
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DeleteRecordsResult {
68    /// Partition ID
69    pub partition: u32,
70    /// New low watermark (first available offset after deletion)
71    pub low_watermark: u64,
72    /// Error message if deletion failed for this partition
73    pub error: Option<String>,
74}
75
76/// Protocol request messages
77///
78/// # Stability
79///
80/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
81/// Adding new variants should only be done at the end of the enum.
82#[derive(Clone, Serialize, Deserialize)]
83pub enum Request {
84    /// Authenticate with username/password (SASL/PLAIN compatible).
85    ///
86    /// # Security — transport encryption required
87    ///
88    /// The password is sent in **plaintext** on the wire (SASL/PLAIN).
89    /// This variant must only be used over a TLS-encrypted connection;
90    /// otherwise the password is exposed to network observers.
91    ///
92    /// # Deprecation
93    ///
94    /// Prefer `ScramClientFirst` / `ScramClientFinal` (SCRAM-SHA-256
95    /// challenge-response) which never sends the password over the wire.
96    #[deprecated(
97        note = "Use SCRAM-SHA-256 (ScramClientFirst/ScramClientFinal) instead of plaintext auth"
98    )]
99    Authenticate {
100        username: String,
101        password: String,
102        /// When `true` the server **must** reject this request if the
103        /// connection is not TLS-encrypted, preventing accidental
104        /// credential exposure on cleartext transports.
105        #[serde(default)]
106        require_tls: bool,
107    },
108
109    /// Authenticate with SASL bytes (for Kafka client compatibility)
110    SaslAuthenticate {
111        #[serde(with = "crate::serde_utils::bytes_serde")]
112        mechanism: Bytes,
113        #[serde(with = "crate::serde_utils::bytes_serde")]
114        auth_bytes: Bytes,
115    },
116
117    /// SCRAM-SHA-256: Client-first message
118    ScramClientFirst {
119        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
120        #[serde(with = "crate::serde_utils::bytes_serde")]
121        message: Bytes,
122    },
123
124    /// SCRAM-SHA-256: Client-final message
125    ScramClientFinal {
126        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
127        #[serde(with = "crate::serde_utils::bytes_serde")]
128        message: Bytes,
129    },
130
131    /// Publish a message to a topic
132    Publish {
133        topic: String,
134        partition: Option<u32>,
135        #[serde(with = "crate::serde_utils::option_bytes_serde")]
136        key: Option<Bytes>,
137        #[serde(with = "crate::serde_utils::bytes_serde")]
138        value: Bytes,
139        /// Leader epoch for data-path fencing (§2.4).
140        /// When set, the broker rejects the write if its current leader epoch
141        /// for this partition is higher, preventing stale-leader writes.
142        #[serde(default)]
143        leader_epoch: Option<u64>,
144    },
145
146    /// Consume messages from a topic
147    Consume {
148        topic: String,
149        partition: u32,
150        offset: u64,
151        max_messages: usize,
152        /// Isolation level for transactional reads
153        /// None = read_uncommitted (default, backward compatible)
154        /// Some(0) = read_uncommitted
155        /// Some(1) = read_committed (filters aborted transaction messages)
156        #[serde(default)]
157        isolation_level: Option<u8>,
158        /// Long-polling: maximum time (ms) to wait for new data before returning
159        /// an empty response. None or 0 = immediate (no waiting). Capped at 30 000 ms.
160        #[serde(default)]
161        max_wait_ms: Option<u64>,
162    },
163
164    /// Create a new topic
165    CreateTopic {
166        name: String,
167        partitions: Option<u32>,
168    },
169
170    /// List all topics
171    ListTopics,
172
173    /// Delete a topic
174    DeleteTopic { name: String },
175
176    /// Commit consumer offset
177    CommitOffset {
178        consumer_group: String,
179        topic: String,
180        partition: u32,
181        offset: u64,
182    },
183
184    /// Get consumer offset
185    GetOffset {
186        consumer_group: String,
187        topic: String,
188        partition: u32,
189    },
190
191    /// Get topic metadata
192    GetMetadata { topic: String },
193
194    /// Get cluster metadata (all topics or specific ones)
195    GetClusterMetadata {
196        /// Topics to get metadata for (empty = all topics)
197        topics: Vec<String>,
198    },
199
200    /// Ping
201    Ping,
202
203    /// Get offset bounds for a partition
204    GetOffsetBounds { topic: String, partition: u32 },
205
206    /// List all consumer groups
207    ListGroups,
208
209    /// Describe a consumer group (get all offsets)
210    DescribeGroup { consumer_group: String },
211
212    /// Delete a consumer group
213    DeleteGroup { consumer_group: String },
214
215    /// Find offset for a timestamp
216    GetOffsetForTimestamp {
217        topic: String,
218        partition: u32,
219        /// Timestamp in milliseconds since epoch
220        timestamp_ms: i64,
221    },
222
223    // =========================================================================
224    // Idempotent Producer
225    // =========================================================================
226    /// Initialize idempotent producer (request producer ID and epoch)
227    ///
228    /// Call this before sending idempotent produce requests.
229    /// If reconnecting, provide the previous producer_id to bump epoch.
230    InitProducerId {
231        /// Previous producer ID (None for new producers)
232        producer_id: Option<u64>,
233    },
234
235    /// Publish with idempotent semantics (exactly-once delivery)
236    ///
237    /// Requires InitProducerId to have been called first.
238    IdempotentPublish {
239        topic: String,
240        partition: Option<u32>,
241        #[serde(with = "crate::serde_utils::option_bytes_serde")]
242        key: Option<Bytes>,
243        #[serde(with = "crate::serde_utils::bytes_serde")]
244        value: Bytes,
245        /// Producer ID from InitProducerId response
246        producer_id: u64,
247        /// Producer epoch from InitProducerId response
248        producer_epoch: u16,
249        /// Sequence number (starts at 0, increments per partition)
250        sequence: i32,
251        /// Leader epoch for fencing stale writes (§2.4)
252        #[serde(default)]
253        leader_epoch: Option<u64>,
254    },
255
256    // =========================================================================
257    // Native Transactions
258    // =========================================================================
259    /// Begin a new transaction
260    BeginTransaction {
261        /// Transaction ID (unique per producer)
262        txn_id: String,
263        /// Producer ID from InitProducerId
264        producer_id: u64,
265        /// Producer epoch
266        producer_epoch: u16,
267        /// Transaction timeout in milliseconds (optional, defaults to 60s)
268        timeout_ms: Option<u64>,
269    },
270
271    /// Add partitions to an active transaction
272    AddPartitionsToTxn {
273        /// Transaction ID
274        txn_id: String,
275        /// Producer ID
276        producer_id: u64,
277        /// Producer epoch
278        producer_epoch: u16,
279        /// Partitions to add (topic, partition pairs)
280        partitions: Vec<(String, u32)>,
281    },
282
283    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
284    TransactionalPublish {
285        /// Transaction ID
286        txn_id: String,
287        topic: String,
288        partition: Option<u32>,
289        #[serde(with = "crate::serde_utils::option_bytes_serde")]
290        key: Option<Bytes>,
291        #[serde(with = "crate::serde_utils::bytes_serde")]
292        value: Bytes,
293        /// Producer ID
294        producer_id: u64,
295        /// Producer epoch
296        producer_epoch: u16,
297        /// Sequence number
298        sequence: i32,
299        /// Leader epoch for fencing stale writes (§2.4)
300        #[serde(default)]
301        leader_epoch: Option<u64>,
302    },
303
304    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
305    AddOffsetsToTxn {
306        /// Transaction ID
307        txn_id: String,
308        /// Producer ID
309        producer_id: u64,
310        /// Producer epoch
311        producer_epoch: u16,
312        /// Consumer group ID
313        group_id: String,
314        /// Offsets to commit (topic, partition, offset triples)
315        offsets: Vec<(String, u32, i64)>,
316    },
317
318    /// Commit a transaction
319    CommitTransaction {
320        /// Transaction ID
321        txn_id: String,
322        /// Producer ID
323        producer_id: u64,
324        /// Producer epoch
325        producer_epoch: u16,
326    },
327
328    /// Abort a transaction
329    AbortTransaction {
330        /// Transaction ID
331        txn_id: String,
332        /// Producer ID
333        producer_id: u64,
334        /// Producer epoch
335        producer_epoch: u16,
336    },
337
338    // =========================================================================
339    // Per-Principal Quotas (Kafka Parity)
340    // =========================================================================
341    /// Describe quotas for entities
342    DescribeQuotas {
343        /// Entities to describe (empty = all)
344        /// Format: Vec<(entity_type, entity_name)>
345        /// entity_type: "user", "client-id", "consumer-group", "default"
346        /// entity_name: None for defaults, Some for specific entities
347        entities: Vec<(String, Option<String>)>,
348    },
349
350    /// Alter quotas for entities
351    AlterQuotas {
352        /// Quota alterations to apply
353        /// Each item: (entity_type, entity_name, quota_key, quota_value)
354        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
355        /// quota_value: None to remove, Some(value) to set
356        alterations: Vec<QuotaAlteration>,
357    },
358
359    // =========================================================================
360    // Admin API (Kafka Parity)
361    // =========================================================================
362    /// Alter topic configuration
363    AlterTopicConfig {
364        /// Topic name
365        topic: String,
366        /// Configuration changes to apply
367        configs: Vec<TopicConfigEntry>,
368    },
369
370    /// Create additional partitions for an existing topic
371    CreatePartitions {
372        /// Topic name
373        topic: String,
374        /// New total partition count (must be > current count)
375        new_partition_count: u32,
376        /// Optional assignment of new partitions to brokers
377        /// If empty, broker will auto-assign
378        assignments: Vec<Vec<String>>,
379    },
380
381    /// Delete records before a given offset (log truncation)
382    DeleteRecords {
383        /// Topic name
384        topic: String,
385        /// Partition-offset pairs: delete all records before these offsets
386        partition_offsets: Vec<(u32, u64)>,
387    },
388
389    /// Describe topic configurations
390    DescribeTopicConfigs {
391        /// Topics to describe (empty = all)
392        topics: Vec<String>,
393    },
394
395    /// Protocol version handshake
396    ///
397    /// Sent by the client as the first message after connecting.
398    /// The server validates the protocol version and returns compatibility info.
399    Handshake {
400        /// Client's protocol version (must match `PROTOCOL_VERSION`)
401        protocol_version: u32,
402        /// Optional client identifier for diagnostics
403        client_id: String,
404    },
405}
406
407// Manual Debug impl to redact credentials from log output.
408// `Request::Authenticate { password }` and SASL/SCRAM variants contain
409// secrets that must never appear in debug logs.
410impl std::fmt::Debug for Request {
411    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412        match self {
413            #[allow(deprecated)]
414            Self::Authenticate {
415                username,
416                require_tls,
417                ..
418            } => f
419                .debug_struct("Authenticate")
420                .field("username", username)
421                .field("password", &"[REDACTED]")
422                .field("require_tls", require_tls)
423                .finish(),
424            Self::SaslAuthenticate { mechanism, .. } => f
425                .debug_struct("SaslAuthenticate")
426                .field("mechanism", mechanism)
427                .field("auth_bytes", &"[REDACTED]")
428                .finish(),
429            Self::ScramClientFirst { .. } => f
430                .debug_struct("ScramClientFirst")
431                .field("message", &"[REDACTED]")
432                .finish(),
433            Self::ScramClientFinal { .. } => f
434                .debug_struct("ScramClientFinal")
435                .field("message", &"[REDACTED]")
436                .finish(),
437            Self::Publish {
438                topic,
439                partition,
440                key,
441                value,
442                leader_epoch,
443            } => f
444                .debug_struct("Publish")
445                .field("topic", topic)
446                .field("partition", partition)
447                .field("key", key)
448                .field("value_len", &value.len())
449                .field("leader_epoch", leader_epoch)
450                .finish(),
451            Self::Consume {
452                topic,
453                partition,
454                offset,
455                max_messages,
456                isolation_level,
457                max_wait_ms,
458            } => f
459                .debug_struct("Consume")
460                .field("topic", topic)
461                .field("partition", partition)
462                .field("offset", offset)
463                .field("max_messages", max_messages)
464                .field("isolation_level", isolation_level)
465                .field("max_wait_ms", max_wait_ms)
466                .finish(),
467            Self::CreateTopic { name, partitions } => f
468                .debug_struct("CreateTopic")
469                .field("name", name)
470                .field("partitions", partitions)
471                .finish(),
472            Self::ListTopics => write!(f, "ListTopics"),
473            Self::DeleteTopic { name } => {
474                f.debug_struct("DeleteTopic").field("name", name).finish()
475            }
476            Self::CommitOffset {
477                consumer_group,
478                topic,
479                partition,
480                offset,
481            } => f
482                .debug_struct("CommitOffset")
483                .field("consumer_group", consumer_group)
484                .field("topic", topic)
485                .field("partition", partition)
486                .field("offset", offset)
487                .finish(),
488            Self::GetOffset {
489                consumer_group,
490                topic,
491                partition,
492            } => f
493                .debug_struct("GetOffset")
494                .field("consumer_group", consumer_group)
495                .field("topic", topic)
496                .field("partition", partition)
497                .finish(),
498            Self::GetMetadata { topic } => {
499                f.debug_struct("GetMetadata").field("topic", topic).finish()
500            }
501            Self::GetClusterMetadata { topics } => f
502                .debug_struct("GetClusterMetadata")
503                .field("topics", topics)
504                .finish(),
505            Self::Ping => write!(f, "Ping"),
506            Self::GetOffsetBounds { topic, partition } => f
507                .debug_struct("GetOffsetBounds")
508                .field("topic", topic)
509                .field("partition", partition)
510                .finish(),
511            Self::ListGroups => write!(f, "ListGroups"),
512            Self::DescribeGroup { consumer_group } => f
513                .debug_struct("DescribeGroup")
514                .field("consumer_group", consumer_group)
515                .finish(),
516            Self::DeleteGroup { consumer_group } => f
517                .debug_struct("DeleteGroup")
518                .field("consumer_group", consumer_group)
519                .finish(),
520            Self::GetOffsetForTimestamp {
521                topic,
522                partition,
523                timestamp_ms,
524            } => f
525                .debug_struct("GetOffsetForTimestamp")
526                .field("topic", topic)
527                .field("partition", partition)
528                .field("timestamp_ms", timestamp_ms)
529                .finish(),
530            Self::InitProducerId { producer_id } => f
531                .debug_struct("InitProducerId")
532                .field("producer_id", producer_id)
533                .finish(),
534            Self::IdempotentPublish {
535                topic,
536                partition,
537                producer_id,
538                producer_epoch,
539                sequence,
540                ..
541            } => f
542                .debug_struct("IdempotentPublish")
543                .field("topic", topic)
544                .field("partition", partition)
545                .field("producer_id", producer_id)
546                .field("producer_epoch", producer_epoch)
547                .field("sequence", sequence)
548                .finish(),
549            Self::BeginTransaction {
550                txn_id,
551                producer_id,
552                producer_epoch,
553                timeout_ms,
554            } => f
555                .debug_struct("BeginTransaction")
556                .field("txn_id", txn_id)
557                .field("producer_id", producer_id)
558                .field("producer_epoch", producer_epoch)
559                .field("timeout_ms", timeout_ms)
560                .finish(),
561            Self::AddPartitionsToTxn {
562                txn_id,
563                producer_id,
564                producer_epoch,
565                partitions,
566            } => f
567                .debug_struct("AddPartitionsToTxn")
568                .field("txn_id", txn_id)
569                .field("producer_id", producer_id)
570                .field("producer_epoch", producer_epoch)
571                .field("partitions", partitions)
572                .finish(),
573            Self::TransactionalPublish {
574                txn_id,
575                topic,
576                partition,
577                producer_id,
578                producer_epoch,
579                sequence,
580                ..
581            } => f
582                .debug_struct("TransactionalPublish")
583                .field("txn_id", txn_id)
584                .field("topic", topic)
585                .field("partition", partition)
586                .field("producer_id", producer_id)
587                .field("producer_epoch", producer_epoch)
588                .field("sequence", sequence)
589                .finish(),
590            Self::AddOffsetsToTxn {
591                txn_id,
592                producer_id,
593                producer_epoch,
594                group_id,
595                offsets,
596            } => f
597                .debug_struct("AddOffsetsToTxn")
598                .field("txn_id", txn_id)
599                .field("producer_id", producer_id)
600                .field("producer_epoch", producer_epoch)
601                .field("group_id", group_id)
602                .field("offsets", offsets)
603                .finish(),
604            Self::CommitTransaction {
605                txn_id,
606                producer_id,
607                producer_epoch,
608            } => f
609                .debug_struct("CommitTransaction")
610                .field("txn_id", txn_id)
611                .field("producer_id", producer_id)
612                .field("producer_epoch", producer_epoch)
613                .finish(),
614            Self::AbortTransaction {
615                txn_id,
616                producer_id,
617                producer_epoch,
618            } => f
619                .debug_struct("AbortTransaction")
620                .field("txn_id", txn_id)
621                .field("producer_id", producer_id)
622                .field("producer_epoch", producer_epoch)
623                .finish(),
624            Self::DescribeQuotas { entities } => f
625                .debug_struct("DescribeQuotas")
626                .field("entities", entities)
627                .finish(),
628            Self::AlterQuotas { alterations } => f
629                .debug_struct("AlterQuotas")
630                .field("alterations", alterations)
631                .finish(),
632            Self::AlterTopicConfig { topic, configs } => f
633                .debug_struct("AlterTopicConfig")
634                .field("topic", topic)
635                .field("configs", configs)
636                .finish(),
637            Self::CreatePartitions {
638                topic,
639                new_partition_count,
640                assignments,
641            } => f
642                .debug_struct("CreatePartitions")
643                .field("topic", topic)
644                .field("new_partition_count", new_partition_count)
645                .field("assignments", assignments)
646                .finish(),
647            Self::DeleteRecords {
648                topic,
649                partition_offsets,
650            } => f
651                .debug_struct("DeleteRecords")
652                .field("topic", topic)
653                .field("partition_offsets", partition_offsets)
654                .finish(),
655            Self::DescribeTopicConfigs { topics } => f
656                .debug_struct("DescribeTopicConfigs")
657                .field("topics", topics)
658                .finish(),
659            Self::Handshake {
660                protocol_version,
661                client_id,
662            } => f
663                .debug_struct("Handshake")
664                .field("protocol_version", protocol_version)
665                .field("client_id", client_id)
666                .finish(),
667        }
668    }
669}
670
671/// Protocol response messages
672///
673/// # Stability
674///
675/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
676/// Adding new variants should only be done at the end of the enum.
677#[derive(Debug, Clone, Serialize, Deserialize)]
678pub enum Response {
679    /// Authentication successful
680    Authenticated {
681        /// Session token for subsequent requests
682        session_id: String,
683        /// Session timeout in seconds
684        expires_in: u64,
685    },
686
687    /// SCRAM-SHA-256: Server-first message (challenge)
688    ScramServerFirst {
689        /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
690        #[serde(with = "crate::serde_utils::bytes_serde")]
691        message: Bytes,
692    },
693
694    /// SCRAM-SHA-256: Server-final message (verification or error)
695    ScramServerFinal {
696        /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
697        #[serde(with = "crate::serde_utils::bytes_serde")]
698        message: Bytes,
699        /// Session ID (if authentication succeeded)
700        session_id: Option<String>,
701        /// Session timeout in seconds (if authentication succeeded)
702        expires_in: Option<u64>,
703    },
704
705    /// Success response with offset
706    Published { offset: u64, partition: u32 },
707
708    /// Messages response
709    Messages { messages: Vec<MessageData> },
710
711    /// Topic created
712    TopicCreated { name: String, partitions: u32 },
713
714    /// List of topics
715    Topics { topics: Vec<String> },
716
717    /// Topic deleted
718    TopicDeleted,
719
720    /// Offset committed
721    OffsetCommitted,
722
723    /// Offset response
724    Offset { offset: Option<u64> },
725
726    /// Metadata
727    Metadata { name: String, partitions: u32 },
728
729    /// Full cluster metadata for topic(s)
730    ClusterMetadata {
731        /// Controller node ID (Raft leader)
732        controller_id: Option<String>,
733        /// Broker/node list
734        brokers: Vec<BrokerInfo>,
735        /// Topic metadata
736        topics: Vec<TopicMetadata>,
737    },
738
739    /// Pong
740    Pong,
741
742    /// Offset bounds for a partition
743    OffsetBounds { earliest: u64, latest: u64 },
744
745    /// List of consumer groups
746    Groups { groups: Vec<String> },
747
748    /// Consumer group details with all offsets
749    GroupDescription {
750        consumer_group: String,
751        /// topic → partition → offset
752        offsets: HashMap<String, HashMap<u32, u64>>,
753    },
754
755    /// Consumer group deleted
756    GroupDeleted,
757
758    /// Offset for a timestamp
759    OffsetForTimestamp {
760        /// The first offset with timestamp >= the requested timestamp
761        /// None if no matching offset was found
762        offset: Option<u64>,
763    },
764
765    /// Error response
766    Error { message: String },
767
768    /// Success
769    Ok,
770
771    // =========================================================================
772    // Idempotent Producer
773    // =========================================================================
774    /// Producer ID initialized
775    ProducerIdInitialized {
776        /// Assigned or existing producer ID
777        producer_id: u64,
778        /// Current epoch (increments on reconnect)
779        producer_epoch: u16,
780    },
781
782    /// Idempotent publish result
783    IdempotentPublished {
784        /// Offset where message was written
785        offset: u64,
786        /// Partition the message was written to
787        partition: u32,
788        /// Whether this was a duplicate (message already existed)
789        duplicate: bool,
790    },
791
792    // =========================================================================
793    // Native Transactions
794    // =========================================================================
795    /// Transaction started successfully
796    TransactionStarted {
797        /// Transaction ID
798        txn_id: String,
799    },
800
801    /// Partitions added to transaction
802    PartitionsAddedToTxn {
803        /// Transaction ID
804        txn_id: String,
805        /// Number of partitions now in transaction
806        partition_count: usize,
807    },
808
809    /// Transactional publish result
810    TransactionalPublished {
811        /// Offset where message was written (pending commit)
812        offset: u64,
813        /// Partition the message was written to
814        partition: u32,
815        /// Sequence number accepted
816        sequence: i32,
817    },
818
819    /// Offsets added to transaction
820    OffsetsAddedToTxn {
821        /// Transaction ID
822        txn_id: String,
823    },
824
825    /// Transaction committed
826    TransactionCommitted {
827        /// Transaction ID
828        txn_id: String,
829    },
830
831    /// Transaction aborted
832    TransactionAborted {
833        /// Transaction ID
834        txn_id: String,
835    },
836
837    // =========================================================================
838    // Per-Principal Quotas (Kafka Parity)
839    // =========================================================================
840    /// Quota descriptions
841    QuotasDescribed {
842        /// List of quota entries
843        entries: Vec<QuotaEntry>,
844    },
845
846    /// Quotas altered successfully
847    QuotasAltered {
848        /// Number of quota alterations applied
849        altered_count: usize,
850    },
851
852    /// Throttle response (returned when quota exceeded)
853    Throttled {
854        /// Time to wait before retrying (milliseconds)
855        throttle_time_ms: u64,
856        /// Quota type that was exceeded
857        quota_type: String,
858        /// Entity that exceeded quota
859        entity: String,
860    },
861
862    // =========================================================================
863    // Admin API (Kafka Parity)
864    // =========================================================================
865    /// Topic configuration altered
866    TopicConfigAltered {
867        /// Topic name
868        topic: String,
869        /// Number of configurations changed
870        changed_count: usize,
871    },
872
873    /// Partitions created
874    PartitionsCreated {
875        /// Topic name
876        topic: String,
877        /// New total partition count
878        new_partition_count: u32,
879    },
880
881    /// Records deleted
882    RecordsDeleted {
883        /// Topic name
884        topic: String,
885        /// Results per partition
886        results: Vec<DeleteRecordsResult>,
887    },
888
889    /// Topic configurations described
890    TopicConfigsDescribed {
891        /// Configuration descriptions per topic
892        configs: Vec<TopicConfigDescription>,
893    },
894
895    /// Protocol version handshake response
896    HandshakeResult {
897        /// Server's protocol version
898        server_version: u32,
899        /// Whether the client version is compatible
900        compatible: bool,
901        /// Human-readable message (e.g. reason for incompatibility)
902        message: String,
903    },
904}
905
906impl Request {
907    /// Serialize request to bytes (postcard format, no format prefix)
908    ///
909    /// For internal Rust-to-Rust communication where format is known.
910    /// Use `to_wire()` for wire transmission with format detection support.
911    #[inline]
912    pub fn to_bytes(&self) -> Result<Vec<u8>> {
913        Ok(postcard::to_allocvec(self)?)
914    }
915
916    /// Deserialize request from bytes (postcard format)
917    ///
918    /// For internal Rust-to-Rust communication where format is known.
919    /// Use `from_wire()` for wire transmission with format detection support.
920    #[inline]
921    pub fn from_bytes(data: &[u8]) -> Result<Self> {
922        Ok(postcard::from_bytes(data)?)
923    }
924
925    /// Serialize request with wire format prefix
926    ///
927    /// Wire format: `[format_byte][correlation_id (4 bytes BE)][payload]`
928    /// - format_byte: 0x00 = postcard, 0x01 = protobuf
929    /// - correlation_id: 4-byte big-endian u32 for request-response matching
930    /// - payload: serialized message
931    ///
932    /// ## Correlation ID sizing
933    ///
934    /// The wire protocol uses `u32` for correlation IDs (4 bytes), which is
935    /// the Kafka-compatible choice and sufficient for client-server RPCs
936    /// (4 billion in-flight requests before wrap-around). The cluster-internal
937    /// protocol (`rivven-cluster`) uses `u64` for its own RPC correlation to
938    /// avoid any wrap-around concern on high-throughput inter-node links.
939    /// The two namespaces are independent and never cross boundaries.
940    ///
941    /// Note: Length prefix is NOT included (handled by transport layer)
942    ///
943    /// # Errors
944    ///
945    /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
946    /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
947    #[inline]
948    pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
949        let result = match format {
950            crate::WireFormat::Postcard => {
951                // Single allocation — serialize directly into the output
952                // Vec via `postcard::to_extend` instead of double-allocating
953                // (to_allocvec → intermediate Vec → copy into result Vec).
954                let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + 128);
955                result.push(format.as_byte());
956                result.extend_from_slice(&correlation_id.to_be_bytes());
957                postcard::to_extend(self, result)?
958            }
959            crate::WireFormat::Protobuf => {
960                // Protobuf requires the `protobuf` feature
961                #[cfg(feature = "protobuf")]
962                {
963                    let payload = self.to_proto_bytes()?;
964                    let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
965                    result.push(format.as_byte());
966                    result.extend_from_slice(&correlation_id.to_be_bytes());
967                    result.extend_from_slice(&payload);
968                    result
969                }
970                #[cfg(not(feature = "protobuf"))]
971                {
972                    return Err(crate::ProtocolError::Serialization(
973                        "Protobuf support requires the 'protobuf' feature".into(),
974                    ));
975                }
976            }
977        };
978
979        // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
980        if result.len() > crate::MAX_MESSAGE_SIZE {
981            return Err(crate::ProtocolError::MessageTooLarge(
982                result.len(),
983                crate::MAX_MESSAGE_SIZE,
984            ));
985        }
986
987        Ok(result)
988    }
989
990    /// Deserialize request with format auto-detection
991    ///
992    /// Detects format from first byte, reads correlation_id, and deserializes accordingly.
993    /// Returns the deserialized request, the detected format, and the correlation_id.
994    #[inline]
995    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
996        if data.len() < crate::WIRE_HEADER_SIZE {
997            return Err(crate::ProtocolError::Serialization(
998                "Wire data too short (need format byte + correlation_id)".into(),
999            ));
1000        }
1001
1002        let format_byte = data[0];
1003        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1004            crate::ProtocolError::Serialization(format!(
1005                "Unknown wire format: 0x{:02x}",
1006                format_byte
1007            ))
1008        })?;
1009
1010        let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1011        let payload = &data[crate::WIRE_HEADER_SIZE..];
1012
1013        match format {
1014            crate::WireFormat::Postcard => {
1015                let request = postcard::from_bytes(payload)?;
1016                Ok((request, format, correlation_id))
1017            }
1018            crate::WireFormat::Protobuf => {
1019                #[cfg(feature = "protobuf")]
1020                {
1021                    let request = Self::from_proto_bytes(payload)?;
1022                    Ok((request, format, correlation_id))
1023                }
1024                #[cfg(not(feature = "protobuf"))]
1025                {
1026                    Err(crate::ProtocolError::Serialization(
1027                        "Protobuf support requires the 'protobuf' feature".into(),
1028                    ))
1029                }
1030            }
1031        }
1032    }
1033}
1034
1035impl Response {
1036    /// Serialize response to bytes (postcard format, no format prefix)
1037    #[inline]
1038    pub fn to_bytes(&self) -> Result<Vec<u8>> {
1039        Ok(postcard::to_allocvec(self)?)
1040    }
1041
1042    /// Deserialize response from bytes (postcard format)
1043    #[inline]
1044    pub fn from_bytes(data: &[u8]) -> Result<Self> {
1045        Ok(postcard::from_bytes(data)?)
1046    }
1047
1048    /// Serialize response with wire format prefix
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1053    /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1054    #[inline]
1055    pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1056        let result = match format {
1057            crate::WireFormat::Postcard => {
1058                // Estimate payload size to avoid reallocations.
1059                // For Messages responses, use message count × estimated per-message
1060                // size (offset 8 + partition 4 + value ~256 + timestamp 8 + overhead ~24 ≈ 300 bytes).
1061                // For other variants the default 128-byte hint is usually sufficient.
1062                let size_hint = match self {
1063                    Response::Messages { messages } => messages.len().saturating_mul(300).max(128),
1064                    _ => 128,
1065                };
1066                let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + size_hint);
1067                result.push(format.as_byte());
1068                result.extend_from_slice(&correlation_id.to_be_bytes());
1069                postcard::to_extend(self, result)?
1070            }
1071            crate::WireFormat::Protobuf => {
1072                #[cfg(feature = "protobuf")]
1073                {
1074                    let payload = self.to_proto_bytes()?;
1075                    let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1076                    result.push(format.as_byte());
1077                    result.extend_from_slice(&correlation_id.to_be_bytes());
1078                    result.extend_from_slice(&payload);
1079                    result
1080                }
1081                #[cfg(not(feature = "protobuf"))]
1082                {
1083                    return Err(crate::ProtocolError::Serialization(
1084                        "Protobuf support requires the 'protobuf' feature".into(),
1085                    ));
1086                }
1087            }
1088        };
1089
1090        // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1091        if result.len() > crate::MAX_MESSAGE_SIZE {
1092            return Err(crate::ProtocolError::MessageTooLarge(
1093                result.len(),
1094                crate::MAX_MESSAGE_SIZE,
1095            ));
1096        }
1097
1098        Ok(result)
1099    }
1100
1101    /// Deserialize response with format auto-detection
1102    #[inline]
1103    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1104        if data.len() < crate::WIRE_HEADER_SIZE {
1105            return Err(crate::ProtocolError::Serialization(
1106                "Wire data too short (need format byte + correlation_id)".into(),
1107            ));
1108        }
1109
1110        let format_byte = data[0];
1111        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1112            crate::ProtocolError::Serialization(format!(
1113                "Unknown wire format: 0x{:02x}",
1114                format_byte
1115            ))
1116        })?;
1117
1118        let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1119        let payload = &data[crate::WIRE_HEADER_SIZE..];
1120
1121        match format {
1122            crate::WireFormat::Postcard => {
1123                let response = postcard::from_bytes(payload)?;
1124                Ok((response, format, correlation_id))
1125            }
1126            crate::WireFormat::Protobuf => {
1127                #[cfg(feature = "protobuf")]
1128                {
1129                    let response = Self::from_proto_bytes(payload)?;
1130                    Ok((response, format, correlation_id))
1131                }
1132                #[cfg(not(feature = "protobuf"))]
1133                {
1134                    Err(crate::ProtocolError::Serialization(
1135                        "Protobuf support requires the 'protobuf' feature".into(),
1136                    ))
1137                }
1138            }
1139        }
1140    }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145    use super::*;
1146
1147    #[test]
1148    #[allow(deprecated)]
1149    fn test_request_roundtrip() {
1150        let requests = vec![
1151            Request::Ping,
1152            Request::ListTopics,
1153            Request::CreateTopic {
1154                name: "test".to_string(),
1155                partitions: Some(4),
1156            },
1157            Request::Authenticate {
1158                username: "admin".to_string(),
1159                password: "secret".to_string(),
1160                require_tls: true,
1161            },
1162        ];
1163
1164        for req in requests {
1165            let bytes = req.to_bytes().unwrap();
1166            let decoded = Request::from_bytes(&bytes).unwrap();
1167            // Can't directly compare due to Debug, but serialization should succeed
1168            assert!(!bytes.is_empty());
1169            let _ = decoded; // Use decoded
1170        }
1171    }
1172
1173    #[test]
1174    fn test_response_roundtrip() {
1175        let responses = vec![
1176            Response::Pong,
1177            Response::Ok,
1178            Response::Topics {
1179                topics: vec!["a".to_string(), "b".to_string()],
1180            },
1181            Response::Error {
1182                message: "test error".to_string(),
1183            },
1184        ];
1185
1186        for resp in responses {
1187            let bytes = resp.to_bytes().unwrap();
1188            let decoded = Response::from_bytes(&bytes).unwrap();
1189            assert!(!bytes.is_empty());
1190            let _ = decoded;
1191        }
1192    }
1193
1194    #[test]
1195    fn test_request_wire_roundtrip() {
1196        let request = Request::Ping;
1197
1198        // Serialize with format prefix and correlation_id
1199        let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 42).unwrap();
1200
1201        // First byte should be format identifier
1202        assert_eq!(wire_bytes[0], 0x00); // Postcard format
1203
1204        // Deserialize with auto-detection
1205        let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1206        assert_eq!(format, crate::WireFormat::Postcard);
1207        assert_eq!(correlation_id, 42);
1208        assert!(matches!(decoded, Request::Ping));
1209    }
1210
1211    #[test]
1212    fn test_response_wire_roundtrip() {
1213        let response = Response::Pong;
1214
1215        // Serialize with format prefix and correlation_id
1216        let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 99).unwrap();
1217
1218        // First byte should be format identifier
1219        assert_eq!(wire_bytes[0], 0x00); // Postcard format
1220
1221        // Deserialize with auto-detection
1222        let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1223        assert_eq!(format, crate::WireFormat::Postcard);
1224        assert_eq!(correlation_id, 99);
1225        assert!(matches!(decoded, Response::Pong));
1226    }
1227
1228    #[test]
1229    fn test_wire_format_empty_data() {
1230        let result = Request::from_wire(&[]);
1231        assert!(result.is_err());
1232    }
1233
1234    #[test]
1235    fn test_wire_format_complex_request() {
1236        use bytes::Bytes;
1237
1238        let request = Request::Publish {
1239            topic: "test-topic".to_string(),
1240            partition: Some(3),
1241            key: Some(Bytes::from("my-key")),
1242            value: Bytes::from("hello world"),
1243            leader_epoch: None,
1244        };
1245
1246        let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 1).unwrap();
1247        assert_eq!(wire_bytes[0], 0x00);
1248
1249        let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1250        assert_eq!(format, crate::WireFormat::Postcard);
1251        assert_eq!(correlation_id, 1);
1252
1253        // Verify the decoded request matches
1254        if let Request::Publish {
1255            topic, partition, ..
1256        } = decoded
1257        {
1258            assert_eq!(topic, "test-topic");
1259            assert_eq!(partition, Some(3));
1260        } else {
1261            panic!("Expected Publish request");
1262        }
1263    }
1264
1265    #[test]
1266    fn test_wire_format_complex_response() {
1267        let response = Response::Published {
1268            offset: 12345,
1269            partition: 7,
1270        };
1271
1272        let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 2).unwrap();
1273        assert_eq!(wire_bytes[0], 0x00);
1274
1275        let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1276        assert_eq!(format, crate::WireFormat::Postcard);
1277        assert_eq!(correlation_id, 2);
1278
1279        if let Response::Published { offset, partition } = decoded {
1280            assert_eq!(offset, 12345);
1281            assert_eq!(partition, 7);
1282        } else {
1283            panic!("Expected Published response");
1284        }
1285    }
1286
1287    /// Snapshot test: postcard serializes enum variants by ordinal position.
1288    /// If someone reorders variants in Request or Response, this test fails
1289    /// because the byte prefix (discriminant) will change, breaking wire compat.
1290    ///
1291    /// Exhaustive — every Request variant is pinned.
1292    #[test]
1293    #[allow(deprecated)]
1294    fn test_postcard_wire_stability_request_discriminants() {
1295        use bytes::Bytes;
1296
1297        // Serialize a representative of each variant and check the leading
1298        // discriminant byte(s) haven't shifted. Postcard uses varint encoding
1299        // for enum discriminants.
1300        let test_cases: Vec<(Request, u8)> = vec![
1301            // variant 0: Authenticate
1302            (
1303                Request::Authenticate {
1304                    username: "u".into(),
1305                    password: "p".into(),
1306                    require_tls: false,
1307                },
1308                0,
1309            ),
1310            // variant 1: SaslAuthenticate
1311            (
1312                Request::SaslAuthenticate {
1313                    mechanism: Bytes::from("PLAIN"),
1314                    auth_bytes: Bytes::from("data"),
1315                },
1316                1,
1317            ),
1318            // variant 2: ScramClientFirst
1319            (
1320                Request::ScramClientFirst {
1321                    message: Bytes::from("n,,n=user,r=nonce"),
1322                },
1323                2,
1324            ),
1325            // variant 3: ScramClientFinal
1326            (
1327                Request::ScramClientFinal {
1328                    message: Bytes::from("c=bind,r=nonce,p=proof"),
1329                },
1330                3,
1331            ),
1332            // variant 4: Publish
1333            (
1334                Request::Publish {
1335                    topic: "t".into(),
1336                    partition: None,
1337                    key: None,
1338                    value: Bytes::from("v"),
1339                    leader_epoch: None,
1340                },
1341                4,
1342            ),
1343            // variant 5: Consume
1344            (
1345                Request::Consume {
1346                    topic: "t".into(),
1347                    partition: 0,
1348                    offset: 0,
1349                    max_messages: 1,
1350                    isolation_level: None,
1351                    max_wait_ms: None,
1352                },
1353                5,
1354            ),
1355            // variant 6: CreateTopic
1356            (
1357                Request::CreateTopic {
1358                    name: "t".into(),
1359                    partitions: None,
1360                },
1361                6,
1362            ),
1363            // variant 7: ListTopics
1364            (Request::ListTopics, 7),
1365            // variant 8: DeleteTopic
1366            (Request::DeleteTopic { name: "t".into() }, 8),
1367            // variant 9: CommitOffset
1368            (
1369                Request::CommitOffset {
1370                    consumer_group: "g".into(),
1371                    topic: "t".into(),
1372                    partition: 0,
1373                    offset: 0,
1374                },
1375                9,
1376            ),
1377            // variant 10: GetOffset
1378            (
1379                Request::GetOffset {
1380                    consumer_group: "g".into(),
1381                    topic: "t".into(),
1382                    partition: 0,
1383                },
1384                10,
1385            ),
1386            // variant 11: GetMetadata
1387            (Request::GetMetadata { topic: "t".into() }, 11),
1388            // variant 12: GetClusterMetadata
1389            (Request::GetClusterMetadata { topics: vec![] }, 12),
1390            // variant 13: Ping
1391            (Request::Ping, 13),
1392            // variant 14: GetOffsetBounds
1393            (
1394                Request::GetOffsetBounds {
1395                    topic: "t".into(),
1396                    partition: 0,
1397                },
1398                14,
1399            ),
1400            // variant 15: ListGroups
1401            (Request::ListGroups, 15),
1402            // variant 16: DescribeGroup
1403            (
1404                Request::DescribeGroup {
1405                    consumer_group: "g".into(),
1406                },
1407                16,
1408            ),
1409            // variant 17: DeleteGroup
1410            (
1411                Request::DeleteGroup {
1412                    consumer_group: "g".into(),
1413                },
1414                17,
1415            ),
1416            // variant 18: GetOffsetForTimestamp
1417            (
1418                Request::GetOffsetForTimestamp {
1419                    topic: "t".into(),
1420                    partition: 0,
1421                    timestamp_ms: 0,
1422                },
1423                18,
1424            ),
1425            // variant 19: InitProducerId
1426            (Request::InitProducerId { producer_id: None }, 19),
1427            // variant 20: IdempotentPublish
1428            (
1429                Request::IdempotentPublish {
1430                    topic: "t".into(),
1431                    partition: None,
1432                    key: None,
1433                    value: Bytes::from("v"),
1434                    producer_id: 1,
1435                    producer_epoch: 0,
1436                    sequence: 0,
1437                    leader_epoch: None,
1438                },
1439                20,
1440            ),
1441            // variant 21: BeginTransaction
1442            (
1443                Request::BeginTransaction {
1444                    txn_id: "tx".into(),
1445                    producer_id: 1,
1446                    producer_epoch: 0,
1447                    timeout_ms: None,
1448                },
1449                21,
1450            ),
1451            // variant 22: AddPartitionsToTxn
1452            (
1453                Request::AddPartitionsToTxn {
1454                    txn_id: "tx".into(),
1455                    producer_id: 1,
1456                    producer_epoch: 0,
1457                    partitions: vec![],
1458                },
1459                22,
1460            ),
1461            // variant 23: TransactionalPublish
1462            (
1463                Request::TransactionalPublish {
1464                    txn_id: "tx".into(),
1465                    topic: "t".into(),
1466                    partition: None,
1467                    key: None,
1468                    value: Bytes::from("v"),
1469                    producer_id: 1,
1470                    producer_epoch: 0,
1471                    sequence: 0,
1472                    leader_epoch: None,
1473                },
1474                23,
1475            ),
1476            // variant 24: AddOffsetsToTxn
1477            (
1478                Request::AddOffsetsToTxn {
1479                    txn_id: "tx".into(),
1480                    producer_id: 1,
1481                    producer_epoch: 0,
1482                    group_id: "g".into(),
1483                    offsets: vec![],
1484                },
1485                24,
1486            ),
1487            // variant 25: CommitTransaction
1488            (
1489                Request::CommitTransaction {
1490                    txn_id: "tx".into(),
1491                    producer_id: 1,
1492                    producer_epoch: 0,
1493                },
1494                25,
1495            ),
1496            // variant 26: AbortTransaction
1497            (
1498                Request::AbortTransaction {
1499                    txn_id: "tx".into(),
1500                    producer_id: 1,
1501                    producer_epoch: 0,
1502                },
1503                26,
1504            ),
1505            // variant 27: DescribeQuotas
1506            (Request::DescribeQuotas { entities: vec![] }, 27),
1507            // variant 28: AlterQuotas
1508            (
1509                Request::AlterQuotas {
1510                    alterations: vec![],
1511                },
1512                28,
1513            ),
1514            // variant 29: AlterTopicConfig
1515            (
1516                Request::AlterTopicConfig {
1517                    topic: "t".into(),
1518                    configs: vec![],
1519                },
1520                29,
1521            ),
1522            // variant 30: CreatePartitions
1523            (
1524                Request::CreatePartitions {
1525                    topic: "t".into(),
1526                    new_partition_count: 2,
1527                    assignments: vec![],
1528                },
1529                30,
1530            ),
1531            // variant 31: DeleteRecords
1532            (
1533                Request::DeleteRecords {
1534                    topic: "t".into(),
1535                    partition_offsets: vec![],
1536                },
1537                31,
1538            ),
1539            // variant 32: DescribeTopicConfigs
1540            (Request::DescribeTopicConfigs { topics: vec![] }, 32),
1541            // variant 33: Handshake
1542            (
1543                Request::Handshake {
1544                    protocol_version: crate::PROTOCOL_VERSION,
1545                    client_id: "test".into(),
1546                },
1547                33,
1548            ),
1549        ];
1550
1551        for (request, expected_discriminant) in test_cases {
1552            let bytes = request.to_bytes().unwrap();
1553            assert_eq!(
1554                bytes[0], expected_discriminant,
1555                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
1556                request
1557            );
1558        }
1559    }
1560
1561    /// Exhaustive — every Response variant is pinned.
1562    #[test]
1563    fn test_postcard_wire_stability_response_discriminants() {
1564        use bytes::Bytes;
1565
1566        let test_cases: Vec<(Response, u8)> = vec![
1567            // variant 0: Authenticated
1568            (
1569                Response::Authenticated {
1570                    session_id: String::new(),
1571                    expires_in: 0,
1572                },
1573                0,
1574            ),
1575            // variant 1: ScramServerFirst
1576            (
1577                Response::ScramServerFirst {
1578                    message: Bytes::from("r=nonce,s=salt,i=4096"),
1579                },
1580                1,
1581            ),
1582            // variant 2: ScramServerFinal
1583            (
1584                Response::ScramServerFinal {
1585                    message: Bytes::from("v=verifier"),
1586                    session_id: None,
1587                    expires_in: None,
1588                },
1589                2,
1590            ),
1591            // variant 3: Published
1592            (
1593                Response::Published {
1594                    offset: 0,
1595                    partition: 0,
1596                },
1597                3,
1598            ),
1599            // variant 4: Messages
1600            (Response::Messages { messages: vec![] }, 4),
1601            // variant 5: TopicCreated
1602            (
1603                Response::TopicCreated {
1604                    name: "t".into(),
1605                    partitions: 1,
1606                },
1607                5,
1608            ),
1609            // variant 6: Topics
1610            (Response::Topics { topics: vec![] }, 6),
1611            // variant 7: TopicDeleted
1612            (Response::TopicDeleted, 7),
1613            // variant 8: OffsetCommitted
1614            (Response::OffsetCommitted, 8),
1615            // variant 9: Offset
1616            (Response::Offset { offset: None }, 9),
1617            // variant 10: Metadata
1618            (
1619                Response::Metadata {
1620                    name: "t".into(),
1621                    partitions: 1,
1622                },
1623                10,
1624            ),
1625            // variant 11: ClusterMetadata
1626            (
1627                Response::ClusterMetadata {
1628                    controller_id: None,
1629                    brokers: vec![],
1630                    topics: vec![],
1631                },
1632                11,
1633            ),
1634            // variant 12: Pong
1635            (Response::Pong, 12),
1636            // variant 13: OffsetBounds
1637            (
1638                Response::OffsetBounds {
1639                    earliest: 0,
1640                    latest: 0,
1641                },
1642                13,
1643            ),
1644            // variant 14: Groups
1645            (Response::Groups { groups: vec![] }, 14),
1646            // variant 15: GroupDescription
1647            (
1648                Response::GroupDescription {
1649                    consumer_group: "g".into(),
1650                    offsets: HashMap::new(),
1651                },
1652                15,
1653            ),
1654            // variant 16: GroupDeleted
1655            (Response::GroupDeleted, 16),
1656            // variant 17: OffsetForTimestamp
1657            (Response::OffsetForTimestamp { offset: None }, 17),
1658            // variant 18: Error
1659            (
1660                Response::Error {
1661                    message: "e".into(),
1662                },
1663                18,
1664            ),
1665            // variant 19: Ok
1666            (Response::Ok, 19),
1667            // variant 20: ProducerIdInitialized
1668            (
1669                Response::ProducerIdInitialized {
1670                    producer_id: 1,
1671                    producer_epoch: 0,
1672                },
1673                20,
1674            ),
1675            // variant 21: IdempotentPublished
1676            (
1677                Response::IdempotentPublished {
1678                    offset: 0,
1679                    partition: 0,
1680                    duplicate: false,
1681                },
1682                21,
1683            ),
1684            // variant 22: TransactionStarted
1685            (
1686                Response::TransactionStarted {
1687                    txn_id: "tx".into(),
1688                },
1689                22,
1690            ),
1691            // variant 23: PartitionsAddedToTxn
1692            (
1693                Response::PartitionsAddedToTxn {
1694                    txn_id: "tx".into(),
1695                    partition_count: 0,
1696                },
1697                23,
1698            ),
1699            // variant 24: TransactionalPublished
1700            (
1701                Response::TransactionalPublished {
1702                    offset: 0,
1703                    partition: 0,
1704                    sequence: 0,
1705                },
1706                24,
1707            ),
1708            // variant 25: OffsetsAddedToTxn
1709            (
1710                Response::OffsetsAddedToTxn {
1711                    txn_id: "tx".into(),
1712                },
1713                25,
1714            ),
1715            // variant 26: TransactionCommitted
1716            (
1717                Response::TransactionCommitted {
1718                    txn_id: "tx".into(),
1719                },
1720                26,
1721            ),
1722            // variant 27: TransactionAborted
1723            (
1724                Response::TransactionAborted {
1725                    txn_id: "tx".into(),
1726                },
1727                27,
1728            ),
1729            // variant 28: QuotasDescribed
1730            (Response::QuotasDescribed { entries: vec![] }, 28),
1731            // variant 29: QuotasAltered
1732            (Response::QuotasAltered { altered_count: 0 }, 29),
1733            // variant 30: Throttled
1734            (
1735                Response::Throttled {
1736                    throttle_time_ms: 0,
1737                    quota_type: "produce_bytes_rate".into(),
1738                    entity: "user".into(),
1739                },
1740                30,
1741            ),
1742            // variant 31: TopicConfigAltered
1743            (
1744                Response::TopicConfigAltered {
1745                    topic: "t".into(),
1746                    changed_count: 0,
1747                },
1748                31,
1749            ),
1750            // variant 32: PartitionsCreated
1751            (
1752                Response::PartitionsCreated {
1753                    topic: "t".into(),
1754                    new_partition_count: 2,
1755                },
1756                32,
1757            ),
1758            // variant 33: RecordsDeleted
1759            (
1760                Response::RecordsDeleted {
1761                    topic: "t".into(),
1762                    results: vec![],
1763                },
1764                33,
1765            ),
1766            // variant 34: TopicConfigsDescribed
1767            (Response::TopicConfigsDescribed { configs: vec![] }, 34),
1768            // variant 35: HandshakeResult
1769            (
1770                Response::HandshakeResult {
1771                    server_version: crate::PROTOCOL_VERSION,
1772                    compatible: true,
1773                    message: String::new(),
1774                },
1775                35,
1776            ),
1777        ];
1778
1779        for (response, expected_discriminant) in test_cases {
1780            let bytes = response.to_bytes().unwrap();
1781            assert_eq!(
1782                bytes[0], expected_discriminant,
1783                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
1784                response
1785            );
1786        }
1787    }
1788
1789    /// Verify that to_wire rejects oversized messages.
1790    #[test]
1791    fn test_to_wire_rejects_oversized_request() {
1792        use bytes::Bytes;
1793        // Create a request with a payload larger than MAX_MESSAGE_SIZE
1794        let huge_value = vec![0u8; crate::MAX_MESSAGE_SIZE + 1];
1795        let request = Request::Publish {
1796            topic: "t".into(),
1797            partition: None,
1798            key: None,
1799            value: Bytes::from(huge_value),
1800            leader_epoch: None,
1801        };
1802        let result = request.to_wire(crate::WireFormat::Postcard, 0);
1803        assert!(
1804            matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
1805            "Expected MessageTooLarge error for oversized request"
1806        );
1807    }
1808
1809    #[test]
1810    fn test_to_wire_rejects_oversized_response() {
1811        let huge_messages = vec![MessageData {
1812            offset: 0,
1813            partition: 0,
1814            timestamp: 0,
1815            key: None,
1816            value: bytes::Bytes::from(vec![0u8; crate::MAX_MESSAGE_SIZE + 1]),
1817            headers: vec![],
1818        }];
1819        let response = Response::Messages {
1820            messages: huge_messages,
1821        };
1822        let result = response.to_wire(crate::WireFormat::Postcard, 0);
1823        assert!(
1824            matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
1825            "Expected MessageTooLarge error for oversized response"
1826        );
1827    }
1828}