rivven_protocol/messages.rs
1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Quota alteration request
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QuotaAlteration {
13 /// Entity type: "user", "client-id", "consumer-group", "default"
14 pub entity_type: String,
15 /// Entity name (None for defaults)
16 pub entity_name: Option<String>,
17 /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
18 pub quota_key: String,
19 /// Quota value (None to remove quota, Some to set)
20 pub quota_value: Option<u64>,
21}
22
23/// Quota entry in describe response
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct QuotaEntry {
26 /// Entity type
27 pub entity_type: String,
28 /// Entity name (None for defaults)
29 pub entity_name: Option<String>,
30 /// Quota values
31 pub quotas: HashMap<String, u64>,
32}
33
34/// Topic configuration entry for AlterTopicConfig
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TopicConfigEntry {
37 /// Configuration key (e.g., "retention.ms", "max.message.bytes")
38 pub key: String,
39 /// Configuration value (None to reset to default)
40 pub value: Option<String>,
41}
42
43/// Topic configuration in describe response
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TopicConfigDescription {
46 /// Topic name
47 pub topic: String,
48 /// Configuration entries
49 pub configs: HashMap<String, TopicConfigValue>,
50}
51
52/// Topic configuration value with metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct TopicConfigValue {
55 /// Current value
56 pub value: String,
57 /// Whether this is the default value
58 pub is_default: bool,
59 /// Whether this config is read-only
60 pub is_read_only: bool,
61 /// Whether this config is sensitive (e.g., passwords)
62 pub is_sensitive: bool,
63}
64
65/// Delete records result for a partition
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DeleteRecordsResult {
68 /// Partition ID
69 pub partition: u32,
70 /// New low watermark (first available offset after deletion)
71 pub low_watermark: u64,
72 /// Error message if deletion failed for this partition
73 pub error: Option<String>,
74}
75
76/// Protocol request messages
77///
78/// # Stability
79///
80/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
81/// Adding new variants should only be done at the end of the enum.
82#[derive(Clone, Serialize, Deserialize)]
83pub enum Request {
84 /// Authenticate with username/password (SASL/PLAIN compatible).
85 ///
86 /// # Security — transport encryption required
87 ///
88 /// The password is sent in **plaintext** on the wire (SASL/PLAIN).
89 /// This variant must only be used over a TLS-encrypted connection;
90 /// otherwise the password is exposed to network observers.
91 ///
92 /// # Deprecation
93 ///
94 /// Prefer `ScramClientFirst` / `ScramClientFinal` (SCRAM-SHA-256
95 /// challenge-response) which never sends the password over the wire.
96 #[deprecated(
97 note = "Use SCRAM-SHA-256 (ScramClientFirst/ScramClientFinal) instead of plaintext auth"
98 )]
99 Authenticate {
100 username: String,
101 password: String,
102 /// When `true` the server **must** reject this request if the
103 /// connection is not TLS-encrypted, preventing accidental
104 /// credential exposure on cleartext transports.
105 #[serde(default)]
106 require_tls: bool,
107 },
108
109 /// Authenticate with SASL bytes (for Kafka client compatibility)
110 SaslAuthenticate {
111 #[serde(with = "crate::serde_utils::bytes_serde")]
112 mechanism: Bytes,
113 #[serde(with = "crate::serde_utils::bytes_serde")]
114 auth_bytes: Bytes,
115 },
116
117 /// SCRAM-SHA-256: Client-first message
118 ScramClientFirst {
119 /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
120 #[serde(with = "crate::serde_utils::bytes_serde")]
121 message: Bytes,
122 },
123
124 /// SCRAM-SHA-256: Client-final message
125 ScramClientFinal {
126 /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
127 #[serde(with = "crate::serde_utils::bytes_serde")]
128 message: Bytes,
129 },
130
131 /// Publish a message to a topic
132 Publish {
133 topic: String,
134 partition: Option<u32>,
135 #[serde(with = "crate::serde_utils::option_bytes_serde")]
136 key: Option<Bytes>,
137 #[serde(with = "crate::serde_utils::bytes_serde")]
138 value: Bytes,
139 /// Leader epoch for data-path fencing (§2.4).
140 /// When set, the broker rejects the write if its current leader epoch
141 /// for this partition is higher, preventing stale-leader writes.
142 #[serde(default)]
143 leader_epoch: Option<u64>,
144 },
145
146 /// Consume messages from a topic
147 Consume {
148 topic: String,
149 partition: u32,
150 offset: u64,
151 max_messages: usize,
152 /// Isolation level for transactional reads
153 /// None = read_uncommitted (default, backward compatible)
154 /// Some(0) = read_uncommitted
155 /// Some(1) = read_committed (filters aborted transaction messages)
156 #[serde(default)]
157 isolation_level: Option<u8>,
158 /// Long-polling: maximum time (ms) to wait for new data before returning
159 /// an empty response. None or 0 = immediate (no waiting). Capped at 30 000 ms.
160 #[serde(default)]
161 max_wait_ms: Option<u64>,
162 },
163
164 /// Create a new topic
165 CreateTopic {
166 name: String,
167 partitions: Option<u32>,
168 },
169
170 /// List all topics
171 ListTopics,
172
173 /// Delete a topic
174 DeleteTopic { name: String },
175
176 /// Commit consumer offset
177 CommitOffset {
178 consumer_group: String,
179 topic: String,
180 partition: u32,
181 offset: u64,
182 },
183
184 /// Get consumer offset
185 GetOffset {
186 consumer_group: String,
187 topic: String,
188 partition: u32,
189 },
190
191 /// Get topic metadata
192 GetMetadata { topic: String },
193
194 /// Get cluster metadata (all topics or specific ones)
195 GetClusterMetadata {
196 /// Topics to get metadata for (empty = all topics)
197 topics: Vec<String>,
198 },
199
200 /// Ping
201 Ping,
202
203 /// Get offset bounds for a partition
204 GetOffsetBounds { topic: String, partition: u32 },
205
206 /// List all consumer groups
207 ListGroups,
208
209 /// Describe a consumer group (get all offsets)
210 DescribeGroup { consumer_group: String },
211
212 /// Delete a consumer group
213 DeleteGroup { consumer_group: String },
214
215 /// Find offset for a timestamp
216 GetOffsetForTimestamp {
217 topic: String,
218 partition: u32,
219 /// Timestamp in milliseconds since epoch
220 timestamp_ms: i64,
221 },
222
223 // =========================================================================
224 // Idempotent Producer
225 // =========================================================================
226 /// Initialize idempotent producer (request producer ID and epoch)
227 ///
228 /// Call this before sending idempotent produce requests.
229 /// If reconnecting, provide the previous producer_id to bump epoch.
230 InitProducerId {
231 /// Previous producer ID (None for new producers)
232 producer_id: Option<u64>,
233 },
234
235 /// Publish with idempotent semantics (exactly-once delivery)
236 ///
237 /// Requires InitProducerId to have been called first.
238 IdempotentPublish {
239 topic: String,
240 partition: Option<u32>,
241 #[serde(with = "crate::serde_utils::option_bytes_serde")]
242 key: Option<Bytes>,
243 #[serde(with = "crate::serde_utils::bytes_serde")]
244 value: Bytes,
245 /// Producer ID from InitProducerId response
246 producer_id: u64,
247 /// Producer epoch from InitProducerId response
248 producer_epoch: u16,
249 /// Sequence number (starts at 0, increments per partition)
250 sequence: i32,
251 /// Leader epoch for fencing stale writes (§2.4)
252 #[serde(default)]
253 leader_epoch: Option<u64>,
254 },
255
256 // =========================================================================
257 // Native Transactions
258 // =========================================================================
259 /// Begin a new transaction
260 BeginTransaction {
261 /// Transaction ID (unique per producer)
262 txn_id: String,
263 /// Producer ID from InitProducerId
264 producer_id: u64,
265 /// Producer epoch
266 producer_epoch: u16,
267 /// Transaction timeout in milliseconds (optional, defaults to 60s)
268 timeout_ms: Option<u64>,
269 },
270
271 /// Add partitions to an active transaction
272 AddPartitionsToTxn {
273 /// Transaction ID
274 txn_id: String,
275 /// Producer ID
276 producer_id: u64,
277 /// Producer epoch
278 producer_epoch: u16,
279 /// Partitions to add (topic, partition pairs)
280 partitions: Vec<(String, u32)>,
281 },
282
283 /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
284 TransactionalPublish {
285 /// Transaction ID
286 txn_id: String,
287 topic: String,
288 partition: Option<u32>,
289 #[serde(with = "crate::serde_utils::option_bytes_serde")]
290 key: Option<Bytes>,
291 #[serde(with = "crate::serde_utils::bytes_serde")]
292 value: Bytes,
293 /// Producer ID
294 producer_id: u64,
295 /// Producer epoch
296 producer_epoch: u16,
297 /// Sequence number
298 sequence: i32,
299 /// Leader epoch for fencing stale writes (§2.4)
300 #[serde(default)]
301 leader_epoch: Option<u64>,
302 },
303
304 /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
305 AddOffsetsToTxn {
306 /// Transaction ID
307 txn_id: String,
308 /// Producer ID
309 producer_id: u64,
310 /// Producer epoch
311 producer_epoch: u16,
312 /// Consumer group ID
313 group_id: String,
314 /// Offsets to commit (topic, partition, offset triples)
315 offsets: Vec<(String, u32, i64)>,
316 },
317
318 /// Commit a transaction
319 CommitTransaction {
320 /// Transaction ID
321 txn_id: String,
322 /// Producer ID
323 producer_id: u64,
324 /// Producer epoch
325 producer_epoch: u16,
326 },
327
328 /// Abort a transaction
329 AbortTransaction {
330 /// Transaction ID
331 txn_id: String,
332 /// Producer ID
333 producer_id: u64,
334 /// Producer epoch
335 producer_epoch: u16,
336 },
337
338 // =========================================================================
339 // Per-Principal Quotas (Kafka Parity)
340 // =========================================================================
341 /// Describe quotas for entities
342 DescribeQuotas {
343 /// Entities to describe (empty = all)
344 /// Format: Vec<(entity_type, entity_name)>
345 /// entity_type: "user", "client-id", "consumer-group", "default"
346 /// entity_name: None for defaults, Some for specific entities
347 entities: Vec<(String, Option<String>)>,
348 },
349
350 /// Alter quotas for entities
351 AlterQuotas {
352 /// Quota alterations to apply
353 /// Each item: (entity_type, entity_name, quota_key, quota_value)
354 /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
355 /// quota_value: None to remove, Some(value) to set
356 alterations: Vec<QuotaAlteration>,
357 },
358
359 // =========================================================================
360 // Admin API (Kafka Parity)
361 // =========================================================================
362 /// Alter topic configuration
363 AlterTopicConfig {
364 /// Topic name
365 topic: String,
366 /// Configuration changes to apply
367 configs: Vec<TopicConfigEntry>,
368 },
369
370 /// Create additional partitions for an existing topic
371 CreatePartitions {
372 /// Topic name
373 topic: String,
374 /// New total partition count (must be > current count)
375 new_partition_count: u32,
376 /// Optional assignment of new partitions to brokers
377 /// If empty, broker will auto-assign
378 assignments: Vec<Vec<String>>,
379 },
380
381 /// Delete records before a given offset (log truncation)
382 DeleteRecords {
383 /// Topic name
384 topic: String,
385 /// Partition-offset pairs: delete all records before these offsets
386 partition_offsets: Vec<(u32, u64)>,
387 },
388
389 /// Describe topic configurations
390 DescribeTopicConfigs {
391 /// Topics to describe (empty = all)
392 topics: Vec<String>,
393 },
394
395 /// Protocol version handshake
396 ///
397 /// Sent by the client as the first message after connecting.
398 /// The server validates the protocol version and returns compatibility info.
399 Handshake {
400 /// Client's protocol version (must match `PROTOCOL_VERSION`)
401 protocol_version: u32,
402 /// Optional client identifier for diagnostics
403 client_id: String,
404 },
405}
406
407// Manual Debug impl to redact credentials from log output.
408// `Request::Authenticate { password }` and SASL/SCRAM variants contain
409// secrets that must never appear in debug logs.
410impl std::fmt::Debug for Request {
411 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
412 match self {
413 #[allow(deprecated)]
414 Self::Authenticate {
415 username,
416 require_tls,
417 ..
418 } => f
419 .debug_struct("Authenticate")
420 .field("username", username)
421 .field("password", &"[REDACTED]")
422 .field("require_tls", require_tls)
423 .finish(),
424 Self::SaslAuthenticate { mechanism, .. } => f
425 .debug_struct("SaslAuthenticate")
426 .field("mechanism", mechanism)
427 .field("auth_bytes", &"[REDACTED]")
428 .finish(),
429 Self::ScramClientFirst { .. } => f
430 .debug_struct("ScramClientFirst")
431 .field("message", &"[REDACTED]")
432 .finish(),
433 Self::ScramClientFinal { .. } => f
434 .debug_struct("ScramClientFinal")
435 .field("message", &"[REDACTED]")
436 .finish(),
437 Self::Publish {
438 topic,
439 partition,
440 key,
441 value,
442 leader_epoch,
443 } => f
444 .debug_struct("Publish")
445 .field("topic", topic)
446 .field("partition", partition)
447 .field("key", key)
448 .field("value_len", &value.len())
449 .field("leader_epoch", leader_epoch)
450 .finish(),
451 Self::Consume {
452 topic,
453 partition,
454 offset,
455 max_messages,
456 isolation_level,
457 max_wait_ms,
458 } => f
459 .debug_struct("Consume")
460 .field("topic", topic)
461 .field("partition", partition)
462 .field("offset", offset)
463 .field("max_messages", max_messages)
464 .field("isolation_level", isolation_level)
465 .field("max_wait_ms", max_wait_ms)
466 .finish(),
467 Self::CreateTopic { name, partitions } => f
468 .debug_struct("CreateTopic")
469 .field("name", name)
470 .field("partitions", partitions)
471 .finish(),
472 Self::ListTopics => write!(f, "ListTopics"),
473 Self::DeleteTopic { name } => {
474 f.debug_struct("DeleteTopic").field("name", name).finish()
475 }
476 Self::CommitOffset {
477 consumer_group,
478 topic,
479 partition,
480 offset,
481 } => f
482 .debug_struct("CommitOffset")
483 .field("consumer_group", consumer_group)
484 .field("topic", topic)
485 .field("partition", partition)
486 .field("offset", offset)
487 .finish(),
488 Self::GetOffset {
489 consumer_group,
490 topic,
491 partition,
492 } => f
493 .debug_struct("GetOffset")
494 .field("consumer_group", consumer_group)
495 .field("topic", topic)
496 .field("partition", partition)
497 .finish(),
498 Self::GetMetadata { topic } => {
499 f.debug_struct("GetMetadata").field("topic", topic).finish()
500 }
501 Self::GetClusterMetadata { topics } => f
502 .debug_struct("GetClusterMetadata")
503 .field("topics", topics)
504 .finish(),
505 Self::Ping => write!(f, "Ping"),
506 Self::GetOffsetBounds { topic, partition } => f
507 .debug_struct("GetOffsetBounds")
508 .field("topic", topic)
509 .field("partition", partition)
510 .finish(),
511 Self::ListGroups => write!(f, "ListGroups"),
512 Self::DescribeGroup { consumer_group } => f
513 .debug_struct("DescribeGroup")
514 .field("consumer_group", consumer_group)
515 .finish(),
516 Self::DeleteGroup { consumer_group } => f
517 .debug_struct("DeleteGroup")
518 .field("consumer_group", consumer_group)
519 .finish(),
520 Self::GetOffsetForTimestamp {
521 topic,
522 partition,
523 timestamp_ms,
524 } => f
525 .debug_struct("GetOffsetForTimestamp")
526 .field("topic", topic)
527 .field("partition", partition)
528 .field("timestamp_ms", timestamp_ms)
529 .finish(),
530 Self::InitProducerId { producer_id } => f
531 .debug_struct("InitProducerId")
532 .field("producer_id", producer_id)
533 .finish(),
534 Self::IdempotentPublish {
535 topic,
536 partition,
537 producer_id,
538 producer_epoch,
539 sequence,
540 ..
541 } => f
542 .debug_struct("IdempotentPublish")
543 .field("topic", topic)
544 .field("partition", partition)
545 .field("producer_id", producer_id)
546 .field("producer_epoch", producer_epoch)
547 .field("sequence", sequence)
548 .finish(),
549 Self::BeginTransaction {
550 txn_id,
551 producer_id,
552 producer_epoch,
553 timeout_ms,
554 } => f
555 .debug_struct("BeginTransaction")
556 .field("txn_id", txn_id)
557 .field("producer_id", producer_id)
558 .field("producer_epoch", producer_epoch)
559 .field("timeout_ms", timeout_ms)
560 .finish(),
561 Self::AddPartitionsToTxn {
562 txn_id,
563 producer_id,
564 producer_epoch,
565 partitions,
566 } => f
567 .debug_struct("AddPartitionsToTxn")
568 .field("txn_id", txn_id)
569 .field("producer_id", producer_id)
570 .field("producer_epoch", producer_epoch)
571 .field("partitions", partitions)
572 .finish(),
573 Self::TransactionalPublish {
574 txn_id,
575 topic,
576 partition,
577 producer_id,
578 producer_epoch,
579 sequence,
580 ..
581 } => f
582 .debug_struct("TransactionalPublish")
583 .field("txn_id", txn_id)
584 .field("topic", topic)
585 .field("partition", partition)
586 .field("producer_id", producer_id)
587 .field("producer_epoch", producer_epoch)
588 .field("sequence", sequence)
589 .finish(),
590 Self::AddOffsetsToTxn {
591 txn_id,
592 producer_id,
593 producer_epoch,
594 group_id,
595 offsets,
596 } => f
597 .debug_struct("AddOffsetsToTxn")
598 .field("txn_id", txn_id)
599 .field("producer_id", producer_id)
600 .field("producer_epoch", producer_epoch)
601 .field("group_id", group_id)
602 .field("offsets", offsets)
603 .finish(),
604 Self::CommitTransaction {
605 txn_id,
606 producer_id,
607 producer_epoch,
608 } => f
609 .debug_struct("CommitTransaction")
610 .field("txn_id", txn_id)
611 .field("producer_id", producer_id)
612 .field("producer_epoch", producer_epoch)
613 .finish(),
614 Self::AbortTransaction {
615 txn_id,
616 producer_id,
617 producer_epoch,
618 } => f
619 .debug_struct("AbortTransaction")
620 .field("txn_id", txn_id)
621 .field("producer_id", producer_id)
622 .field("producer_epoch", producer_epoch)
623 .finish(),
624 Self::DescribeQuotas { entities } => f
625 .debug_struct("DescribeQuotas")
626 .field("entities", entities)
627 .finish(),
628 Self::AlterQuotas { alterations } => f
629 .debug_struct("AlterQuotas")
630 .field("alterations", alterations)
631 .finish(),
632 Self::AlterTopicConfig { topic, configs } => f
633 .debug_struct("AlterTopicConfig")
634 .field("topic", topic)
635 .field("configs", configs)
636 .finish(),
637 Self::CreatePartitions {
638 topic,
639 new_partition_count,
640 assignments,
641 } => f
642 .debug_struct("CreatePartitions")
643 .field("topic", topic)
644 .field("new_partition_count", new_partition_count)
645 .field("assignments", assignments)
646 .finish(),
647 Self::DeleteRecords {
648 topic,
649 partition_offsets,
650 } => f
651 .debug_struct("DeleteRecords")
652 .field("topic", topic)
653 .field("partition_offsets", partition_offsets)
654 .finish(),
655 Self::DescribeTopicConfigs { topics } => f
656 .debug_struct("DescribeTopicConfigs")
657 .field("topics", topics)
658 .finish(),
659 Self::Handshake {
660 protocol_version,
661 client_id,
662 } => f
663 .debug_struct("Handshake")
664 .field("protocol_version", protocol_version)
665 .field("client_id", client_id)
666 .finish(),
667 }
668 }
669}
670
671/// Protocol response messages
672///
673/// # Stability
674///
675/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
676/// Adding new variants should only be done at the end of the enum.
677#[derive(Debug, Clone, Serialize, Deserialize)]
678pub enum Response {
679 /// Authentication successful
680 Authenticated {
681 /// Session token for subsequent requests
682 session_id: String,
683 /// Session timeout in seconds
684 expires_in: u64,
685 },
686
687 /// SCRAM-SHA-256: Server-first message (challenge)
688 ScramServerFirst {
689 /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
690 #[serde(with = "crate::serde_utils::bytes_serde")]
691 message: Bytes,
692 },
693
694 /// SCRAM-SHA-256: Server-final message (verification or error)
695 ScramServerFinal {
696 /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
697 #[serde(with = "crate::serde_utils::bytes_serde")]
698 message: Bytes,
699 /// Session ID (if authentication succeeded)
700 session_id: Option<String>,
701 /// Session timeout in seconds (if authentication succeeded)
702 expires_in: Option<u64>,
703 },
704
705 /// Success response with offset
706 Published { offset: u64, partition: u32 },
707
708 /// Messages response
709 Messages { messages: Vec<MessageData> },
710
711 /// Topic created
712 TopicCreated { name: String, partitions: u32 },
713
714 /// List of topics
715 Topics { topics: Vec<String> },
716
717 /// Topic deleted
718 TopicDeleted,
719
720 /// Offset committed
721 OffsetCommitted,
722
723 /// Offset response
724 Offset { offset: Option<u64> },
725
726 /// Metadata
727 Metadata { name: String, partitions: u32 },
728
729 /// Full cluster metadata for topic(s)
730 ClusterMetadata {
731 /// Controller node ID (Raft leader)
732 controller_id: Option<String>,
733 /// Broker/node list
734 brokers: Vec<BrokerInfo>,
735 /// Topic metadata
736 topics: Vec<TopicMetadata>,
737 },
738
739 /// Pong
740 Pong,
741
742 /// Offset bounds for a partition
743 OffsetBounds { earliest: u64, latest: u64 },
744
745 /// List of consumer groups
746 Groups { groups: Vec<String> },
747
748 /// Consumer group details with all offsets
749 GroupDescription {
750 consumer_group: String,
751 /// topic → partition → offset
752 offsets: HashMap<String, HashMap<u32, u64>>,
753 },
754
755 /// Consumer group deleted
756 GroupDeleted,
757
758 /// Offset for a timestamp
759 OffsetForTimestamp {
760 /// The first offset with timestamp >= the requested timestamp
761 /// None if no matching offset was found
762 offset: Option<u64>,
763 },
764
765 /// Error response
766 Error { message: String },
767
768 /// Success
769 Ok,
770
771 // =========================================================================
772 // Idempotent Producer
773 // =========================================================================
774 /// Producer ID initialized
775 ProducerIdInitialized {
776 /// Assigned or existing producer ID
777 producer_id: u64,
778 /// Current epoch (increments on reconnect)
779 producer_epoch: u16,
780 },
781
782 /// Idempotent publish result
783 IdempotentPublished {
784 /// Offset where message was written
785 offset: u64,
786 /// Partition the message was written to
787 partition: u32,
788 /// Whether this was a duplicate (message already existed)
789 duplicate: bool,
790 },
791
792 // =========================================================================
793 // Native Transactions
794 // =========================================================================
795 /// Transaction started successfully
796 TransactionStarted {
797 /// Transaction ID
798 txn_id: String,
799 },
800
801 /// Partitions added to transaction
802 PartitionsAddedToTxn {
803 /// Transaction ID
804 txn_id: String,
805 /// Number of partitions now in transaction
806 partition_count: usize,
807 },
808
809 /// Transactional publish result
810 TransactionalPublished {
811 /// Offset where message was written (pending commit)
812 offset: u64,
813 /// Partition the message was written to
814 partition: u32,
815 /// Sequence number accepted
816 sequence: i32,
817 },
818
819 /// Offsets added to transaction
820 OffsetsAddedToTxn {
821 /// Transaction ID
822 txn_id: String,
823 },
824
825 /// Transaction committed
826 TransactionCommitted {
827 /// Transaction ID
828 txn_id: String,
829 },
830
831 /// Transaction aborted
832 TransactionAborted {
833 /// Transaction ID
834 txn_id: String,
835 },
836
837 // =========================================================================
838 // Per-Principal Quotas (Kafka Parity)
839 // =========================================================================
840 /// Quota descriptions
841 QuotasDescribed {
842 /// List of quota entries
843 entries: Vec<QuotaEntry>,
844 },
845
846 /// Quotas altered successfully
847 QuotasAltered {
848 /// Number of quota alterations applied
849 altered_count: usize,
850 },
851
852 /// Throttle response (returned when quota exceeded)
853 Throttled {
854 /// Time to wait before retrying (milliseconds)
855 throttle_time_ms: u64,
856 /// Quota type that was exceeded
857 quota_type: String,
858 /// Entity that exceeded quota
859 entity: String,
860 },
861
862 // =========================================================================
863 // Admin API (Kafka Parity)
864 // =========================================================================
865 /// Topic configuration altered
866 TopicConfigAltered {
867 /// Topic name
868 topic: String,
869 /// Number of configurations changed
870 changed_count: usize,
871 },
872
873 /// Partitions created
874 PartitionsCreated {
875 /// Topic name
876 topic: String,
877 /// New total partition count
878 new_partition_count: u32,
879 },
880
881 /// Records deleted
882 RecordsDeleted {
883 /// Topic name
884 topic: String,
885 /// Results per partition
886 results: Vec<DeleteRecordsResult>,
887 },
888
889 /// Topic configurations described
890 TopicConfigsDescribed {
891 /// Configuration descriptions per topic
892 configs: Vec<TopicConfigDescription>,
893 },
894
895 /// Protocol version handshake response
896 HandshakeResult {
897 /// Server's protocol version
898 server_version: u32,
899 /// Whether the client version is compatible
900 compatible: bool,
901 /// Human-readable message (e.g. reason for incompatibility)
902 message: String,
903 },
904}
905
906impl Request {
907 /// Serialize request to bytes (postcard format, no format prefix)
908 ///
909 /// For internal Rust-to-Rust communication where format is known.
910 /// Use `to_wire()` for wire transmission with format detection support.
911 #[inline]
912 pub fn to_bytes(&self) -> Result<Vec<u8>> {
913 Ok(postcard::to_allocvec(self)?)
914 }
915
916 /// Deserialize request from bytes (postcard format)
917 ///
918 /// For internal Rust-to-Rust communication where format is known.
919 /// Use `from_wire()` for wire transmission with format detection support.
920 #[inline]
921 pub fn from_bytes(data: &[u8]) -> Result<Self> {
922 Ok(postcard::from_bytes(data)?)
923 }
924
925 /// Serialize request with wire format prefix
926 ///
927 /// Wire format: `[format_byte][correlation_id (4 bytes BE)][payload]`
928 /// - format_byte: 0x00 = postcard, 0x01 = protobuf
929 /// - correlation_id: 4-byte big-endian u32 for request-response matching
930 /// - payload: serialized message
931 ///
932 /// ## Correlation ID sizing
933 ///
934 /// The wire protocol uses `u32` for correlation IDs (4 bytes), which is
935 /// the Kafka-compatible choice and sufficient for client-server RPCs
936 /// (4 billion in-flight requests before wrap-around). The cluster-internal
937 /// protocol (`rivven-cluster`) uses `u64` for its own RPC correlation to
938 /// avoid any wrap-around concern on high-throughput inter-node links.
939 /// The two namespaces are independent and never cross boundaries.
940 ///
941 /// Note: Length prefix is NOT included (handled by transport layer)
942 ///
943 /// # Errors
944 ///
945 /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
946 /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
947 #[inline]
948 pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
949 let result = match format {
950 crate::WireFormat::Postcard => {
951 // Single allocation — serialize directly into the output
952 // Vec via `postcard::to_extend` instead of double-allocating
953 // (to_allocvec → intermediate Vec → copy into result Vec).
954 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + 128);
955 result.push(format.as_byte());
956 result.extend_from_slice(&correlation_id.to_be_bytes());
957 postcard::to_extend(self, result)?
958 }
959 crate::WireFormat::Protobuf => {
960 // Protobuf requires the `protobuf` feature
961 #[cfg(feature = "protobuf")]
962 {
963 let payload = self.to_proto_bytes()?;
964 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
965 result.push(format.as_byte());
966 result.extend_from_slice(&correlation_id.to_be_bytes());
967 result.extend_from_slice(&payload);
968 result
969 }
970 #[cfg(not(feature = "protobuf"))]
971 {
972 return Err(crate::ProtocolError::Serialization(
973 "Protobuf support requires the 'protobuf' feature".into(),
974 ));
975 }
976 }
977 };
978
979 // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
980 if result.len() > crate::MAX_MESSAGE_SIZE {
981 return Err(crate::ProtocolError::MessageTooLarge(
982 result.len(),
983 crate::MAX_MESSAGE_SIZE,
984 ));
985 }
986
987 Ok(result)
988 }
989
990 /// Deserialize request with format auto-detection
991 ///
992 /// Detects format from first byte, reads correlation_id, and deserializes accordingly.
993 /// Returns the deserialized request, the detected format, and the correlation_id.
994 #[inline]
995 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
996 if data.len() < crate::WIRE_HEADER_SIZE {
997 return Err(crate::ProtocolError::Serialization(
998 "Wire data too short (need format byte + correlation_id)".into(),
999 ));
1000 }
1001
1002 let format_byte = data[0];
1003 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1004 crate::ProtocolError::Serialization(format!(
1005 "Unknown wire format: 0x{:02x}",
1006 format_byte
1007 ))
1008 })?;
1009
1010 let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1011 let payload = &data[crate::WIRE_HEADER_SIZE..];
1012
1013 match format {
1014 crate::WireFormat::Postcard => {
1015 let request = postcard::from_bytes(payload)?;
1016 Ok((request, format, correlation_id))
1017 }
1018 crate::WireFormat::Protobuf => {
1019 #[cfg(feature = "protobuf")]
1020 {
1021 let request = Self::from_proto_bytes(payload)?;
1022 Ok((request, format, correlation_id))
1023 }
1024 #[cfg(not(feature = "protobuf"))]
1025 {
1026 Err(crate::ProtocolError::Serialization(
1027 "Protobuf support requires the 'protobuf' feature".into(),
1028 ))
1029 }
1030 }
1031 }
1032 }
1033}
1034
1035impl Response {
1036 /// Serialize response to bytes (postcard format, no format prefix)
1037 #[inline]
1038 pub fn to_bytes(&self) -> Result<Vec<u8>> {
1039 Ok(postcard::to_allocvec(self)?)
1040 }
1041
1042 /// Deserialize response from bytes (postcard format)
1043 #[inline]
1044 pub fn from_bytes(data: &[u8]) -> Result<Self> {
1045 Ok(postcard::from_bytes(data)?)
1046 }
1047
1048 /// Serialize response with wire format prefix
1049 ///
1050 /// # Errors
1051 ///
1052 /// Returns [`ProtocolError::MessageTooLarge`](crate::ProtocolError::MessageTooLarge) if the serialized message
1053 /// exceeds [`MAX_MESSAGE_SIZE`](crate::MAX_MESSAGE_SIZE).
1054 #[inline]
1055 pub fn to_wire(&self, format: crate::WireFormat, correlation_id: u32) -> Result<Vec<u8>> {
1056 let result = match format {
1057 crate::WireFormat::Postcard => {
1058 // Estimate payload size to avoid reallocations.
1059 // For Messages responses, use message count × estimated per-message
1060 // size (offset 8 + partition 4 + value ~256 + timestamp 8 + overhead ~24 ≈ 300 bytes).
1061 // For other variants the default 128-byte hint is usually sufficient.
1062 let size_hint = match self {
1063 Response::Messages { messages } => messages.len().saturating_mul(300).max(128),
1064 _ => 128,
1065 };
1066 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + size_hint);
1067 result.push(format.as_byte());
1068 result.extend_from_slice(&correlation_id.to_be_bytes());
1069 postcard::to_extend(self, result)?
1070 }
1071 crate::WireFormat::Protobuf => {
1072 #[cfg(feature = "protobuf")]
1073 {
1074 let payload = self.to_proto_bytes()?;
1075 let mut result = Vec::with_capacity(crate::WIRE_HEADER_SIZE + payload.len());
1076 result.push(format.as_byte());
1077 result.extend_from_slice(&correlation_id.to_be_bytes());
1078 result.extend_from_slice(&payload);
1079 result
1080 }
1081 #[cfg(not(feature = "protobuf"))]
1082 {
1083 return Err(crate::ProtocolError::Serialization(
1084 "Protobuf support requires the 'protobuf' feature".into(),
1085 ));
1086 }
1087 }
1088 };
1089
1090 // Enforce MAX_MESSAGE_SIZE before the bytes leave this crate.
1091 if result.len() > crate::MAX_MESSAGE_SIZE {
1092 return Err(crate::ProtocolError::MessageTooLarge(
1093 result.len(),
1094 crate::MAX_MESSAGE_SIZE,
1095 ));
1096 }
1097
1098 Ok(result)
1099 }
1100
1101 /// Deserialize response with format auto-detection
1102 #[inline]
1103 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat, u32)> {
1104 if data.len() < crate::WIRE_HEADER_SIZE {
1105 return Err(crate::ProtocolError::Serialization(
1106 "Wire data too short (need format byte + correlation_id)".into(),
1107 ));
1108 }
1109
1110 let format_byte = data[0];
1111 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
1112 crate::ProtocolError::Serialization(format!(
1113 "Unknown wire format: 0x{:02x}",
1114 format_byte
1115 ))
1116 })?;
1117
1118 let correlation_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
1119 let payload = &data[crate::WIRE_HEADER_SIZE..];
1120
1121 match format {
1122 crate::WireFormat::Postcard => {
1123 let response = postcard::from_bytes(payload)?;
1124 Ok((response, format, correlation_id))
1125 }
1126 crate::WireFormat::Protobuf => {
1127 #[cfg(feature = "protobuf")]
1128 {
1129 let response = Self::from_proto_bytes(payload)?;
1130 Ok((response, format, correlation_id))
1131 }
1132 #[cfg(not(feature = "protobuf"))]
1133 {
1134 Err(crate::ProtocolError::Serialization(
1135 "Protobuf support requires the 'protobuf' feature".into(),
1136 ))
1137 }
1138 }
1139 }
1140 }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145 use super::*;
1146
1147 #[test]
1148 #[allow(deprecated)]
1149 fn test_request_roundtrip() {
1150 let requests = vec![
1151 Request::Ping,
1152 Request::ListTopics,
1153 Request::CreateTopic {
1154 name: "test".to_string(),
1155 partitions: Some(4),
1156 },
1157 Request::Authenticate {
1158 username: "admin".to_string(),
1159 password: "secret".to_string(),
1160 require_tls: true,
1161 },
1162 ];
1163
1164 for req in requests {
1165 let bytes = req.to_bytes().unwrap();
1166 let decoded = Request::from_bytes(&bytes).unwrap();
1167 // Can't directly compare due to Debug, but serialization should succeed
1168 assert!(!bytes.is_empty());
1169 let _ = decoded; // Use decoded
1170 }
1171 }
1172
1173 #[test]
1174 fn test_response_roundtrip() {
1175 let responses = vec![
1176 Response::Pong,
1177 Response::Ok,
1178 Response::Topics {
1179 topics: vec!["a".to_string(), "b".to_string()],
1180 },
1181 Response::Error {
1182 message: "test error".to_string(),
1183 },
1184 ];
1185
1186 for resp in responses {
1187 let bytes = resp.to_bytes().unwrap();
1188 let decoded = Response::from_bytes(&bytes).unwrap();
1189 assert!(!bytes.is_empty());
1190 let _ = decoded;
1191 }
1192 }
1193
1194 #[test]
1195 fn test_request_wire_roundtrip() {
1196 let request = Request::Ping;
1197
1198 // Serialize with format prefix and correlation_id
1199 let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 42).unwrap();
1200
1201 // First byte should be format identifier
1202 assert_eq!(wire_bytes[0], 0x00); // Postcard format
1203
1204 // Deserialize with auto-detection
1205 let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1206 assert_eq!(format, crate::WireFormat::Postcard);
1207 assert_eq!(correlation_id, 42);
1208 assert!(matches!(decoded, Request::Ping));
1209 }
1210
1211 #[test]
1212 fn test_response_wire_roundtrip() {
1213 let response = Response::Pong;
1214
1215 // Serialize with format prefix and correlation_id
1216 let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 99).unwrap();
1217
1218 // First byte should be format identifier
1219 assert_eq!(wire_bytes[0], 0x00); // Postcard format
1220
1221 // Deserialize with auto-detection
1222 let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1223 assert_eq!(format, crate::WireFormat::Postcard);
1224 assert_eq!(correlation_id, 99);
1225 assert!(matches!(decoded, Response::Pong));
1226 }
1227
1228 #[test]
1229 fn test_wire_format_empty_data() {
1230 let result = Request::from_wire(&[]);
1231 assert!(result.is_err());
1232 }
1233
1234 #[test]
1235 fn test_wire_format_complex_request() {
1236 use bytes::Bytes;
1237
1238 let request = Request::Publish {
1239 topic: "test-topic".to_string(),
1240 partition: Some(3),
1241 key: Some(Bytes::from("my-key")),
1242 value: Bytes::from("hello world"),
1243 leader_epoch: None,
1244 };
1245
1246 let wire_bytes = request.to_wire(crate::WireFormat::Postcard, 1).unwrap();
1247 assert_eq!(wire_bytes[0], 0x00);
1248
1249 let (decoded, format, correlation_id) = Request::from_wire(&wire_bytes).unwrap();
1250 assert_eq!(format, crate::WireFormat::Postcard);
1251 assert_eq!(correlation_id, 1);
1252
1253 // Verify the decoded request matches
1254 if let Request::Publish {
1255 topic, partition, ..
1256 } = decoded
1257 {
1258 assert_eq!(topic, "test-topic");
1259 assert_eq!(partition, Some(3));
1260 } else {
1261 panic!("Expected Publish request");
1262 }
1263 }
1264
1265 #[test]
1266 fn test_wire_format_complex_response() {
1267 let response = Response::Published {
1268 offset: 12345,
1269 partition: 7,
1270 };
1271
1272 let wire_bytes = response.to_wire(crate::WireFormat::Postcard, 2).unwrap();
1273 assert_eq!(wire_bytes[0], 0x00);
1274
1275 let (decoded, format, correlation_id) = Response::from_wire(&wire_bytes).unwrap();
1276 assert_eq!(format, crate::WireFormat::Postcard);
1277 assert_eq!(correlation_id, 2);
1278
1279 if let Response::Published { offset, partition } = decoded {
1280 assert_eq!(offset, 12345);
1281 assert_eq!(partition, 7);
1282 } else {
1283 panic!("Expected Published response");
1284 }
1285 }
1286
1287 /// Snapshot test: postcard serializes enum variants by ordinal position.
1288 /// If someone reorders variants in Request or Response, this test fails
1289 /// because the byte prefix (discriminant) will change, breaking wire compat.
1290 ///
1291 /// Exhaustive — every Request variant is pinned.
1292 #[test]
1293 #[allow(deprecated)]
1294 fn test_postcard_wire_stability_request_discriminants() {
1295 use bytes::Bytes;
1296
1297 // Serialize a representative of each variant and check the leading
1298 // discriminant byte(s) haven't shifted. Postcard uses varint encoding
1299 // for enum discriminants.
1300 let test_cases: Vec<(Request, u8)> = vec![
1301 // variant 0: Authenticate
1302 (
1303 Request::Authenticate {
1304 username: "u".into(),
1305 password: "p".into(),
1306 require_tls: false,
1307 },
1308 0,
1309 ),
1310 // variant 1: SaslAuthenticate
1311 (
1312 Request::SaslAuthenticate {
1313 mechanism: Bytes::from("PLAIN"),
1314 auth_bytes: Bytes::from("data"),
1315 },
1316 1,
1317 ),
1318 // variant 2: ScramClientFirst
1319 (
1320 Request::ScramClientFirst {
1321 message: Bytes::from("n,,n=user,r=nonce"),
1322 },
1323 2,
1324 ),
1325 // variant 3: ScramClientFinal
1326 (
1327 Request::ScramClientFinal {
1328 message: Bytes::from("c=bind,r=nonce,p=proof"),
1329 },
1330 3,
1331 ),
1332 // variant 4: Publish
1333 (
1334 Request::Publish {
1335 topic: "t".into(),
1336 partition: None,
1337 key: None,
1338 value: Bytes::from("v"),
1339 leader_epoch: None,
1340 },
1341 4,
1342 ),
1343 // variant 5: Consume
1344 (
1345 Request::Consume {
1346 topic: "t".into(),
1347 partition: 0,
1348 offset: 0,
1349 max_messages: 1,
1350 isolation_level: None,
1351 max_wait_ms: None,
1352 },
1353 5,
1354 ),
1355 // variant 6: CreateTopic
1356 (
1357 Request::CreateTopic {
1358 name: "t".into(),
1359 partitions: None,
1360 },
1361 6,
1362 ),
1363 // variant 7: ListTopics
1364 (Request::ListTopics, 7),
1365 // variant 8: DeleteTopic
1366 (Request::DeleteTopic { name: "t".into() }, 8),
1367 // variant 9: CommitOffset
1368 (
1369 Request::CommitOffset {
1370 consumer_group: "g".into(),
1371 topic: "t".into(),
1372 partition: 0,
1373 offset: 0,
1374 },
1375 9,
1376 ),
1377 // variant 10: GetOffset
1378 (
1379 Request::GetOffset {
1380 consumer_group: "g".into(),
1381 topic: "t".into(),
1382 partition: 0,
1383 },
1384 10,
1385 ),
1386 // variant 11: GetMetadata
1387 (Request::GetMetadata { topic: "t".into() }, 11),
1388 // variant 12: GetClusterMetadata
1389 (Request::GetClusterMetadata { topics: vec![] }, 12),
1390 // variant 13: Ping
1391 (Request::Ping, 13),
1392 // variant 14: GetOffsetBounds
1393 (
1394 Request::GetOffsetBounds {
1395 topic: "t".into(),
1396 partition: 0,
1397 },
1398 14,
1399 ),
1400 // variant 15: ListGroups
1401 (Request::ListGroups, 15),
1402 // variant 16: DescribeGroup
1403 (
1404 Request::DescribeGroup {
1405 consumer_group: "g".into(),
1406 },
1407 16,
1408 ),
1409 // variant 17: DeleteGroup
1410 (
1411 Request::DeleteGroup {
1412 consumer_group: "g".into(),
1413 },
1414 17,
1415 ),
1416 // variant 18: GetOffsetForTimestamp
1417 (
1418 Request::GetOffsetForTimestamp {
1419 topic: "t".into(),
1420 partition: 0,
1421 timestamp_ms: 0,
1422 },
1423 18,
1424 ),
1425 // variant 19: InitProducerId
1426 (Request::InitProducerId { producer_id: None }, 19),
1427 // variant 20: IdempotentPublish
1428 (
1429 Request::IdempotentPublish {
1430 topic: "t".into(),
1431 partition: None,
1432 key: None,
1433 value: Bytes::from("v"),
1434 producer_id: 1,
1435 producer_epoch: 0,
1436 sequence: 0,
1437 leader_epoch: None,
1438 },
1439 20,
1440 ),
1441 // variant 21: BeginTransaction
1442 (
1443 Request::BeginTransaction {
1444 txn_id: "tx".into(),
1445 producer_id: 1,
1446 producer_epoch: 0,
1447 timeout_ms: None,
1448 },
1449 21,
1450 ),
1451 // variant 22: AddPartitionsToTxn
1452 (
1453 Request::AddPartitionsToTxn {
1454 txn_id: "tx".into(),
1455 producer_id: 1,
1456 producer_epoch: 0,
1457 partitions: vec![],
1458 },
1459 22,
1460 ),
1461 // variant 23: TransactionalPublish
1462 (
1463 Request::TransactionalPublish {
1464 txn_id: "tx".into(),
1465 topic: "t".into(),
1466 partition: None,
1467 key: None,
1468 value: Bytes::from("v"),
1469 producer_id: 1,
1470 producer_epoch: 0,
1471 sequence: 0,
1472 leader_epoch: None,
1473 },
1474 23,
1475 ),
1476 // variant 24: AddOffsetsToTxn
1477 (
1478 Request::AddOffsetsToTxn {
1479 txn_id: "tx".into(),
1480 producer_id: 1,
1481 producer_epoch: 0,
1482 group_id: "g".into(),
1483 offsets: vec![],
1484 },
1485 24,
1486 ),
1487 // variant 25: CommitTransaction
1488 (
1489 Request::CommitTransaction {
1490 txn_id: "tx".into(),
1491 producer_id: 1,
1492 producer_epoch: 0,
1493 },
1494 25,
1495 ),
1496 // variant 26: AbortTransaction
1497 (
1498 Request::AbortTransaction {
1499 txn_id: "tx".into(),
1500 producer_id: 1,
1501 producer_epoch: 0,
1502 },
1503 26,
1504 ),
1505 // variant 27: DescribeQuotas
1506 (Request::DescribeQuotas { entities: vec![] }, 27),
1507 // variant 28: AlterQuotas
1508 (
1509 Request::AlterQuotas {
1510 alterations: vec![],
1511 },
1512 28,
1513 ),
1514 // variant 29: AlterTopicConfig
1515 (
1516 Request::AlterTopicConfig {
1517 topic: "t".into(),
1518 configs: vec![],
1519 },
1520 29,
1521 ),
1522 // variant 30: CreatePartitions
1523 (
1524 Request::CreatePartitions {
1525 topic: "t".into(),
1526 new_partition_count: 2,
1527 assignments: vec![],
1528 },
1529 30,
1530 ),
1531 // variant 31: DeleteRecords
1532 (
1533 Request::DeleteRecords {
1534 topic: "t".into(),
1535 partition_offsets: vec![],
1536 },
1537 31,
1538 ),
1539 // variant 32: DescribeTopicConfigs
1540 (Request::DescribeTopicConfigs { topics: vec![] }, 32),
1541 // variant 33: Handshake
1542 (
1543 Request::Handshake {
1544 protocol_version: crate::PROTOCOL_VERSION,
1545 client_id: "test".into(),
1546 },
1547 33,
1548 ),
1549 ];
1550
1551 for (request, expected_discriminant) in test_cases {
1552 let bytes = request.to_bytes().unwrap();
1553 assert_eq!(
1554 bytes[0], expected_discriminant,
1555 "Wire discriminant changed for {:?} — enum variant order may have shifted!",
1556 request
1557 );
1558 }
1559 }
1560
1561 /// Exhaustive — every Response variant is pinned.
1562 #[test]
1563 fn test_postcard_wire_stability_response_discriminants() {
1564 use bytes::Bytes;
1565
1566 let test_cases: Vec<(Response, u8)> = vec![
1567 // variant 0: Authenticated
1568 (
1569 Response::Authenticated {
1570 session_id: String::new(),
1571 expires_in: 0,
1572 },
1573 0,
1574 ),
1575 // variant 1: ScramServerFirst
1576 (
1577 Response::ScramServerFirst {
1578 message: Bytes::from("r=nonce,s=salt,i=4096"),
1579 },
1580 1,
1581 ),
1582 // variant 2: ScramServerFinal
1583 (
1584 Response::ScramServerFinal {
1585 message: Bytes::from("v=verifier"),
1586 session_id: None,
1587 expires_in: None,
1588 },
1589 2,
1590 ),
1591 // variant 3: Published
1592 (
1593 Response::Published {
1594 offset: 0,
1595 partition: 0,
1596 },
1597 3,
1598 ),
1599 // variant 4: Messages
1600 (Response::Messages { messages: vec![] }, 4),
1601 // variant 5: TopicCreated
1602 (
1603 Response::TopicCreated {
1604 name: "t".into(),
1605 partitions: 1,
1606 },
1607 5,
1608 ),
1609 // variant 6: Topics
1610 (Response::Topics { topics: vec![] }, 6),
1611 // variant 7: TopicDeleted
1612 (Response::TopicDeleted, 7),
1613 // variant 8: OffsetCommitted
1614 (Response::OffsetCommitted, 8),
1615 // variant 9: Offset
1616 (Response::Offset { offset: None }, 9),
1617 // variant 10: Metadata
1618 (
1619 Response::Metadata {
1620 name: "t".into(),
1621 partitions: 1,
1622 },
1623 10,
1624 ),
1625 // variant 11: ClusterMetadata
1626 (
1627 Response::ClusterMetadata {
1628 controller_id: None,
1629 brokers: vec![],
1630 topics: vec![],
1631 },
1632 11,
1633 ),
1634 // variant 12: Pong
1635 (Response::Pong, 12),
1636 // variant 13: OffsetBounds
1637 (
1638 Response::OffsetBounds {
1639 earliest: 0,
1640 latest: 0,
1641 },
1642 13,
1643 ),
1644 // variant 14: Groups
1645 (Response::Groups { groups: vec![] }, 14),
1646 // variant 15: GroupDescription
1647 (
1648 Response::GroupDescription {
1649 consumer_group: "g".into(),
1650 offsets: HashMap::new(),
1651 },
1652 15,
1653 ),
1654 // variant 16: GroupDeleted
1655 (Response::GroupDeleted, 16),
1656 // variant 17: OffsetForTimestamp
1657 (Response::OffsetForTimestamp { offset: None }, 17),
1658 // variant 18: Error
1659 (
1660 Response::Error {
1661 message: "e".into(),
1662 },
1663 18,
1664 ),
1665 // variant 19: Ok
1666 (Response::Ok, 19),
1667 // variant 20: ProducerIdInitialized
1668 (
1669 Response::ProducerIdInitialized {
1670 producer_id: 1,
1671 producer_epoch: 0,
1672 },
1673 20,
1674 ),
1675 // variant 21: IdempotentPublished
1676 (
1677 Response::IdempotentPublished {
1678 offset: 0,
1679 partition: 0,
1680 duplicate: false,
1681 },
1682 21,
1683 ),
1684 // variant 22: TransactionStarted
1685 (
1686 Response::TransactionStarted {
1687 txn_id: "tx".into(),
1688 },
1689 22,
1690 ),
1691 // variant 23: PartitionsAddedToTxn
1692 (
1693 Response::PartitionsAddedToTxn {
1694 txn_id: "tx".into(),
1695 partition_count: 0,
1696 },
1697 23,
1698 ),
1699 // variant 24: TransactionalPublished
1700 (
1701 Response::TransactionalPublished {
1702 offset: 0,
1703 partition: 0,
1704 sequence: 0,
1705 },
1706 24,
1707 ),
1708 // variant 25: OffsetsAddedToTxn
1709 (
1710 Response::OffsetsAddedToTxn {
1711 txn_id: "tx".into(),
1712 },
1713 25,
1714 ),
1715 // variant 26: TransactionCommitted
1716 (
1717 Response::TransactionCommitted {
1718 txn_id: "tx".into(),
1719 },
1720 26,
1721 ),
1722 // variant 27: TransactionAborted
1723 (
1724 Response::TransactionAborted {
1725 txn_id: "tx".into(),
1726 },
1727 27,
1728 ),
1729 // variant 28: QuotasDescribed
1730 (Response::QuotasDescribed { entries: vec![] }, 28),
1731 // variant 29: QuotasAltered
1732 (Response::QuotasAltered { altered_count: 0 }, 29),
1733 // variant 30: Throttled
1734 (
1735 Response::Throttled {
1736 throttle_time_ms: 0,
1737 quota_type: "produce_bytes_rate".into(),
1738 entity: "user".into(),
1739 },
1740 30,
1741 ),
1742 // variant 31: TopicConfigAltered
1743 (
1744 Response::TopicConfigAltered {
1745 topic: "t".into(),
1746 changed_count: 0,
1747 },
1748 31,
1749 ),
1750 // variant 32: PartitionsCreated
1751 (
1752 Response::PartitionsCreated {
1753 topic: "t".into(),
1754 new_partition_count: 2,
1755 },
1756 32,
1757 ),
1758 // variant 33: RecordsDeleted
1759 (
1760 Response::RecordsDeleted {
1761 topic: "t".into(),
1762 results: vec![],
1763 },
1764 33,
1765 ),
1766 // variant 34: TopicConfigsDescribed
1767 (Response::TopicConfigsDescribed { configs: vec![] }, 34),
1768 // variant 35: HandshakeResult
1769 (
1770 Response::HandshakeResult {
1771 server_version: crate::PROTOCOL_VERSION,
1772 compatible: true,
1773 message: String::new(),
1774 },
1775 35,
1776 ),
1777 ];
1778
1779 for (response, expected_discriminant) in test_cases {
1780 let bytes = response.to_bytes().unwrap();
1781 assert_eq!(
1782 bytes[0], expected_discriminant,
1783 "Wire discriminant changed for {:?} — enum variant order may have shifted!",
1784 response
1785 );
1786 }
1787 }
1788
1789 /// Verify that to_wire rejects oversized messages.
1790 #[test]
1791 fn test_to_wire_rejects_oversized_request() {
1792 use bytes::Bytes;
1793 // Create a request with a payload larger than MAX_MESSAGE_SIZE
1794 let huge_value = vec![0u8; crate::MAX_MESSAGE_SIZE + 1];
1795 let request = Request::Publish {
1796 topic: "t".into(),
1797 partition: None,
1798 key: None,
1799 value: Bytes::from(huge_value),
1800 leader_epoch: None,
1801 };
1802 let result = request.to_wire(crate::WireFormat::Postcard, 0);
1803 assert!(
1804 matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
1805 "Expected MessageTooLarge error for oversized request"
1806 );
1807 }
1808
1809 #[test]
1810 fn test_to_wire_rejects_oversized_response() {
1811 let huge_messages = vec![MessageData {
1812 offset: 0,
1813 partition: 0,
1814 timestamp: 0,
1815 key: None,
1816 value: bytes::Bytes::from(vec![0u8; crate::MAX_MESSAGE_SIZE + 1]),
1817 headers: vec![],
1818 }];
1819 let response = Response::Messages {
1820 messages: huge_messages,
1821 };
1822 let result = response.to_wire(crate::WireFormat::Postcard, 0);
1823 assert!(
1824 matches!(result, Err(crate::ProtocolError::MessageTooLarge(_, _))),
1825 "Expected MessageTooLarge error for oversized response"
1826 );
1827 }
1828}