rivven_protocol/messages.rs
1//! Protocol message types
2
3use crate::error::Result;
4use crate::metadata::{BrokerInfo, TopicMetadata};
5use crate::types::MessageData;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Quota alteration request
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QuotaAlteration {
13 /// Entity type: "user", "client-id", "consumer-group", "default"
14 pub entity_type: String,
15 /// Entity name (None for defaults)
16 pub entity_name: Option<String>,
17 /// Quota key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
18 pub quota_key: String,
19 /// Quota value (None to remove quota, Some to set)
20 pub quota_value: Option<u64>,
21}
22
23/// Quota entry in describe response
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct QuotaEntry {
26 /// Entity type
27 pub entity_type: String,
28 /// Entity name (None for defaults)
29 pub entity_name: Option<String>,
30 /// Quota values
31 pub quotas: HashMap<String, u64>,
32}
33
34/// Topic configuration entry for AlterTopicConfig
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct TopicConfigEntry {
37 /// Configuration key (e.g., "retention.ms", "max.message.bytes")
38 pub key: String,
39 /// Configuration value (None to reset to default)
40 pub value: Option<String>,
41}
42
43/// Topic configuration in describe response
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct TopicConfigDescription {
46 /// Topic name
47 pub topic: String,
48 /// Configuration entries
49 pub configs: HashMap<String, TopicConfigValue>,
50}
51
52/// Topic configuration value with metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct TopicConfigValue {
55 /// Current value
56 pub value: String,
57 /// Whether this is the default value
58 pub is_default: bool,
59 /// Whether this config is read-only
60 pub is_read_only: bool,
61 /// Whether this config is sensitive (e.g., passwords)
62 pub is_sensitive: bool,
63}
64
65/// Delete records result for a partition
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct DeleteRecordsResult {
68 /// Partition ID
69 pub partition: u32,
70 /// New low watermark (first available offset after deletion)
71 pub low_watermark: u64,
72 /// Error message if deletion failed for this partition
73 pub error: Option<String>,
74}
75
76/// Protocol request messages
77///
78/// # Stability
79///
80/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
81/// Adding new variants should only be done at the end of the enum.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub enum Request {
84 /// Authenticate with username/password (SASL/PLAIN compatible)
85 Authenticate { username: String, password: String },
86
87 /// Authenticate with SASL bytes (for Kafka client compatibility)
88 SaslAuthenticate {
89 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
90 mechanism: Bytes,
91 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
92 auth_bytes: Bytes,
93 },
94
95 /// SCRAM-SHA-256: Client-first message
96 ScramClientFirst {
97 /// Client-first-message bytes (`n,,n=<user>,r=<nonce>`)
98 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
99 message: Bytes,
100 },
101
102 /// SCRAM-SHA-256: Client-final message
103 ScramClientFinal {
104 /// Client-final-message bytes (`c=<binding>,r=<nonce>,p=<proof>`)
105 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
106 message: Bytes,
107 },
108
109 /// Publish a message to a topic
110 Publish {
111 topic: String,
112 partition: Option<u32>,
113 #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
114 key: Option<Bytes>,
115 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
116 value: Bytes,
117 },
118
119 /// Consume messages from a topic
120 Consume {
121 topic: String,
122 partition: u32,
123 offset: u64,
124 max_messages: usize,
125 },
126
127 /// Create a new topic
128 CreateTopic {
129 name: String,
130 partitions: Option<u32>,
131 },
132
133 /// List all topics
134 ListTopics,
135
136 /// Delete a topic
137 DeleteTopic { name: String },
138
139 /// Commit consumer offset
140 CommitOffset {
141 consumer_group: String,
142 topic: String,
143 partition: u32,
144 offset: u64,
145 },
146
147 /// Get consumer offset
148 GetOffset {
149 consumer_group: String,
150 topic: String,
151 partition: u32,
152 },
153
154 /// Get topic metadata
155 GetMetadata { topic: String },
156
157 /// Get cluster metadata (all topics or specific ones)
158 GetClusterMetadata {
159 /// Topics to get metadata for (empty = all topics)
160 topics: Vec<String>,
161 },
162
163 /// Ping
164 Ping,
165
166 /// Register a schema
167 RegisterSchema { subject: String, schema: String },
168
169 /// Get a schema
170 GetSchema { id: i32 },
171
172 /// Get offset bounds for a partition
173 GetOffsetBounds { topic: String, partition: u32 },
174
175 /// List all consumer groups
176 ListGroups,
177
178 /// Describe a consumer group (get all offsets)
179 DescribeGroup { consumer_group: String },
180
181 /// Delete a consumer group
182 DeleteGroup { consumer_group: String },
183
184 /// Find offset for a timestamp
185 GetOffsetForTimestamp {
186 topic: String,
187 partition: u32,
188 /// Timestamp in milliseconds since epoch
189 timestamp_ms: i64,
190 },
191
192 // =========================================================================
193 // Idempotent Producer (KIP-98)
194 // =========================================================================
195 /// Initialize idempotent producer (request producer ID and epoch)
196 ///
197 /// Call this before sending idempotent produce requests.
198 /// If reconnecting, provide the previous producer_id to bump epoch.
199 InitProducerId {
200 /// Previous producer ID (None for new producers)
201 producer_id: Option<u64>,
202 },
203
204 /// Publish with idempotent semantics (exactly-once delivery)
205 ///
206 /// Requires InitProducerId to have been called first.
207 IdempotentPublish {
208 topic: String,
209 partition: Option<u32>,
210 #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
211 key: Option<Bytes>,
212 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
213 value: Bytes,
214 /// Producer ID from InitProducerId response
215 producer_id: u64,
216 /// Producer epoch from InitProducerId response
217 producer_epoch: u16,
218 /// Sequence number (starts at 0, increments per partition)
219 sequence: i32,
220 },
221
222 // =========================================================================
223 // Native Transactions (KIP-98 Transactions)
224 // =========================================================================
225 /// Begin a new transaction
226 BeginTransaction {
227 /// Transaction ID (unique per producer)
228 txn_id: String,
229 /// Producer ID from InitProducerId
230 producer_id: u64,
231 /// Producer epoch
232 producer_epoch: u16,
233 /// Transaction timeout in milliseconds (optional, defaults to 60s)
234 timeout_ms: Option<u64>,
235 },
236
237 /// Add partitions to an active transaction
238 AddPartitionsToTxn {
239 /// Transaction ID
240 txn_id: String,
241 /// Producer ID
242 producer_id: u64,
243 /// Producer epoch
244 producer_epoch: u16,
245 /// Partitions to add (topic, partition pairs)
246 partitions: Vec<(String, u32)>,
247 },
248
249 /// Publish within a transaction (combines IdempotentPublish + transaction tracking)
250 TransactionalPublish {
251 /// Transaction ID
252 txn_id: String,
253 topic: String,
254 partition: Option<u32>,
255 #[serde(with = "rivven_core::serde_utils::option_bytes_serde")]
256 key: Option<Bytes>,
257 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
258 value: Bytes,
259 /// Producer ID
260 producer_id: u64,
261 /// Producer epoch
262 producer_epoch: u16,
263 /// Sequence number
264 sequence: i32,
265 },
266
267 /// Add consumer offsets to transaction (for exactly-once consume-transform-produce)
268 AddOffsetsToTxn {
269 /// Transaction ID
270 txn_id: String,
271 /// Producer ID
272 producer_id: u64,
273 /// Producer epoch
274 producer_epoch: u16,
275 /// Consumer group ID
276 group_id: String,
277 /// Offsets to commit (topic, partition, offset triples)
278 offsets: Vec<(String, u32, i64)>,
279 },
280
281 /// Commit a transaction
282 CommitTransaction {
283 /// Transaction ID
284 txn_id: String,
285 /// Producer ID
286 producer_id: u64,
287 /// Producer epoch
288 producer_epoch: u16,
289 },
290
291 /// Abort a transaction
292 AbortTransaction {
293 /// Transaction ID
294 txn_id: String,
295 /// Producer ID
296 producer_id: u64,
297 /// Producer epoch
298 producer_epoch: u16,
299 },
300
301 // =========================================================================
302 // Per-Principal Quotas (Kafka Parity)
303 // =========================================================================
304 /// Describe quotas for entities
305 DescribeQuotas {
306 /// Entities to describe (empty = all)
307 /// Format: Vec<(entity_type, entity_name)>
308 /// entity_type: "user", "client-id", "consumer-group", "default"
309 /// entity_name: None for defaults, Some for specific entities
310 entities: Vec<(String, Option<String>)>,
311 },
312
313 /// Alter quotas for entities
314 AlterQuotas {
315 /// Quota alterations to apply
316 /// Each item: (entity_type, entity_name, quota_key, quota_value)
317 /// quota_key: "produce_bytes_rate", "consume_bytes_rate", "request_rate"
318 /// quota_value: None to remove, Some(value) to set
319 alterations: Vec<QuotaAlteration>,
320 },
321
322 // =========================================================================
323 // Admin API (Kafka Parity)
324 // =========================================================================
325 /// Alter topic configuration
326 AlterTopicConfig {
327 /// Topic name
328 topic: String,
329 /// Configuration changes to apply
330 configs: Vec<TopicConfigEntry>,
331 },
332
333 /// Create additional partitions for an existing topic
334 CreatePartitions {
335 /// Topic name
336 topic: String,
337 /// New total partition count (must be > current count)
338 new_partition_count: u32,
339 /// Optional assignment of new partitions to brokers
340 /// If empty, broker will auto-assign
341 assignments: Vec<Vec<String>>,
342 },
343
344 /// Delete records before a given offset (log truncation)
345 DeleteRecords {
346 /// Topic name
347 topic: String,
348 /// Partition-offset pairs: delete all records before these offsets
349 partition_offsets: Vec<(u32, u64)>,
350 },
351
352 /// Describe topic configurations
353 DescribeTopicConfigs {
354 /// Topics to describe (empty = all)
355 topics: Vec<String>,
356 },
357}
358
359/// Protocol response messages
360///
361/// # Stability
362///
363/// **WARNING**: Variant order must remain stable for postcard serialization compatibility.
364/// Adding new variants should only be done at the end of the enum.
365#[derive(Debug, Clone, Serialize, Deserialize)]
366pub enum Response {
367 /// Authentication successful
368 Authenticated {
369 /// Session token for subsequent requests
370 session_id: String,
371 /// Session timeout in seconds
372 expires_in: u64,
373 },
374
375 /// SCRAM-SHA-256: Server-first message (challenge)
376 ScramServerFirst {
377 /// Server-first-message bytes (`r=<nonce>,s=<salt>,i=<iterations>`)
378 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
379 message: Bytes,
380 },
381
382 /// SCRAM-SHA-256: Server-final message (verification or error)
383 ScramServerFinal {
384 /// Server-final-message bytes (`v=<verifier>` or `e=<error>`)
385 #[serde(with = "rivven_core::serde_utils::bytes_serde")]
386 message: Bytes,
387 /// Session ID (if authentication succeeded)
388 session_id: Option<String>,
389 /// Session timeout in seconds (if authentication succeeded)
390 expires_in: Option<u64>,
391 },
392
393 /// Success response with offset
394 Published { offset: u64, partition: u32 },
395
396 /// Messages response
397 Messages { messages: Vec<MessageData> },
398
399 /// Topic created
400 TopicCreated { name: String, partitions: u32 },
401
402 /// List of topics
403 Topics { topics: Vec<String> },
404
405 /// Topic deleted
406 TopicDeleted,
407
408 /// Offset committed
409 OffsetCommitted,
410
411 /// Offset response
412 Offset { offset: Option<u64> },
413
414 /// Metadata
415 Metadata { name: String, partitions: u32 },
416
417 /// Full cluster metadata for topic(s)
418 ClusterMetadata {
419 /// Controller node ID (Raft leader)
420 controller_id: Option<String>,
421 /// Broker/node list
422 brokers: Vec<BrokerInfo>,
423 /// Topic metadata
424 topics: Vec<TopicMetadata>,
425 },
426
427 /// Schema registration result
428 SchemaRegistered { id: i32 },
429
430 /// Schema details
431 Schema { id: i32, schema: String },
432
433 /// Pong
434 Pong,
435
436 /// Offset bounds for a partition
437 OffsetBounds { earliest: u64, latest: u64 },
438
439 /// List of consumer groups
440 Groups { groups: Vec<String> },
441
442 /// Consumer group details with all offsets
443 GroupDescription {
444 consumer_group: String,
445 /// topic → partition → offset
446 offsets: HashMap<String, HashMap<u32, u64>>,
447 },
448
449 /// Consumer group deleted
450 GroupDeleted,
451
452 /// Offset for a timestamp
453 OffsetForTimestamp {
454 /// The first offset with timestamp >= the requested timestamp
455 /// None if no matching offset was found
456 offset: Option<u64>,
457 },
458
459 /// Error response
460 Error { message: String },
461
462 /// Success
463 Ok,
464
465 // =========================================================================
466 // Idempotent Producer (KIP-98)
467 // =========================================================================
468 /// Producer ID initialized
469 ProducerIdInitialized {
470 /// Assigned or existing producer ID
471 producer_id: u64,
472 /// Current epoch (increments on reconnect)
473 producer_epoch: u16,
474 },
475
476 /// Idempotent publish result
477 IdempotentPublished {
478 /// Offset where message was written
479 offset: u64,
480 /// Partition the message was written to
481 partition: u32,
482 /// Whether this was a duplicate (message already existed)
483 duplicate: bool,
484 },
485
486 // =========================================================================
487 // Native Transactions (KIP-98 Transactions)
488 // =========================================================================
489 /// Transaction started successfully
490 TransactionStarted {
491 /// Transaction ID
492 txn_id: String,
493 },
494
495 /// Partitions added to transaction
496 PartitionsAddedToTxn {
497 /// Transaction ID
498 txn_id: String,
499 /// Number of partitions now in transaction
500 partition_count: usize,
501 },
502
503 /// Transactional publish result
504 TransactionalPublished {
505 /// Offset where message was written (pending commit)
506 offset: u64,
507 /// Partition the message was written to
508 partition: u32,
509 /// Sequence number accepted
510 sequence: i32,
511 },
512
513 /// Offsets added to transaction
514 OffsetsAddedToTxn {
515 /// Transaction ID
516 txn_id: String,
517 },
518
519 /// Transaction committed
520 TransactionCommitted {
521 /// Transaction ID
522 txn_id: String,
523 },
524
525 /// Transaction aborted
526 TransactionAborted {
527 /// Transaction ID
528 txn_id: String,
529 },
530
531 // =========================================================================
532 // Per-Principal Quotas (Kafka Parity)
533 // =========================================================================
534 /// Quota descriptions
535 QuotasDescribed {
536 /// List of quota entries
537 entries: Vec<QuotaEntry>,
538 },
539
540 /// Quotas altered successfully
541 QuotasAltered {
542 /// Number of quota alterations applied
543 altered_count: usize,
544 },
545
546 /// Throttle response (returned when quota exceeded)
547 Throttled {
548 /// Time to wait before retrying (milliseconds)
549 throttle_time_ms: u64,
550 /// Quota type that was exceeded
551 quota_type: String,
552 /// Entity that exceeded quota
553 entity: String,
554 },
555
556 // =========================================================================
557 // Admin API (Kafka Parity)
558 // =========================================================================
559 /// Topic configuration altered
560 TopicConfigAltered {
561 /// Topic name
562 topic: String,
563 /// Number of configurations changed
564 changed_count: usize,
565 },
566
567 /// Partitions created
568 PartitionsCreated {
569 /// Topic name
570 topic: String,
571 /// New total partition count
572 new_partition_count: u32,
573 },
574
575 /// Records deleted
576 RecordsDeleted {
577 /// Topic name
578 topic: String,
579 /// Results per partition
580 results: Vec<DeleteRecordsResult>,
581 },
582
583 /// Topic configurations described
584 TopicConfigsDescribed {
585 /// Configuration descriptions per topic
586 configs: Vec<TopicConfigDescription>,
587 },
588}
589
590impl Request {
591 /// Serialize request to bytes
592 pub fn to_bytes(&self) -> Result<Vec<u8>> {
593 Ok(postcard::to_allocvec(self)?)
594 }
595
596 /// Deserialize request from bytes
597 pub fn from_bytes(data: &[u8]) -> Result<Self> {
598 Ok(postcard::from_bytes(data)?)
599 }
600}
601
602impl Response {
603 /// Serialize response to bytes
604 pub fn to_bytes(&self) -> Result<Vec<u8>> {
605 Ok(postcard::to_allocvec(self)?)
606 }
607
608 /// Deserialize response from bytes
609 pub fn from_bytes(data: &[u8]) -> Result<Self> {
610 Ok(postcard::from_bytes(data)?)
611 }
612}
613
614#[cfg(test)]
615mod tests {
616 use super::*;
617
618 #[test]
619 fn test_request_roundtrip() {
620 let requests = vec![
621 Request::Ping,
622 Request::ListTopics,
623 Request::CreateTopic {
624 name: "test".to_string(),
625 partitions: Some(4),
626 },
627 Request::Authenticate {
628 username: "admin".to_string(),
629 password: "secret".to_string(),
630 },
631 ];
632
633 for req in requests {
634 let bytes = req.to_bytes().unwrap();
635 let decoded = Request::from_bytes(&bytes).unwrap();
636 // Can't directly compare due to Debug, but serialization should succeed
637 assert!(!bytes.is_empty());
638 let _ = decoded; // Use decoded
639 }
640 }
641
642 #[test]
643 fn test_response_roundtrip() {
644 let responses = vec![
645 Response::Pong,
646 Response::Ok,
647 Response::Topics {
648 topics: vec!["a".to_string(), "b".to_string()],
649 },
650 Response::Error {
651 message: "test error".to_string(),
652 },
653 ];
654
655 for resp in responses {
656 let bytes = resp.to_bytes().unwrap();
657 let decoded = Response::from_bytes(&bytes).unwrap();
658 assert!(!bytes.is_empty());
659 let _ = decoded;
660 }
661 }
662}