Skip to main content

rivven_protocol/
messages.rs

1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Quota alteration request
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QuotaAlteration {
13    /// Entity type: "user", "client-id", "consumer-group", "default"
14    pub entity_type: String,
15    /// Entity name (None for defaults)
16    pub entity_name: Option<String>,
17    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
18    pub quota_key: String,
19    /// Quota value (None to remove quota, Some to set)
20    pub quota_value: Option<u64>,
21}
22
23/// Quota entry in describe response
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct QuotaEntry {
26    /// Entity type
27    pub entity_type: String,
28    /// Entity name (None for defaults)
29    pub entity_name: Option<String>,
30    /// Quota values
31    pub quotas: HashMap<String, u64>,
32}
33
34/// Topic configuration entry for AlterTopicConfig
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TopicConfigEntry {
37    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
38    pub key: String,
39    /// Configuration value (None to reset to default)
40    pub value: Option<String>,
41}
42
43/// Topic configuration in describe response
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TopicConfigDescription {
46    /// Topic name
47    pub topic: String,
48    /// Configuration entries
49    pub configs: HashMap<String, TopicConfigValue>,
50}
51
52/// Topic configuration value with metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct TopicConfigValue {
55    /// Current value
56    pub value: String,
57    /// Whether this is the default value
58    pub is_default: bool,
59    /// Whether this config is read-only
60    pub is_read_only: bool,
61    /// Whether this config is sensitive (e.g., passwords)
62    pub is_sensitive: bool,
63}
64
65/// Delete records result for a partition
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DeleteRecordsResult {
68    /// Partition ID
69    pub partition: u32,
70    /// New low watermark (first available offset after deletion)
71    pub low_watermark: u64,
72    /// Error message if deletion failed for this partition
73    pub error: Option<String>,
74}
75
76/// Protocol request messages
77///
78/// # Stability
79///
80/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
81/// Adding new variants should only be done at the end of the enum.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub enum Request {
84    /// Authenticate with username/password (SASL/PLAIN compatible)
85    Authenticate { username: String, password: String },
86
87    /// Authenticate with SASL bytes (for Kafka client compatibility)
88    SaslAuthenticate {
89        #[serde(with = "crate::serde_utils::bytes_serde")]
90        mechanism: Bytes,
91        #[serde(with = "crate::serde_utils::bytes_serde")]
92        auth_bytes: Bytes,
93    },
94
95    /// SCRAM-SHA-256: Client-first message
96    ScramClientFirst {
97        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
98        #[serde(with = "crate::serde_utils::bytes_serde")]
99        message: Bytes,
100    },
101
102    /// SCRAM-SHA-256: Client-final message
103    ScramClientFinal {
104        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
105        #[serde(with = "crate::serde_utils::bytes_serde")]
106        message: Bytes,
107    },
108
109    /// Publish a message to a topic
110    Publish {
111        topic: String,
112        partition: Option<u32>,
113        #[serde(with = "crate::serde_utils::option_bytes_serde")]
114        key: Option<Bytes>,
115        #[serde(with = "crate::serde_utils::bytes_serde")]
116        value: Bytes,
117    },
118
119    /// Consume messages from a topic
120    Consume {
121        topic: String,
122        partition: u32,
123        offset: u64,
124        max_messages: usize,
125        /// Isolation level for transactional reads
126        /// None = read_uncommitted (default, backward compatible)
127        /// Some(0) = read_uncommitted
128        /// Some(1) = read_committed (filters aborted transaction messages)
129        #[serde(default)]
130        isolation_level: Option<u8>,
131    },
132
133    /// Create a new topic
134    CreateTopic {
135        name: String,
136        partitions: Option<u32>,
137    },
138
139    /// List all topics
140    ListTopics,
141
142    /// Delete a topic
143    DeleteTopic { name: String },
144
145    /// Commit consumer offset
146    CommitOffset {
147        consumer_group: String,
148        topic: String,
149        partition: u32,
150        offset: u64,
151    },
152
153    /// Get consumer offset
154    GetOffset {
155        consumer_group: String,
156        topic: String,
157        partition: u32,
158    },
159
160    /// Get topic metadata
161    GetMetadata { topic: String },
162
163    /// Get cluster metadata (all topics or specific ones)
164    GetClusterMetadata {
165        /// Topics to get metadata for (empty = all topics)
166        topics: Vec<String>,
167    },
168
169    /// Ping
170    Ping,
171
172    /// Get offset bounds for a partition
173    GetOffsetBounds { topic: String, partition: u32 },
174
175    /// List all consumer groups
176    ListGroups,
177
178    /// Describe a consumer group (get all offsets)
179    DescribeGroup { consumer_group: String },
180
181    /// Delete a consumer group
182    DeleteGroup { consumer_group: String },
183
184    /// Find offset for a timestamp
185    GetOffsetForTimestamp {
186        topic: String,
187        partition: u32,
188        /// Timestamp in milliseconds since epoch
189        timestamp_ms: i64,
190    },
191
192    // =========================================================================
193    // Idempotent Producer
194    // =========================================================================
195    /// Initialize idempotent producer (request producer ID and epoch)
196    ///
197    /// Call this before sending idempotent produce requests.
198    /// If reconnecting, provide the previous producer_id to bump epoch.
199    InitProducerId {
200        /// Previous producer ID (None for new producers)
201        producer_id: Option<u64>,
202    },
203
204    /// Publish with idempotent semantics (exactly-once delivery)
205    ///
206    /// Requires InitProducerId to have been called first.
207    IdempotentPublish {
208        topic: String,
209        partition: Option<u32>,
210        #[serde(with = "crate::serde_utils::option_bytes_serde")]
211        key: Option<Bytes>,
212        #[serde(with = "crate::serde_utils::bytes_serde")]
213        value: Bytes,
214        /// Producer ID from InitProducerId response
215        producer_id: u64,
216        /// Producer epoch from InitProducerId response
217        producer_epoch: u16,
218        /// Sequence number (starts at 0, increments per partition)
219        sequence: i32,
220    },
221
222    // =========================================================================
223    // Native Transactions
224    // =========================================================================
225    /// Begin a new transaction
226    BeginTransaction {
227        /// Transaction ID (unique per producer)
228        txn_id: String,
229        /// Producer ID from InitProducerId
230        producer_id: u64,
231        /// Producer epoch
232        producer_epoch: u16,
233        /// Transaction timeout in milliseconds (optional, defaults to 60s)
234        timeout_ms: Option<u64>,
235    },
236
237    /// Add partitions to an active transaction
238    AddPartitionsToTxn {
239        /// Transaction ID
240        txn_id: String,
241        /// Producer ID
242        producer_id: u64,
243        /// Producer epoch
244        producer_epoch: u16,
245        /// Partitions to add (topic, partition pairs)
246        partitions: Vec<(String, u32)>,
247    },
248
249    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
250    TransactionalPublish {
251        /// Transaction ID
252        txn_id: String,
253        topic: String,
254        partition: Option<u32>,
255        #[serde(with = "crate::serde_utils::option_bytes_serde")]
256        key: Option<Bytes>,
257        #[serde(with = "crate::serde_utils::bytes_serde")]
258        value: Bytes,
259        /// Producer ID
260        producer_id: u64,
261        /// Producer epoch
262        producer_epoch: u16,
263        /// Sequence number
264        sequence: i32,
265    },
266
267    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
268    AddOffsetsToTxn {
269        /// Transaction ID
270        txn_id: String,
271        /// Producer ID
272        producer_id: u64,
273        /// Producer epoch
274        producer_epoch: u16,
275        /// Consumer group ID
276        group_id: String,
277        /// Offsets to commit (topic, partition, offset triples)
278        offsets: Vec<(String, u32, i64)>,
279    },
280
281    /// Commit a transaction
282    CommitTransaction {
283        /// Transaction ID
284        txn_id: String,
285        /// Producer ID
286        producer_id: u64,
287        /// Producer epoch
288        producer_epoch: u16,
289    },
290
291    /// Abort a transaction
292    AbortTransaction {
293        /// Transaction ID
294        txn_id: String,
295        /// Producer ID
296        producer_id: u64,
297        /// Producer epoch
298        producer_epoch: u16,
299    },
300
301    // =========================================================================
302    // Per-Principal Quotas (Kafka Parity)
303    // =========================================================================
304    /// Describe quotas for entities
305    DescribeQuotas {
306        /// Entities to describe (empty = all)
307        /// Format: Vec<(entity_type, entity_name)>
308        /// entity_type: "user", "client-id", "consumer-group", "default"
309        /// entity_name: None for defaults, Some for specific entities
310        entities: Vec<(String, Option<String>)>,
311    },
312
313    /// Alter quotas for entities
314    AlterQuotas {
315        /// Quota alterations to apply
316        /// Each item: (entity_type, entity_name, quota_key, quota_value)
317        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
318        /// quota_value: None to remove, Some(value) to set
319        alterations: Vec<QuotaAlteration>,
320    },
321
322    // =========================================================================
323    // Admin API (Kafka Parity)
324    // =========================================================================
325    /// Alter topic configuration
326    AlterTopicConfig {
327        /// Topic name
328        topic: String,
329        /// Configuration changes to apply
330        configs: Vec<TopicConfigEntry>,
331    },
332
333    /// Create additional partitions for an existing topic
334    CreatePartitions {
335        /// Topic name
336        topic: String,
337        /// New total partition count (must be > current count)
338        new_partition_count: u32,
339        /// Optional assignment of new partitions to brokers
340        /// If empty, broker will auto-assign
341        assignments: Vec<Vec<String>>,
342    },
343
344    /// Delete records before a given offset (log truncation)
345    DeleteRecords {
346        /// Topic name
347        topic: String,
348        /// Partition-offset pairs: delete all records before these offsets
349        partition_offsets: Vec<(u32, u64)>,
350    },
351
352    /// Describe topic configurations
353    DescribeTopicConfigs {
354        /// Topics to describe (empty = all)
355        topics: Vec<String>,
356    },
357}
358
359/// Protocol response messages
360///
361/// # Stability
362///
363/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
364/// Adding new variants should only be done at the end of the enum.
365#[derive(Debug, Clone, Serialize, Deserialize)]
366pub enum Response {
367    /// Authentication successful
368    Authenticated {
369        /// Session token for subsequent requests
370        session_id: String,
371        /// Session timeout in seconds
372        expires_in: u64,
373    },
374
375    /// SCRAM-SHA-256: Server-first message (challenge)
376    ScramServerFirst {
377        /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
378        #[serde(with = "crate::serde_utils::bytes_serde")]
379        message: Bytes,
380    },
381
382    /// SCRAM-SHA-256: Server-final message (verification or error)
383    ScramServerFinal {
384        /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
385        #[serde(with = "crate::serde_utils::bytes_serde")]
386        message: Bytes,
387        /// Session ID (if authentication succeeded)
388        session_id: Option<String>,
389        /// Session timeout in seconds (if authentication succeeded)
390        expires_in: Option<u64>,
391    },
392
393    /// Success response with offset
394    Published { offset: u64, partition: u32 },
395
396    /// Messages response
397    Messages { messages: Vec<MessageData> },
398
399    /// Topic created
400    TopicCreated { name: String, partitions: u32 },
401
402    /// List of topics
403    Topics { topics: Vec<String> },
404
405    /// Topic deleted
406    TopicDeleted,
407
408    /// Offset committed
409    OffsetCommitted,
410
411    /// Offset response
412    Offset { offset: Option<u64> },
413
414    /// Metadata
415    Metadata { name: String, partitions: u32 },
416
417    /// Full cluster metadata for topic(s)
418    ClusterMetadata {
419        /// Controller node ID (Raft leader)
420        controller_id: Option<String>,
421        /// Broker/node list
422        brokers: Vec<BrokerInfo>,
423        /// Topic metadata
424        topics: Vec<TopicMetadata>,
425    },
426
427    /// Pong
428    Pong,
429
430    /// Offset bounds for a partition
431    OffsetBounds { earliest: u64, latest: u64 },
432
433    /// List of consumer groups
434    Groups { groups: Vec<String> },
435
436    /// Consumer group details with all offsets
437    GroupDescription {
438        consumer_group: String,
439        /// topic → partition → offset
440        offsets: HashMap<String, HashMap<u32, u64>>,
441    },
442
443    /// Consumer group deleted
444    GroupDeleted,
445
446    /// Offset for a timestamp
447    OffsetForTimestamp {
448        /// The first offset with timestamp >= the requested timestamp
449        /// None if no matching offset was found
450        offset: Option<u64>,
451    },
452
453    /// Error response
454    Error { message: String },
455
456    /// Success
457    Ok,
458
459    // =========================================================================
460    // Idempotent Producer
461    // =========================================================================
462    /// Producer ID initialized
463    ProducerIdInitialized {
464        /// Assigned or existing producer ID
465        producer_id: u64,
466        /// Current epoch (increments on reconnect)
467        producer_epoch: u16,
468    },
469
470    /// Idempotent publish result
471    IdempotentPublished {
472        /// Offset where message was written
473        offset: u64,
474        /// Partition the message was written to
475        partition: u32,
476        /// Whether this was a duplicate (message already existed)
477        duplicate: bool,
478    },
479
480    // =========================================================================
481    // Native Transactions
482    // =========================================================================
483    /// Transaction started successfully
484    TransactionStarted {
485        /// Transaction ID
486        txn_id: String,
487    },
488
489    /// Partitions added to transaction
490    PartitionsAddedToTxn {
491        /// Transaction ID
492        txn_id: String,
493        /// Number of partitions now in transaction
494        partition_count: usize,
495    },
496
497    /// Transactional publish result
498    TransactionalPublished {
499        /// Offset where message was written (pending commit)
500        offset: u64,
501        /// Partition the message was written to
502        partition: u32,
503        /// Sequence number accepted
504        sequence: i32,
505    },
506
507    /// Offsets added to transaction
508    OffsetsAddedToTxn {
509        /// Transaction ID
510        txn_id: String,
511    },
512
513    /// Transaction committed
514    TransactionCommitted {
515        /// Transaction ID
516        txn_id: String,
517    },
518
519    /// Transaction aborted
520    TransactionAborted {
521        /// Transaction ID
522        txn_id: String,
523    },
524
525    // =========================================================================
526    // Per-Principal Quotas (Kafka Parity)
527    // =========================================================================
528    /// Quota descriptions
529    QuotasDescribed {
530        /// List of quota entries
531        entries: Vec<QuotaEntry>,
532    },
533
534    /// Quotas altered successfully
535    QuotasAltered {
536        /// Number of quota alterations applied
537        altered_count: usize,
538    },
539
540    /// Throttle response (returned when quota exceeded)
541    Throttled {
542        /// Time to wait before retrying (milliseconds)
543        throttle_time_ms: u64,
544        /// Quota type that was exceeded
545        quota_type: String,
546        /// Entity that exceeded quota
547        entity: String,
548    },
549
550    // =========================================================================
551    // Admin API (Kafka Parity)
552    // =========================================================================
553    /// Topic configuration altered
554    TopicConfigAltered {
555        /// Topic name
556        topic: String,
557        /// Number of configurations changed
558        changed_count: usize,
559    },
560
561    /// Partitions created
562    PartitionsCreated {
563        /// Topic name
564        topic: String,
565        /// New total partition count
566        new_partition_count: u32,
567    },
568
569    /// Records deleted
570    RecordsDeleted {
571        /// Topic name
572        topic: String,
573        /// Results per partition
574        results: Vec<DeleteRecordsResult>,
575    },
576
577    /// Topic configurations described
578    TopicConfigsDescribed {
579        /// Configuration descriptions per topic
580        configs: Vec<TopicConfigDescription>,
581    },
582}
583
584impl Request {
585    /// Serialize request to bytes (postcard format, no format prefix)
586    ///
587    /// For internal Rust-to-Rust communication where format is known.
588    /// Use `to_wire()` for wire transmission with format detection support.
589    #[inline]
590    pub fn to_bytes(&self) -> Result<Vec<u8>> {
591        Ok(postcard::to_allocvec(self)?)
592    }
593
594    /// Deserialize request from bytes (postcard format)
595    ///
596    /// For internal Rust-to-Rust communication where format is known.
597    /// Use `from_wire()` for wire transmission with format detection support.
598    #[inline]
599    pub fn from_bytes(data: &[u8]) -> Result<Self> {
600        Ok(postcard::from_bytes(data)?)
601    }
602
603    /// Serialize request with wire format prefix
604    ///
605    /// Wire format: `[format_byte][payload]`
606    /// - format_byte: 0x00 = postcard, 0x01 = protobuf
607    /// - payload: serialized message
608    ///
609    /// Note: Length prefix is NOT included (handled by transport layer)
610    #[inline]
611    pub fn to_wire(&self, format: crate::WireFormat) -> Result<Vec<u8>> {
612        match format {
613            crate::WireFormat::Postcard => {
614                let payload = postcard::to_allocvec(self)?;
615                let mut result = Vec::with_capacity(1 + payload.len());
616                result.push(format.as_byte());
617                result.extend_from_slice(&payload);
618                Ok(result)
619            }
620            crate::WireFormat::Protobuf => {
621                // Protobuf requires the `protobuf` feature
622                #[cfg(feature = "protobuf")]
623                {
624                    let payload = self.to_proto_bytes()?;
625                    let mut result = Vec::with_capacity(1 + payload.len());
626                    result.push(format.as_byte());
627                    result.extend_from_slice(&payload);
628                    Ok(result)
629                }
630                #[cfg(not(feature = "protobuf"))]
631                {
632                    Err(crate::ProtocolError::Serialization(
633                        "Protobuf support requires the 'protobuf' feature".into(),
634                    ))
635                }
636            }
637        }
638    }
639
640    /// Deserialize request with format auto-detection
641    ///
642    /// Detects format from first byte and deserializes accordingly.
643    /// Returns the deserialized request and the detected format.
644    #[inline]
645    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat)> {
646        if data.is_empty() {
647            return Err(crate::ProtocolError::Serialization(
648                "Empty wire data".into(),
649            ));
650        }
651
652        let format_byte = data[0];
653        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
654            crate::ProtocolError::Serialization(format!(
655                "Unknown wire format: 0x{:02x}",
656                format_byte
657            ))
658        })?;
659
660        let payload = &data[1..];
661
662        match format {
663            crate::WireFormat::Postcard => {
664                let request = postcard::from_bytes(payload)?;
665                Ok((request, format))
666            }
667            crate::WireFormat::Protobuf => {
668                #[cfg(feature = "protobuf")]
669                {
670                    let request = Self::from_proto_bytes(payload)?;
671                    Ok((request, format))
672                }
673                #[cfg(not(feature = "protobuf"))]
674                {
675                    Err(crate::ProtocolError::Serialization(
676                        "Protobuf support requires the 'protobuf' feature".into(),
677                    ))
678                }
679            }
680        }
681    }
682}
683
684impl Response {
685    /// Serialize response to bytes (postcard format, no format prefix)
686    #[inline]
687    pub fn to_bytes(&self) -> Result<Vec<u8>> {
688        Ok(postcard::to_allocvec(self)?)
689    }
690
691    /// Deserialize response from bytes (postcard format)
692    #[inline]
693    pub fn from_bytes(data: &[u8]) -> Result<Self> {
694        Ok(postcard::from_bytes(data)?)
695    }
696
697    /// Serialize response with wire format prefix
698    #[inline]
699    pub fn to_wire(&self, format: crate::WireFormat) -> Result<Vec<u8>> {
700        match format {
701            crate::WireFormat::Postcard => {
702                let payload = postcard::to_allocvec(self)?;
703                let mut result = Vec::with_capacity(1 + payload.len());
704                result.push(format.as_byte());
705                result.extend_from_slice(&payload);
706                Ok(result)
707            }
708            crate::WireFormat::Protobuf => {
709                #[cfg(feature = "protobuf")]
710                {
711                    let payload = self.to_proto_bytes()?;
712                    let mut result = Vec::with_capacity(1 + payload.len());
713                    result.push(format.as_byte());
714                    result.extend_from_slice(&payload);
715                    Ok(result)
716                }
717                #[cfg(not(feature = "protobuf"))]
718                {
719                    Err(crate::ProtocolError::Serialization(
720                        "Protobuf support requires the 'protobuf' feature".into(),
721                    ))
722                }
723            }
724        }
725    }
726
727    /// Deserialize response with format auto-detection
728    #[inline]
729    pub fn from_wire(data: &[u8]) -> Result<(Self, crate::WireFormat)> {
730        if data.is_empty() {
731            return Err(crate::ProtocolError::Serialization(
732                "Empty wire data".into(),
733            ));
734        }
735
736        let format_byte = data[0];
737        let format = crate::WireFormat::from_byte(format_byte).ok_or_else(|| {
738            crate::ProtocolError::Serialization(format!(
739                "Unknown wire format: 0x{:02x}",
740                format_byte
741            ))
742        })?;
743
744        let payload = &data[1..];
745
746        match format {
747            crate::WireFormat::Postcard => {
748                let response = postcard::from_bytes(payload)?;
749                Ok((response, format))
750            }
751            crate::WireFormat::Protobuf => {
752                #[cfg(feature = "protobuf")]
753                {
754                    let response = Self::from_proto_bytes(payload)?;
755                    Ok((response, format))
756                }
757                #[cfg(not(feature = "protobuf"))]
758                {
759                    Err(crate::ProtocolError::Serialization(
760                        "Protobuf support requires the 'protobuf' feature".into(),
761                    ))
762                }
763            }
764        }
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771
772    #[test]
773    fn test_request_roundtrip() {
774        let requests = vec![
775            Request::Ping,
776            Request::ListTopics,
777            Request::CreateTopic {
778                name: "test".to_string(),
779                partitions: Some(4),
780            },
781            Request::Authenticate {
782                username: "admin".to_string(),
783                password: "secret".to_string(),
784            },
785        ];
786
787        for req in requests {
788            let bytes = req.to_bytes().unwrap();
789            let decoded = Request::from_bytes(&bytes).unwrap();
790            // Can't directly compare due to Debug, but serialization should succeed
791            assert!(!bytes.is_empty());
792            let _ = decoded; // Use decoded
793        }
794    }
795
796    #[test]
797    fn test_response_roundtrip() {
798        let responses = vec![
799            Response::Pong,
800            Response::Ok,
801            Response::Topics {
802                topics: vec!["a".to_string(), "b".to_string()],
803            },
804            Response::Error {
805                message: "test error".to_string(),
806            },
807        ];
808
809        for resp in responses {
810            let bytes = resp.to_bytes().unwrap();
811            let decoded = Response::from_bytes(&bytes).unwrap();
812            assert!(!bytes.is_empty());
813            let _ = decoded;
814        }
815    }
816
817    #[test]
818    fn test_request_wire_roundtrip() {
819        let request = Request::Ping;
820
821        // Serialize with format prefix
822        let wire_bytes = request.to_wire(crate::WireFormat::Postcard).unwrap();
823
824        // First byte should be format identifier
825        assert_eq!(wire_bytes[0], 0x00); // Postcard format
826
827        // Deserialize with auto-detection
828        let (decoded, format) = Request::from_wire(&wire_bytes).unwrap();
829        assert_eq!(format, crate::WireFormat::Postcard);
830        assert!(matches!(decoded, Request::Ping));
831    }
832
833    #[test]
834    fn test_response_wire_roundtrip() {
835        let response = Response::Pong;
836
837        // Serialize with format prefix
838        let wire_bytes = response.to_wire(crate::WireFormat::Postcard).unwrap();
839
840        // First byte should be format identifier
841        assert_eq!(wire_bytes[0], 0x00); // Postcard format
842
843        // Deserialize with auto-detection
844        let (decoded, format) = Response::from_wire(&wire_bytes).unwrap();
845        assert_eq!(format, crate::WireFormat::Postcard);
846        assert!(matches!(decoded, Response::Pong));
847    }
848
849    #[test]
850    fn test_wire_format_empty_data() {
851        let result = Request::from_wire(&[]);
852        assert!(result.is_err());
853    }
854
855    #[test]
856    fn test_wire_format_complex_request() {
857        use bytes::Bytes;
858
859        let request = Request::Publish {
860            topic: "test-topic".to_string(),
861            partition: Some(3),
862            key: Some(Bytes::from("my-key")),
863            value: Bytes::from("hello world"),
864        };
865
866        let wire_bytes = request.to_wire(crate::WireFormat::Postcard).unwrap();
867        assert_eq!(wire_bytes[0], 0x00);
868
869        let (decoded, format) = Request::from_wire(&wire_bytes).unwrap();
870        assert_eq!(format, crate::WireFormat::Postcard);
871
872        // Verify the decoded request matches
873        if let Request::Publish {
874            topic, partition, ..
875        } = decoded
876        {
877            assert_eq!(topic, "test-topic");
878            assert_eq!(partition, Some(3));
879        } else {
880            panic!("Expected Publish request");
881        }
882    }
883
884    #[test]
885    fn test_wire_format_complex_response() {
886        let response = Response::Published {
887            offset: 12345,
888            partition: 7,
889        };
890
891        let wire_bytes = response.to_wire(crate::WireFormat::Postcard).unwrap();
892        assert_eq!(wire_bytes[0], 0x00);
893
894        let (decoded, format) = Response::from_wire(&wire_bytes).unwrap();
895        assert_eq!(format, crate::WireFormat::Postcard);
896
897        if let Response::Published { offset, partition } = decoded {
898            assert_eq!(offset, 12345);
899            assert_eq!(partition, 7);
900        } else {
901            panic!("Expected Published response");
902        }
903    }
904}