rivven_protocol/messages.rs
1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Quota alteration request
///
/// One element of the `alterations` list carried by [`Request::AlterQuotas`]:
/// it names an entity, a quota key, and the value to set (or `None` to remove).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaAlteration {
    /// Entity type: "user", "client-id", "consumer-group", "default"
    pub entity_type: String,
    /// Entity name (`None` for defaults)
    pub entity_name: Option<String>,
    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
    pub quota_key: String,
    /// Quota value (`None` to remove the quota, `Some(value)` to set it)
    pub quota_value: Option<u64>,
}
22
/// Quota entry in describe response
///
/// Returned inside [`Response::QuotasDescribed`], one entry per entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotaEntry {
    /// Entity type
    pub entity_type: String,
    /// Entity name (`None` for defaults)
    pub entity_name: Option<String>,
    /// Quota values.
    ///
    /// NOTE(review): presumably keyed by the same quota keys accepted by
    /// `QuotaAlteration::quota_key` — confirm against the broker implementation.
    pub quotas: HashMap<String, u64>,
}
33
/// Topic configuration entry for AlterTopicConfig
///
/// One element of the `configs` list carried by [`Request::AlterTopicConfig`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigEntry {
    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
    pub key: String,
    /// Configuration value (`None` to reset to default)
    pub value: Option<String>,
}
42
/// Topic configuration in describe response
///
/// Returned inside [`Response::TopicConfigsDescribed`], one per described topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigDescription {
    /// Topic name
    pub topic: String,
    /// Configuration entries.
    ///
    /// NOTE(review): presumably keyed by configuration key (e.g. "retention.ms")
    /// — confirm against the broker implementation.
    pub configs: HashMap<String, TopicConfigValue>,
}
51
/// Topic configuration value with metadata
///
/// Used as the map value type in [`TopicConfigDescription::configs`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicConfigValue {
    /// Current value (always carried as a string on the wire)
    pub value: String,
    /// Whether this is the default value
    pub is_default: bool,
    /// Whether this config is read-only
    pub is_read_only: bool,
    /// Whether this config is sensitive (e.g., passwords)
    pub is_sensitive: bool,
}
64
/// Delete records result for a partition
///
/// Returned inside [`Response::RecordsDeleted`], one result per partition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteRecordsResult {
    /// Partition ID
    pub partition: u32,
    /// New low watermark (first available offset after deletion)
    pub low_watermark: u64,
    /// Error message if deletion failed for this partition (`None` otherwise)
    pub error: Option<String>,
}
75
/// Protocol request messages
///
/// Every message a client can send. Values are encoded with postcard by
/// `Request::to_bytes` / `Request::to_wire`.
///
/// # Stability
///
/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
/// Adding new variants should only be done at the end of the enum.
///
/// The leading discriminant of several variants is pinned by the
/// `test_postcard_wire_stability_request_discriminants` unit test in this file.
///
/// NOTE(review): the derived `Debug` prints every field verbatim, including
/// `Authenticate::password` — avoid logging requests with `{:?}` unredacted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Request {
    /// Authenticate with username/password (SASL/PLAIN compatible)
    Authenticate { username: String, password: String },

    /// Authenticate with SASL bytes (for Kafka client compatibility)
    ///
    /// Both fields are encoded through the crate's `bytes_serde` helper.
    SaslAuthenticate {
        #[serde(with = "crate::serde_utils::bytes_serde")]
        mechanism: Bytes,
        #[serde(with = "crate::serde_utils::bytes_serde")]
        auth_bytes: Bytes,
    },

    /// SCRAM-SHA-256: Client-first message
    ScramClientFirst {
        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
    },

    /// SCRAM-SHA-256: Client-final message
    ScramClientFinal {
        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
    },

    /// Publish a message to a topic
    ///
    /// `partition` and `key` are optional; `value` is the message payload.
    Publish {
        topic: String,
        partition: Option<u32>,
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
    },

    /// Consume messages from a topic
    Consume {
        topic: String,
        partition: u32,
        offset: u64,
        // NOTE(review): `usize` has a platform-dependent width; a fixed-width
        // integer would be more predictable on the wire — confirm intent.
        max_messages: usize,
        /// Isolation level for transactional reads
        /// None = read_uncommitted (default, backward compatible)
        /// Some(0) = read_uncommitted
        /// Some(1) = read_committed (filters aborted transaction messages)
        // NOTE(review): postcard is not self-describing, so `#[serde(default)]`
        // may not make this field optional for payloads that omit it — verify
        // that old clients can really be decoded before relying on it.
        #[serde(default)]
        isolation_level: Option<u8>,
    },

    /// Create a new topic
    CreateTopic {
        name: String,
        partitions: Option<u32>,
    },

    /// List all topics
    ListTopics,

    /// Delete a topic
    DeleteTopic { name: String },

    /// Commit consumer offset
    CommitOffset {
        consumer_group: String,
        topic: String,
        partition: u32,
        offset: u64,
    },

    /// Get consumer offset
    GetOffset {
        consumer_group: String,
        topic: String,
        partition: u32,
    },

    /// Get topic metadata
    GetMetadata { topic: String },

    /// Get cluster metadata (all topics or specific ones)
    GetClusterMetadata {
        /// Topics to get metadata for (empty = all topics)
        topics: Vec<String>,
    },

    /// Ping
    Ping,

    /// Get offset bounds for a partition
    GetOffsetBounds { topic: String, partition: u32 },

    /// List all consumer groups
    ListGroups,

    /// Describe a consumer group (get all offsets)
    DescribeGroup { consumer_group: String },

    /// Delete a consumer group
    DeleteGroup { consumer_group: String },

    /// Find offset for a timestamp
    GetOffsetForTimestamp {
        topic: String,
        partition: u32,
        /// Timestamp in milliseconds since epoch
        timestamp_ms: i64,
    },

    // =========================================================================
    // Idempotent Producer
    // =========================================================================
    /// Initialize idempotent producer (request producer ID and epoch)
    ///
    /// Call this before sending idempotent produce requests.
    /// If reconnecting, provide the previous producer_id to bump epoch.
    InitProducerId {
        /// Previous producer ID (None for new producers)
        producer_id: Option<u64>,
    },

    /// Publish with idempotent semantics (exactly-once delivery)
    ///
    /// Requires InitProducerId to have been called first.
    IdempotentPublish {
        topic: String,
        partition: Option<u32>,
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
        /// Producer ID from InitProducerId response
        producer_id: u64,
        /// Producer epoch from InitProducerId response
        producer_epoch: u16,
        /// Sequence number (starts at 0, increments per partition)
        sequence: i32,
    },

    // =========================================================================
    // Native Transactions
    // =========================================================================
    /// Begin a new transaction
    BeginTransaction {
        /// Transaction ID (unique per producer)
        txn_id: String,
        /// Producer ID from InitProducerId
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Transaction timeout in milliseconds (optional, defaults to 60s)
        timeout_ms: Option<u64>,
    },

    /// Add partitions to an active transaction
    AddPartitionsToTxn {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Partitions to add (topic, partition pairs)
        partitions: Vec<(String, u32)>,
    },

    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
    TransactionalPublish {
        /// Transaction ID
        txn_id: String,
        topic: String,
        partition: Option<u32>,
        #[serde(with = "crate::serde_utils::option_bytes_serde")]
        key: Option<Bytes>,
        #[serde(with = "crate::serde_utils::bytes_serde")]
        value: Bytes,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Sequence number
        sequence: i32,
    },

    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
    AddOffsetsToTxn {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
        /// Consumer group ID
        group_id: String,
        /// Offsets to commit (topic, partition, offset triples)
        // NOTE(review): offsets are `i64` here but `u64` in `CommitOffset` —
        // confirm the signedness difference is intentional.
        offsets: Vec<(String, u32, i64)>,
    },

    /// Commit a transaction
    CommitTransaction {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
    },

    /// Abort a transaction
    AbortTransaction {
        /// Transaction ID
        txn_id: String,
        /// Producer ID
        producer_id: u64,
        /// Producer epoch
        producer_epoch: u16,
    },

    // =========================================================================
    // Per-Principal Quotas (Kafka Parity)
    // =========================================================================
    /// Describe quotas for entities
    DescribeQuotas {
        /// Entities to describe (empty = all)
        /// Format: Vec<(entity_type, entity_name)>
        /// entity_type: "user", "client-id", "consumer-group", "default"
        /// entity_name: None for defaults, Some for specific entities
        entities: Vec<(String, Option<String>)>,
    },

    /// Alter quotas for entities
    AlterQuotas {
        /// Quota alterations to apply
        /// Each item: (entity_type, entity_name, quota_key, quota_value)
        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
        /// quota_value: None to remove, Some(value) to set
        alterations: Vec<QuotaAlteration>,
    },

    // =========================================================================
    // Admin API (Kafka Parity)
    // =========================================================================
    /// Alter topic configuration
    AlterTopicConfig {
        /// Topic name
        topic: String,
        /// Configuration changes to apply
        configs: Vec<TopicConfigEntry>,
    },

    /// Create additional partitions for an existing topic
    CreatePartitions {
        /// Topic name
        topic: String,
        /// New total partition count (must be > current count)
        new_partition_count: u32,
        /// Optional assignment of new partitions to brokers
        /// If empty, broker will auto-assign
        assignments: Vec<Vec<String>>,
    },

    /// Delete records before a given offset (log truncation)
    DeleteRecords {
        /// Topic name
        topic: String,
        /// Partition-offset pairs: delete all records before these offsets
        partition_offsets: Vec<(u32, u64)>,
    },

    /// Describe topic configurations
    DescribeTopicConfigs {
        /// Topics to describe (empty = all)
        topics: Vec<String>,
    },
}
358
/// Protocol response messages
///
/// Every message a broker can send back. Values are encoded with postcard by
/// `Response::to_bytes` / `Response::to_wire`.
///
/// # Stability
///
/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
/// Adding new variants should only be done at the end of the enum.
///
/// The leading discriminant of several variants is pinned by the
/// `test_postcard_wire_stability_response_discriminants` unit test in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Response {
    /// Authentication successful
    Authenticated {
        /// Session token for subsequent requests
        session_id: String,
        /// Session timeout in seconds
        expires_in: u64,
    },

    /// SCRAM-SHA-256: Server-first message (challenge)
    ScramServerFirst {
        /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
    },

    /// SCRAM-SHA-256: Server-final message (verification or error)
    ScramServerFinal {
        /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
        #[serde(with = "crate::serde_utils::bytes_serde")]
        message: Bytes,
        /// Session ID (if authentication succeeded)
        session_id: Option<String>,
        /// Session timeout in seconds (if authentication succeeded)
        expires_in: Option<u64>,
    },

    /// Success response with offset
    Published { offset: u64, partition: u32 },

    /// Messages response
    Messages { messages: Vec<MessageData> },

    /// Topic created
    TopicCreated { name: String, partitions: u32 },

    /// List of topics
    Topics { topics: Vec<String> },

    /// Topic deleted
    TopicDeleted,

    /// Offset committed
    OffsetCommitted,

    /// Offset response (`None` when no offset is available)
    Offset { offset: Option<u64> },

    /// Metadata
    Metadata { name: String, partitions: u32 },

    /// Full cluster metadata for topic(s)
    ClusterMetadata {
        /// Controller node ID (Raft leader)
        controller_id: Option<String>,
        /// Broker/node list
        brokers: Vec<BrokerInfo>,
        /// Topic metadata
        topics: Vec<TopicMetadata>,
    },

    /// Pong
    Pong,

    /// Offset bounds for a partition
    OffsetBounds { earliest: u64, latest: u64 },

    /// List of consumer groups
    Groups { groups: Vec<String> },

    /// Consumer group details with all offsets
    GroupDescription {
        consumer_group: String,
        /// topic → partition → offset
        offsets: HashMap<String, HashMap<u32, u64>>,
    },

    /// Consumer group deleted
    GroupDeleted,

    /// Offset for a timestamp
    OffsetForTimestamp {
        /// The first offset with timestamp >= the requested timestamp
        /// None if no matching offset was found
        offset: Option<u64>,
    },

    /// Error response
    Error { message: String },

    /// Success
    Ok,

    // =========================================================================
    // Idempotent Producer
    // =========================================================================
    /// Producer ID initialized
    ProducerIdInitialized {
        /// Assigned or existing producer ID
        producer_id: u64,
        /// Current epoch (increments on reconnect)
        producer_epoch: u16,
    },

    /// Idempotent publish result
    IdempotentPublished {
        /// Offset where message was written
        offset: u64,
        /// Partition the message was written to
        partition: u32,
        /// Whether this was a duplicate (message already existed)
        duplicate: bool,
    },

    // =========================================================================
    // Native Transactions
    // =========================================================================
    /// Transaction started successfully
    TransactionStarted {
        /// Transaction ID
        txn_id: String,
    },

    /// Partitions added to transaction
    PartitionsAddedToTxn {
        /// Transaction ID
        txn_id: String,
        /// Number of partitions now in transaction
        // NOTE(review): `usize` has a platform-dependent width; a fixed-width
        // integer would be more predictable on the wire — confirm intent.
        partition_count: usize,
    },

    /// Transactional publish result
    TransactionalPublished {
        /// Offset where message was written (pending commit)
        offset: u64,
        /// Partition the message was written to
        partition: u32,
        /// Sequence number accepted
        sequence: i32,
    },

    /// Offsets added to transaction
    OffsetsAddedToTxn {
        /// Transaction ID
        txn_id: String,
    },

    /// Transaction committed
    TransactionCommitted {
        /// Transaction ID
        txn_id: String,
    },

    /// Transaction aborted
    TransactionAborted {
        /// Transaction ID
        txn_id: String,
    },

    // =========================================================================
    // Per-Principal Quotas (Kafka Parity)
    // =========================================================================
    /// Quota descriptions
    QuotasDescribed {
        /// List of quota entries
        entries: Vec<QuotaEntry>,
    },

    /// Quotas altered successfully
    QuotasAltered {
        /// Number of quota alterations applied
        altered_count: usize,
    },

    /// Throttle response (returned when quota exceeded)
    Throttled {
        /// Time to wait before retrying (milliseconds)
        throttle_time_ms: u64,
        /// Quota type that was exceeded
        quota_type: String,
        /// Entity that exceeded quota
        entity: String,
    },

    // =========================================================================
    // Admin API (Kafka Parity)
    // =========================================================================
    /// Topic configuration altered
    TopicConfigAltered {
        /// Topic name
        topic: String,
        /// Number of configurations changed
        changed_count: usize,
    },

    /// Partitions created
    PartitionsCreated {
        /// Topic name
        topic: String,
        /// New total partition count
        new_partition_count: u32,
    },

    /// Records deleted
    RecordsDeleted {
        /// Topic name
        topic: String,
        /// Results per partition
        results: Vec<DeleteRecordsResult>,
    },

    /// Topic configurations described
    TopicConfigsDescribed {
        /// Configuration descriptions per topic
        configs: Vec<TopicConfigDescription>,
    },
}
583
584impl Request {
585 /// Serialize request to bytes (postcard format, no format prefix)
586 ///
587 /// For internal Rust-to-Rust communication where format is known.
588 /// Use `to_wire()` for wire transmission with format detection support.
589 #[inline]
590 pub fn to_bytes(&self) -> Result<Vec<u8>> {
591 Ok(postcard::to_allocvec(self)?)
592 }
593
594 /// Deserialize request from bytes (postcard format)
595 ///
596 /// For internal Rust-to-Rust communication where format is known.
597 /// Use `from_wire()` for wire transmission with format detection support.
598 #[inline]
599 pub fn from_bytes(data: &[u8]) -> Result<Self> {
600 Ok(postcard::from_bytes(data)?)
601 }
602
603 /// Serialize request with wire format prefix
604 ///
605 /// Wire format: `[format_byte][payload]`
606 /// - format_byte: 0x00 = postcard, 0x01 = protobuf
607 /// - payload: serialized message
608 ///
609 /// Note: Length prefix is NOT included (handled by transport layer)
610 #[inline]
611 pub fn to_wire(&self, format: crate::WireFormat) -> Result<Vec<u8>> {
612 match format {
613 crate::WireFormat::Postcard => {
614 let payload = postcard::to_allocvec(self)?;
615 let mut result = Vec::with_capacity(1 + payload.len());
616 result.push(format.as_byte());
617 result.extend_from_slice(&payload);
618 Ok(result)
619 }
620 crate::WireFormat::Protobuf => {
621 // Protobuf requires the `protobuf` feature
622 #[cfg(feature = "protobuf")]
623 {
624 let payload = self.to_proto_bytes()?;
625 let mut result = Vec::with_capacity(1 + payload.len());
626 result.push(format.as_byte());
627 result.extend_from_slice(&payload);
628 Ok(result)
629 }
630 #[cfg(not(feature = "protobuf"))]
631 {
632 Err(crate::ProtocolError::Serialization(
633 "Protobuf support requires the 'protobuf' feature".into(),
634 ))
635 }
636 }
637 }
638 }
639
640 /// Deserialize request with format auto-detection
641 ///
642 /// Detects format from first byte and deserializes accordingly.
643 /// Returns the deserialized request and the detected format.
644 #[inline]
645 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat)> {
646 if data.is_empty() {
647 return Err(crate::ProtocolError::Serialization(
648 "Empty wire data".into(),
649 ));
650 }
651
652 let format_byte = data[0];
653 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
654 crate::ProtocolError::Serialization(format!(
655 "Unknown wire format: 0x{:02x}",
656 format_byte
657 ))
658 })?;
659
660 let payload = &data[1..];
661
662 match format {
663 crate::WireFormat::Postcard => {
664 let request = postcard::from_bytes(payload)?;
665 Ok((request, format))
666 }
667 crate::WireFormat::Protobuf => {
668 #[cfg(feature = "protobuf")]
669 {
670 let request = Self::from_proto_bytes(payload)?;
671 Ok((request, format))
672 }
673 #[cfg(not(feature = "protobuf"))]
674 {
675 Err(crate::ProtocolError::Serialization(
676 "Protobuf support requires the 'protobuf' feature".into(),
677 ))
678 }
679 }
680 }
681 }
682}
683
684impl Response {
685 /// Serialize response to bytes (postcard format, no format prefix)
686 #[inline]
687 pub fn to_bytes(&self) -> Result<Vec<u8>> {
688 Ok(postcard::to_allocvec(self)?)
689 }
690
691 /// Deserialize response from bytes (postcard format)
692 #[inline]
693 pub fn from_bytes(data: &[u8]) -> Result<Self> {
694 Ok(postcard::from_bytes(data)?)
695 }
696
697 /// Serialize response with wire format prefix
698 #[inline]
699 pub fn to_wire(&self, format: crate::WireFormat) -> Result<Vec<u8>> {
700 match format {
701 crate::WireFormat::Postcard => {
702 let payload = postcard::to_allocvec(self)?;
703 let mut result = Vec::with_capacity(1 + payload.len());
704 result.push(format.as_byte());
705 result.extend_from_slice(&payload);
706 Ok(result)
707 }
708 crate::WireFormat::Protobuf => {
709 #[cfg(feature = "protobuf")]
710 {
711 let payload = self.to_proto_bytes()?;
712 let mut result = Vec::with_capacity(1 + payload.len());
713 result.push(format.as_byte());
714 result.extend_from_slice(&payload);
715 Ok(result)
716 }
717 #[cfg(not(feature = "protobuf"))]
718 {
719 Err(crate::ProtocolError::Serialization(
720 "Protobuf support requires the 'protobuf' feature".into(),
721 ))
722 }
723 }
724 }
725 }
726
727 /// Deserialize response with format auto-detection
728 #[inline]
729 pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat)> {
730 if data.is_empty() {
731 return Err(crate::ProtocolError::Serialization(
732 "Empty wire data".into(),
733 ));
734 }
735
736 let format_byte = data[0];
737 let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
738 crate::ProtocolError::Serialization(format!(
739 "Unknown wire format: 0x{:02x}",
740 format_byte
741 ))
742 })?;
743
744 let payload = &data[1..];
745
746 match format {
747 crate::WireFormat::Postcard => {
748 let response = postcard::from_bytes(payload)?;
749 Ok((response, format))
750 }
751 crate::WireFormat::Protobuf => {
752 #[cfg(feature = "protobuf")]
753 {
754 let response = Self::from_proto_bytes(payload)?;
755 Ok((response, format))
756 }
757 #[cfg(not(feature = "protobuf"))]
758 {
759 Err(crate::ProtocolError::Serialization(
760 "Protobuf support requires the 'protobuf' feature".into(),
761 ))
762 }
763 }
764 }
765 }
766}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode → decode → re-encode must reproduce the original bytes.
    ///
    /// `Request` does not implement `PartialEq`, so byte equality of the
    /// re-encoded value is used as the proxy for "decoded to the same message".
    #[test]
    fn test_request_roundtrip() {
        let requests = vec![
            Request::Ping,
            Request::ListTopics,
            Request::CreateTopic {
                name: "test".to_string(),
                partitions: Some(4),
            },
            Request::Authenticate {
                username: "admin".to_string(),
                password: "secret".to_string(),
            },
        ];

        for req in requests {
            let bytes = req.to_bytes().unwrap();
            assert!(!bytes.is_empty());

            let decoded = Request::from_bytes(&bytes).unwrap();
            assert_eq!(
                decoded.to_bytes().unwrap(),
                bytes,
                "Round-trip changed the encoding of {:?}",
                req
            );
        }
    }

    /// Same round-trip check as above, for `Response`.
    #[test]
    fn test_response_roundtrip() {
        let responses = vec![
            Response::Pong,
            Response::Ok,
            Response::Topics {
                topics: vec!["a".to_string(), "b".to_string()],
            },
            Response::Error {
                message: "test error".to_string(),
            },
        ];

        for resp in responses {
            let bytes = resp.to_bytes().unwrap();
            assert!(!bytes.is_empty());

            let decoded = Response::from_bytes(&bytes).unwrap();
            assert_eq!(
                decoded.to_bytes().unwrap(),
                bytes,
                "Round-trip changed the encoding of {:?}",
                resp
            );
        }
    }

    #[test]
    fn test_request_wire_roundtrip() {
        let request = Request::Ping;

        // Serialize with format prefix
        let wire_bytes = request.to_wire(crate::WireFormat::Postcard).unwrap();

        // First byte should be format identifier
        assert_eq!(wire_bytes[0], 0x00); // Postcard format

        // Everything after the prefix is the plain postcard encoding
        assert_eq!(&wire_bytes[1..], request.to_bytes().unwrap().as_slice());

        // Deserialize with auto-detection
        let (decoded, format) = Request::from_wire(&wire_bytes).unwrap();
        assert_eq!(format, crate::WireFormat::Postcard);
        assert!(matches!(decoded, Request::Ping));
    }

    #[test]
    fn test_response_wire_roundtrip() {
        let response = Response::Pong;

        // Serialize with format prefix
        let wire_bytes = response.to_wire(crate::WireFormat::Postcard).unwrap();

        // First byte should be format identifier
        assert_eq!(wire_bytes[0], 0x00); // Postcard format

        // Everything after the prefix is the plain postcard encoding
        assert_eq!(&wire_bytes[1..], response.to_bytes().unwrap().as_slice());

        // Deserialize with auto-detection
        let (decoded, format) = Response::from_wire(&wire_bytes).unwrap();
        assert_eq!(format, crate::WireFormat::Postcard);
        assert!(matches!(decoded, Response::Pong));
    }

    /// Empty input has no format byte and must be rejected by both decoders.
    #[test]
    fn test_wire_format_empty_data() {
        assert!(Request::from_wire(&[]).is_err());
        assert!(Response::from_wire(&[]).is_err());
    }

    #[test]
    fn test_wire_format_complex_request() {
        use bytes::Bytes;

        let request = Request::Publish {
            topic: "test-topic".to_string(),
            partition: Some(3),
            key: Some(Bytes::from("my-key")),
            value: Bytes::from("hello world"),
        };

        let wire_bytes = request.to_wire(crate::WireFormat::Postcard).unwrap();
        assert_eq!(wire_bytes[0], 0x00);

        let (decoded, format) = Request::from_wire(&wire_bytes).unwrap();
        assert_eq!(format, crate::WireFormat::Postcard);

        // Verify every field of the decoded request, including the byte payloads
        match decoded {
            Request::Publish {
                topic,
                partition,
                key,
                value,
            } => {
                assert_eq!(topic, "test-topic");
                assert_eq!(partition, Some(3));
                assert_eq!(key, Some(Bytes::from("my-key")));
                assert_eq!(value, Bytes::from("hello world"));
            }
            other => panic!("Expected Publish request, got {:?}", other),
        }
    }

    #[test]
    fn test_wire_format_complex_response() {
        let response = Response::Published {
            offset: 12345,
            partition: 7,
        };

        let wire_bytes = response.to_wire(crate::WireFormat::Postcard).unwrap();
        assert_eq!(wire_bytes[0], 0x00);

        let (decoded, format) = Response::from_wire(&wire_bytes).unwrap();
        assert_eq!(format, crate::WireFormat::Postcard);

        match decoded {
            Response::Published { offset, partition } => {
                assert_eq!(offset, 12345);
                assert_eq!(partition, 7);
            }
            other => panic!("Expected Published response, got {:?}", other),
        }
    }

    /// Snapshot test: postcard serializes enum variants by ordinal position.
    /// If someone reorders variants in Request or Response, this test fails
    /// because the byte prefix (discriminant) will change, breaking wire compat.
    #[test]
    fn test_postcard_wire_stability_request_discriminants() {
        use bytes::Bytes;

        // Serialize a representative of each variant and check the leading
        // discriminant byte(s) haven't shifted. Postcard uses varint encoding
        // for enum discriminants; all indices here are < 128, so one byte each.
        let test_cases: Vec<(Request, u8)> = vec![
            // variant 0: Authenticate
            (
                Request::Authenticate {
                    username: "u".into(),
                    password: "p".into(),
                },
                0,
            ),
            // variant 4: Publish
            (
                Request::Publish {
                    topic: "t".into(),
                    partition: None,
                    key: None,
                    value: Bytes::from("v"),
                },
                4,
            ),
            // variant 6: CreateTopic
            (
                Request::CreateTopic {
                    name: "t".into(),
                    partitions: None,
                },
                6,
            ),
            // variant 7: ListTopics
            (Request::ListTopics, 7),
            // variant 8: DeleteTopic
            (Request::DeleteTopic { name: "t".into() }, 8),
            // variant 11: GetMetadata
            (Request::GetMetadata { topic: "t".into() }, 11),
            // variant 13: Ping
            (Request::Ping, 13),
            // variant 32: DescribeTopicConfigs (currently the last variant;
            // catches insertions or removals anywhere before it)
            (Request::DescribeTopicConfigs { topics: vec![] }, 32),
        ];

        for (request, expected_discriminant) in test_cases {
            let bytes = request.to_bytes().unwrap();
            assert_eq!(
                bytes[0], expected_discriminant,
                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
                request
            );
        }
    }

    /// Snapshot test for `Response` discriminants; see the request variant above.
    #[test]
    fn test_postcard_wire_stability_response_discriminants() {
        let test_cases: Vec<(Response, u8)> = vec![
            // variant 0: Authenticated
            (
                Response::Authenticated {
                    session_id: String::new(),
                    expires_in: 0,
                },
                0,
            ),
            // variant 3: Published
            (
                Response::Published {
                    offset: 0,
                    partition: 0,
                },
                3,
            ),
            // variant 5: TopicCreated
            (
                Response::TopicCreated {
                    name: "t".into(),
                    partitions: 1,
                },
                5,
            ),
            // variant 6: Topics
            (Response::Topics { topics: vec![] }, 6),
            // variant 7: TopicDeleted
            (Response::TopicDeleted, 7),
            // variant 12: Pong
            (Response::Pong, 12),
            // variant 14: Groups
            (Response::Groups { groups: vec![] }, 14),
            // variant 18: Error
            (
                Response::Error {
                    message: "e".into(),
                },
                18,
            ),
            // variant 19: Ok
            (Response::Ok, 19),
            // variant 34: TopicConfigsDescribed (currently the last variant;
            // catches insertions or removals anywhere before it)
            (Response::TopicConfigsDescribed { configs: vec![] }, 34),
        ];

        for (response, expected_discriminant) in test_cases {
            let bytes = response.to_bytes().unwrap();
            assert_eq!(
                bytes[0], expected_discriminant,
                "Wire discriminant changed for {:?} — enum variant order may have shifted!",
                response
            );
        }
    }
}
1017}