// rivven_protocol/messages.rs

1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Quota alteration request
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QuotaAlteration {
13    /// Entity type: "user", "client-id", "consumer-group", "default"
14    pub entity_type: String,
15    /// Entity name (None for defaults)
16    pub entity_name: Option<String>,
17    /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
18    pub quota_key: String,
19    /// Quota value (None to remove quota, Some to set)
20    pub quota_value: Option<u64>,
21}
22
23/// Quota entry in describe response
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct QuotaEntry {
26    /// Entity type
27    pub entity_type: String,
28    /// Entity name (None for defaults)
29    pub entity_name: Option<String>,
30    /// Quota values
31    pub quotas: HashMap<String, u64>,
32}
33
34/// Topic configuration entry for AlterTopicConfig
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TopicConfigEntry {
37    /// Configuration key (e.g., "retention.ms", "max.message.bytes")
38    pub key: String,
39    /// Configuration value (None to reset to default)
40    pub value: Option<String>,
41}
42
43/// Topic configuration in describe response
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TopicConfigDescription {
46    /// Topic name
47    pub topic: String,
48    /// Configuration entries
49    pub configs: HashMap<String, TopicConfigValue>,
50}
51
52/// Topic configuration value with metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct TopicConfigValue {
55    /// Current value
56    pub value: String,
57    /// Whether this is the default value
58    pub is_default: bool,
59    /// Whether this config is read-only
60    pub is_read_only: bool,
61    /// Whether this config is sensitive (e.g., passwords)
62    pub is_sensitive: bool,
63}
64
65/// Delete records result for a partition
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DeleteRecordsResult {
68    /// Partition ID
69    pub partition: u32,
70    /// New low watermark (first available offset after deletion)
71    pub low_watermark: u64,
72    /// Error message if deletion failed for this partition
73    pub error: Option<String>,
74}
75
76/// Protocol request messages
77///
78/// # Stability
79///
80/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
81/// Adding new variants should only be done at the end of the enum.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub enum Request {
84    /// Authenticate with username/password (SASL/PLAIN compatible)
85    Authenticate { username: String, password: String },
86
87    /// Authenticate with SASL bytes (for Kafka client compatibility)
88    SaslAuthenticate {
89        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
90        mechanism: Bytes,
91        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
92        auth_bytes: Bytes,
93    },
94
95    /// SCRAM-SHA-256: Client-first message
96    ScramClientFirst {
97        /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
98        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
99        message: Bytes,
100    },
101
102    /// SCRAM-SHA-256: Client-final message
103    ScramClientFinal {
104        /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
105        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
106        message: Bytes,
107    },
108
109    /// Publish a message to a topic
110    Publish {
111        topic: String,
112        partition: Option<u32>,
113        #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
114        key: Option<Bytes>,
115        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
116        value: Bytes,
117    },
118
119    /// Consume messages from a topic
120    Consume {
121        topic: String,
122        partition: u32,
123        offset: u64,
124        max_messages: usize,
125    },
126
127    /// Create a new topic
128    CreateTopic {
129        name: String,
130        partitions: Option<u32>,
131    },
132
133    /// List all topics
134    ListTopics,
135
136    /// Delete a topic
137    DeleteTopic { name: String },
138
139    /// Commit consumer offset
140    CommitOffset {
141        consumer_group: String,
142        topic: String,
143        partition: u32,
144        offset: u64,
145    },
146
147    /// Get consumer offset
148    GetOffset {
149        consumer_group: String,
150        topic: String,
151        partition: u32,
152    },
153
154    /// Get topic metadata
155    GetMetadata { topic: String },
156
157    /// Get cluster metadata (all topics or specific ones)
158    GetClusterMetadata {
159        /// Topics to get metadata for (empty = all topics)
160        topics: Vec<String>,
161    },
162
163    /// Ping
164    Ping,
165
166    /// Register a schema
167    RegisterSchema { subject: String, schema: String },
168
169    /// Get a schema
170    GetSchema { id: i32 },
171
172    /// Get offset bounds for a partition
173    GetOffsetBounds { topic: String, partition: u32 },
174
175    /// List all consumer groups
176    ListGroups,
177
178    /// Describe a consumer group (get all offsets)
179    DescribeGroup { consumer_group: String },
180
181    /// Delete a consumer group
182    DeleteGroup { consumer_group: String },
183
184    /// Find offset for a timestamp
185    GetOffsetForTimestamp {
186        topic: String,
187        partition: u32,
188        /// Timestamp in milliseconds since epoch
189        timestamp_ms: i64,
190    },
191
192    // =========================================================================
193    // Idempotent Producer (KIP-98)
194    // =========================================================================
195    /// Initialize idempotent producer (request producer ID and epoch)
196    ///
197    /// Call this before sending idempotent produce requests.
198    /// If reconnecting, provide the previous producer_id to bump epoch.
199    InitProducerId {
200        /// Previous producer ID (None for new producers)
201        producer_id: Option<u64>,
202    },
203
204    /// Publish with idempotent semantics (exactly-once delivery)
205    ///
206    /// Requires InitProducerId to have been called first.
207    IdempotentPublish {
208        topic: String,
209        partition: Option<u32>,
210        #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
211        key: Option<Bytes>,
212        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
213        value: Bytes,
214        /// Producer ID from InitProducerId response
215        producer_id: u64,
216        /// Producer epoch from InitProducerId response
217        producer_epoch: u16,
218        /// Sequence number (starts at 0, increments per partition)
219        sequence: i32,
220    },
221
222    // =========================================================================
223    // Native Transactions (KIP-98 Transactions)
224    // =========================================================================
225    /// Begin a new transaction
226    BeginTransaction {
227        /// Transaction ID (unique per producer)
228        txn_id: String,
229        /// Producer ID from InitProducerId
230        producer_id: u64,
231        /// Producer epoch
232        producer_epoch: u16,
233        /// Transaction timeout in milliseconds (optional, defaults to 60s)
234        timeout_ms: Option<u64>,
235    },
236
237    /// Add partitions to an active transaction
238    AddPartitionsToTxn {
239        /// Transaction ID
240        txn_id: String,
241        /// Producer ID
242        producer_id: u64,
243        /// Producer epoch
244        producer_epoch: u16,
245        /// Partitions to add (topic, partition pairs)
246        partitions: Vec<(String, u32)>,
247    },
248
249    /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
250    TransactionalPublish {
251        /// Transaction ID
252        txn_id: String,
253        topic: String,
254        partition: Option<u32>,
255        #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
256        key: Option<Bytes>,
257        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
258        value: Bytes,
259        /// Producer ID
260        producer_id: u64,
261        /// Producer epoch
262        producer_epoch: u16,
263        /// Sequence number
264        sequence: i32,
265    },
266
267    /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
268    AddOffsetsToTxn {
269        /// Transaction ID
270        txn_id: String,
271        /// Producer ID
272        producer_id: u64,
273        /// Producer epoch
274        producer_epoch: u16,
275        /// Consumer group ID
276        group_id: String,
277        /// Offsets to commit (topic, partition, offset triples)
278        offsets: Vec<(String, u32, i64)>,
279    },
280
281    /// Commit a transaction
282    CommitTransaction {
283        /// Transaction ID
284        txn_id: String,
285        /// Producer ID
286        producer_id: u64,
287        /// Producer epoch
288        producer_epoch: u16,
289    },
290
291    /// Abort a transaction
292    AbortTransaction {
293        /// Transaction ID
294        txn_id: String,
295        /// Producer ID
296        producer_id: u64,
297        /// Producer epoch
298        producer_epoch: u16,
299    },
300
301    // =========================================================================
302    // Per-Principal Quotas (Kafka Parity)
303    // =========================================================================
304    /// Describe quotas for entities
305    DescribeQuotas {
306        /// Entities to describe (empty = all)
307        /// Format: Vec<(entity_type, entity_name)>
308        /// entity_type: "user", "client-id", "consumer-group", "default"
309        /// entity_name: None for defaults, Some for specific entities
310        entities: Vec<(String, Option<String>)>,
311    },
312
313    /// Alter quotas for entities
314    AlterQuotas {
315        /// Quota alterations to apply
316        /// Each item: (entity_type, entity_name, quota_key, quota_value)
317        /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
318        /// quota_value: None to remove, Some(value) to set
319        alterations: Vec<QuotaAlteration>,
320    },
321
322    // =========================================================================
323    // Admin API (Kafka Parity)
324    // =========================================================================
325    /// Alter topic configuration
326    AlterTopicConfig {
327        /// Topic name
328        topic: String,
329        /// Configuration changes to apply
330        configs: Vec<TopicConfigEntry>,
331    },
332
333    /// Create additional partitions for an existing topic
334    CreatePartitions {
335        /// Topic name
336        topic: String,
337        /// New total partition count (must be > current count)
338        new_partition_count: u32,
339        /// Optional assignment of new partitions to brokers
340        /// If empty, broker will auto-assign
341        assignments: Vec<Vec<String>>,
342    },
343
344    /// Delete records before a given offset (log truncation)
345    DeleteRecords {
346        /// Topic name
347        topic: String,
348        /// Partition-offset pairs: delete all records before these offsets
349        partition_offsets: Vec<(u32, u64)>,
350    },
351
352    /// Describe topic configurations
353    DescribeTopicConfigs {
354        /// Topics to describe (empty = all)
355        topics: Vec<String>,
356    },
357}
358
359/// Protocol response messages
360///
361/// # Stability
362///
363/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
364/// Adding new variants should only be done at the end of the enum.
365#[derive(Debug, Clone, Serialize, Deserialize)]
366pub enum Response {
367    /// Authentication successful
368    Authenticated {
369        /// Session token for subsequent requests
370        session_id: String,
371        /// Session timeout in seconds
372        expires_in: u64,
373    },
374
375    /// SCRAM-SHA-256: Server-first message (challenge)
376    ScramServerFirst {
377        /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
378        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
379        message: Bytes,
380    },
381
382    /// SCRAM-SHA-256: Server-final message (verification or error)
383    ScramServerFinal {
384        /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
385        #[serde(with = "rivven_core::serde_utils::bytes_serde")]
386        message: Bytes,
387        /// Session ID (if authentication succeeded)
388        session_id: Option<String>,
389        /// Session timeout in seconds (if authentication succeeded)
390        expires_in: Option<u64>,
391    },
392
393    /// Success response with offset
394    Published { offset: u64, partition: u32 },
395
396    /// Messages response
397    Messages { messages: Vec<MessageData> },
398
399    /// Topic created
400    TopicCreated { name: String, partitions: u32 },
401
402    /// List of topics
403    Topics { topics: Vec<String> },
404
405    /// Topic deleted
406    TopicDeleted,
407
408    /// Offset committed
409    OffsetCommitted,
410
411    /// Offset response
412    Offset { offset: Option<u64> },
413
414    /// Metadata
415    Metadata { name: String, partitions: u32 },
416
417    /// Full cluster metadata for topic(s)
418    ClusterMetadata {
419        /// Controller node ID (Raft leader)
420        controller_id: Option<String>,
421        /// Broker/node list
422        brokers: Vec<BrokerInfo>,
423        /// Topic metadata
424        topics: Vec<TopicMetadata>,
425    },
426
427    /// Schema registration result
428    SchemaRegistered { id: i32 },
429
430    /// Schema details
431    Schema { id: i32, schema: String },
432
433    /// Pong
434    Pong,
435
436    /// Offset bounds for a partition
437    OffsetBounds { earliest: u64, latest: u64 },
438
439    /// List of consumer groups
440    Groups { groups: Vec<String> },
441
442    /// Consumer group details with all offsets
443    GroupDescription {
444        consumer_group: String,
445        /// topic → partition → offset
446        offsets: HashMap<String, HashMap<u32, u64>>,
447    },
448
449    /// Consumer group deleted
450    GroupDeleted,
451
452    /// Offset for a timestamp
453    OffsetForTimestamp {
454        /// The first offset with timestamp >= the requested timestamp
455        /// None if no matching offset was found
456        offset: Option<u64>,
457    },
458
459    /// Error response
460    Error { message: String },
461
462    /// Success
463    Ok,
464
465    // =========================================================================
466    // Idempotent Producer (KIP-98)
467    // =========================================================================
468    /// Producer ID initialized
469    ProducerIdInitialized {
470        /// Assigned or existing producer ID
471        producer_id: u64,
472        /// Current epoch (increments on reconnect)
473        producer_epoch: u16,
474    },
475
476    /// Idempotent publish result
477    IdempotentPublished {
478        /// Offset where message was written
479        offset: u64,
480        /// Partition the message was written to
481        partition: u32,
482        /// Whether this was a duplicate (message already existed)
483        duplicate: bool,
484    },
485
486    // =========================================================================
487    // Native Transactions (KIP-98 Transactions)
488    // =========================================================================
489    /// Transaction started successfully
490    TransactionStarted {
491        /// Transaction ID
492        txn_id: String,
493    },
494
495    /// Partitions added to transaction
496    PartitionsAddedToTxn {
497        /// Transaction ID
498        txn_id: String,
499        /// Number of partitions now in transaction
500        partition_count: usize,
501    },
502
503    /// Transactional publish result
504    TransactionalPublished {
505        /// Offset where message was written (pending commit)
506        offset: u64,
507        /// Partition the message was written to
508        partition: u32,
509        /// Sequence number accepted
510        sequence: i32,
511    },
512
513    /// Offsets added to transaction
514    OffsetsAddedToTxn {
515        /// Transaction ID
516        txn_id: String,
517    },
518
519    /// Transaction committed
520    TransactionCommitted {
521        /// Transaction ID
522        txn_id: String,
523    },
524
525    /// Transaction aborted
526    TransactionAborted {
527        /// Transaction ID
528        txn_id: String,
529    },
530
531    // =========================================================================
532    // Per-Principal Quotas (Kafka Parity)
533    // =========================================================================
534    /// Quota descriptions
535    QuotasDescribed {
536        /// List of quota entries
537        entries: Vec<QuotaEntry>,
538    },
539
540    /// Quotas altered successfully
541    QuotasAltered {
542        /// Number of quota alterations applied
543        altered_count: usize,
544    },
545
546    /// Throttle response (returned when quota exceeded)
547    Throttled {
548        /// Time to wait before retrying (milliseconds)
549        throttle_time_ms: u64,
550        /// Quota type that was exceeded
551        quota_type: String,
552        /// Entity that exceeded quota
553        entity: String,
554    },
555
556    // =========================================================================
557    // Admin API (Kafka Parity)
558    // =========================================================================
559    /// Topic configuration altered
560    TopicConfigAltered {
561        /// Topic name
562        topic: String,
563        /// Number of configurations changed
564        changed_count: usize,
565    },
566
567    /// Partitions created
568    PartitionsCreated {
569        /// Topic name
570        topic: String,
571        /// New total partition count
572        new_partition_count: u32,
573    },
574
575    /// Records deleted
576    RecordsDeleted {
577        /// Topic name
578        topic: String,
579        /// Results per partition
580        results: Vec<DeleteRecordsResult>,
581    },
582
583    /// Topic configurations described
584    TopicConfigsDescribed {
585        /// Configuration descriptions per topic
586        configs: Vec<TopicConfigDescription>,
587    },
588}
589
590impl Request {
591    /// Serialize request to bytes
592    pub fn to_bytes(&self) -> Result<Vec<u8>> {
593        Ok(postcard::to_allocvec(self)?)
594    }
595
596    /// Deserialize request from bytes
597    pub fn from_bytes(data: &[u8]) -> Result<Self> {
598        Ok(postcard::from_bytes(data)?)
599    }
600}
601
602impl Response {
603    /// Serialize response to bytes
604    pub fn to_bytes(&self) -> Result<Vec<u8>> {
605        Ok(postcard::to_allocvec(self)?)
606    }
607
608    /// Deserialize response from bytes
609    pub fn from_bytes(data: &[u8]) -> Result<Self> {
610        Ok(postcard::from_bytes(data)?)
611    }
612}
613
#[cfg(test)]
mod tests {
    use super::*;

    /// Every sample request must survive an encode → decode cycle unchanged.
    ///
    /// `Request` does not derive `PartialEq`, so equality is checked
    /// indirectly: the decoded value must be the same variant as the input and
    /// must re-encode to exactly the bytes it was decoded from.
    #[test]
    fn test_request_roundtrip() {
        let requests = [
            Request::Ping,
            Request::ListTopics,
            Request::CreateTopic {
                name: "test".to_string(),
                partitions: Some(4),
            },
            Request::Authenticate {
                username: "admin".to_string(),
                password: "secret".to_string(),
            },
        ];

        for req in &requests {
            let bytes = req.to_bytes().unwrap();
            assert!(!bytes.is_empty());

            let decoded = Request::from_bytes(&bytes).unwrap();
            assert_eq!(
                std::mem::discriminant(req),
                std::mem::discriminant(&decoded),
                "decoded a different variant than was encoded: {decoded:?}"
            );
            // Identical re-encoding shows the field values survived as well.
            assert_eq!(decoded.to_bytes().unwrap(), bytes);
        }
    }

    /// Every sample response must survive an encode → decode cycle unchanged.
    ///
    /// `Response` does not derive `PartialEq`, so equality is checked
    /// indirectly: the decoded value must be the same variant as the input and
    /// must re-encode to exactly the bytes it was decoded from.
    #[test]
    fn test_response_roundtrip() {
        let responses = [
            Response::Pong,
            Response::Ok,
            Response::Topics {
                topics: vec!["a".to_string(), "b".to_string()],
            },
            Response::Error {
                message: "test error".to_string(),
            },
        ];

        for resp in &responses {
            let bytes = resp.to_bytes().unwrap();
            assert!(!bytes.is_empty());

            let decoded = Response::from_bytes(&bytes).unwrap();
            assert_eq!(
                std::mem::discriminant(resp),
                std::mem::discriminant(&decoded),
                "decoded a different variant than was encoded: {decoded:?}"
            );
            // Identical re-encoding shows the field values survived as well.
            assert_eq!(decoded.to_bytes().unwrap(), bytes);
        }
    }
}