1use crate::error::{ClusterError, Result};
14use crate::node::{NodeId, NodeInfo};
15use crate::partition::{PartitionId, PartitionState, TopicConfig, TopicState};
16use rivven_core::consumer_group::{ConsumerGroup, GroupId};
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
/// Commands that mutate [`ClusterMetadata`].
///
/// Each variant is handled by `ClusterMetadata::apply`, which returns a
/// [`MetadataResponse`] describing the outcome.
///
/// NOTE(review): if commands are encoded with a positional format such as
/// postcard (used in this file for metadata snapshots), reordering variants or
/// fields changes the encoding — confirm before rearranging.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MetadataCommand {
    /// Create a topic. Rejected when the name fails validation or a topic
    /// with the same name already exists.
    CreateTopic {
        /// Topic settings; `config.name` is the key the topic is stored under.
        config: TopicConfig,
        /// Replica lists handed to `TopicState::new` (presumably one list per
        /// partition — confirm against `TopicState`).
        partition_assignments: Vec<Vec<NodeId>>,
    },

    /// Remove the topic called `name`; an error is returned if it is unknown.
    DeleteTopic { name: String },

    /// Merge `config` into the topic's key/value settings, overwriting keys
    /// that are already present.
    UpdateTopicConfig {
        name: String,
        config: HashMap<String, String>,
    },

    /// Append one partition per entry of `new_assignments` to `topic`; each
    /// entry is the replica list of the new partition.
    AddPartitions {
        topic: String,
        new_assignments: Vec<Vec<NodeId>>,
    },

    /// Set the leader of `partition`. Applied only when `epoch` is strictly
    /// greater than the partition's current leader epoch.
    UpdatePartitionLeader {
        partition: PartitionId,
        leader: NodeId,
        epoch: u64,
    },

    /// Replace the in-sync replica set of `partition`. Rejected when
    /// `leader_epoch` is lower than the partition's current leader epoch.
    UpdatePartitionIsr {
        partition: PartitionId,
        isr: Vec<NodeId>,
        leader_epoch: u64,
    },

    /// Replace the replica set of `partition` with `replicas`.
    ReassignPartition {
        partition: PartitionId,
        replicas: Vec<NodeId>,
    },

    /// Insert `info` under `info.id`, replacing any existing entry.
    RegisterNode { info: NodeInfo },

    /// Remove the node `node_id`; an error is returned if it is unknown.
    DeregisterNode { node_id: NodeId },

    /// Replace the stored info of an already registered node.
    UpdateNode { node_id: NodeId, info: NodeInfo },

    /// Merge `config` into the cluster-wide key/value settings.
    UpdateClusterConfig { config: HashMap<String, String> },

    /// Insert or replace a consumer group, keyed by `group.group_id`.
    UpdateConsumerGroup { group: ConsumerGroup },

    /// Remove a consumer group; an error is returned if it is unknown.
    DeleteConsumerGroup { group_id: GroupId },

    /// Changes nothing; always answered with `MetadataResponse::Success`.
    Noop,

    /// Apply the contained commands in order. Every command is applied even
    /// if an earlier one produced an error response; the result is a
    /// `MetadataResponse::BatchResponses` with one entry per command.
    Batch(Vec<MetadataCommand>),
}
103
/// Outcome of applying a [`MetadataCommand`] to [`ClusterMetadata`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MetadataResponse {
    /// The command was applied and has no more specific response.
    Success,
    /// A topic was created; `partitions` echoes the `partitions` value of the
    /// submitted topic config.
    TopicCreated {
        name: String,
        partitions: u32,
    },
    /// The named topic was removed.
    TopicDeleted {
        name: String,
    },
    /// The partition's leader was changed to `leader`.
    PartitionLeaderUpdated {
        partition: PartitionId,
        leader: NodeId,
    },
    /// The partition's in-sync replica set was replaced with `isr`.
    IsrUpdated {
        partition: PartitionId,
        isr: Vec<NodeId>,
    },
    /// The node was stored under `node_id`.
    NodeRegistered {
        node_id: NodeId,
    },
    /// The node `node_id` was removed.
    NodeDeregistered {
        node_id: NodeId,
    },
    /// The consumer group was inserted or replaced.
    ConsumerGroupUpdated {
        group_id: GroupId,
    },
    /// The consumer group was removed.
    ConsumerGroupDeleted {
        group_id: GroupId,
    },
    /// Responses of a `MetadataCommand::Batch`, in command order.
    BatchResponses(Vec<MetadataResponse>),
    /// The command was rejected; `message` is a human-readable reason.
    Error {
        message: String,
    },
}
141
/// In-memory cluster metadata, mutated exclusively through
/// [`MetadataCommand`]s and snapshotted with `serialize` / `deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterMetadata {
    /// Topic state keyed by topic name.
    pub topics: HashMap<String, TopicState>,

    /// Registered nodes keyed by node id.
    pub nodes: HashMap<NodeId, NodeInfo>,

    /// Consumer groups keyed by group id.
    pub consumer_groups: HashMap<GroupId, ConsumerGroup>,

    /// Cluster-wide key/value configuration.
    pub config: HashMap<String, String>,

    /// Index passed to the most recent `apply` call.
    pub last_applied_index: u64,

    /// Counter incremented by topic creation/deletion, partition addition and
    /// reassignment, node registration/deregistration and cluster-config
    /// updates.
    pub epoch: u64,
}
163
164impl ClusterMetadata {
165 pub fn new() -> Self {
167 Self::default()
168 }
169
170 pub fn apply(&mut self, index: u64, cmd: MetadataCommand) -> MetadataResponse {
172 self.last_applied_index = index;
173 self.apply_cmd(cmd)
174 }
175
176 fn apply_cmd(&mut self, cmd: MetadataCommand) -> MetadataResponse {
185 match cmd {
186 MetadataCommand::CreateTopic {
187 config,
188 partition_assignments,
189 } => self.create_topic(config, partition_assignments),
190 MetadataCommand::DeleteTopic { name } => self.delete_topic(&name),
191 MetadataCommand::UpdateTopicConfig { name, config } => {
192 self.update_topic_config(&name, config)
193 }
194 MetadataCommand::AddPartitions {
195 topic,
196 new_assignments,
197 } => self.add_partitions(&topic, new_assignments),
198 MetadataCommand::UpdatePartitionLeader {
199 partition,
200 leader,
201 epoch,
202 } => self.update_partition_leader(&partition, leader, epoch),
203 MetadataCommand::UpdatePartitionIsr {
204 partition,
205 isr,
206 leader_epoch,
207 } => self.update_partition_isr(&partition, isr, leader_epoch),
208 MetadataCommand::ReassignPartition {
209 partition,
210 replicas,
211 } => self.reassign_partition(&partition, replicas),
212 MetadataCommand::RegisterNode { info } => self.register_node(info),
213 MetadataCommand::DeregisterNode { node_id } => self.deregister_node(&node_id),
214 MetadataCommand::UpdateNode { node_id, info } => self.update_node(&node_id, info),
215 MetadataCommand::UpdateClusterConfig { config } => self.update_cluster_config(config),
216 MetadataCommand::UpdateConsumerGroup { group } => self.update_consumer_group(group),
217 MetadataCommand::DeleteConsumerGroup { group_id } => {
218 self.delete_consumer_group(&group_id)
219 }
220 MetadataCommand::Noop => MetadataResponse::Success,
221 MetadataCommand::Batch(commands) => {
222 let mut responses = Vec::with_capacity(commands.len());
232 for cmd in commands {
233 let resp = self.apply_cmd(cmd);
234 responses.push(resp);
235 }
236 MetadataResponse::BatchResponses(responses)
238 }
239 }
240 }
241
242 fn create_topic(
244 &mut self,
245 config: TopicConfig,
246 partition_assignments: Vec<Vec<NodeId>>,
247 ) -> MetadataResponse {
248 if let Err(msg) = Self::validate_topic_name(&config.name) {
250 return MetadataResponse::Error { message: msg };
251 }
252 if self.topics.contains_key(&config.name) {
253 return MetadataResponse::Error {
254 message: format!("Topic {} already exists", config.name),
255 };
256 }
257
258 let name = config.name.clone();
259 let partitions = config.partitions;
260 let topic_state = TopicState::new(config, partition_assignments);
261
262 self.topics.insert(name.clone(), topic_state);
263 self.epoch += 1;
264
265 MetadataResponse::TopicCreated { name, partitions }
266 }
267
268 fn validate_topic_name(name: &str) -> std::result::Result<(), String> {
270 if name.is_empty() {
271 return Err("Topic name cannot be empty".into());
272 }
273 if name.len() > 249 {
274 return Err(format!(
275 "Topic name too long: {} chars (max 249)",
276 name.len()
277 ));
278 }
279 if name == "." || name == ".." {
280 return Err("Topic name cannot be '.' or '..'".into());
281 }
282 if name.contains(|c: char| c == '/' || c == '\\' || c == '\0' || c.is_control()) {
283 return Err("Topic name contains invalid characters".into());
284 }
285 Ok(())
286 }
287
288 fn delete_topic(&mut self, name: &str) -> MetadataResponse {
290 if self.topics.remove(name).is_some() {
291 self.epoch += 1;
292 MetadataResponse::TopicDeleted {
293 name: name.to_string(),
294 }
295 } else {
296 MetadataResponse::Error {
297 message: format!("Topic {} not found", name),
298 }
299 }
300 }
301
302 fn update_topic_config(
304 &mut self,
305 name: &str,
306 config: HashMap<String, String>,
307 ) -> MetadataResponse {
308 if let Some(topic) = self.topics.get_mut(name) {
309 topic.config.config.extend(config);
310 MetadataResponse::Success
311 } else {
312 MetadataResponse::Error {
313 message: format!("Topic {} not found", name),
314 }
315 }
316 }
317
318 fn add_partitions(
320 &mut self,
321 topic: &str,
322 new_assignments: Vec<Vec<NodeId>>,
323 ) -> MetadataResponse {
324 if let Some(topic_state) = self.topics.get_mut(topic) {
325 let start_idx = topic_state.partitions.len() as u32;
326
327 for (i, replicas) in new_assignments.into_iter().enumerate() {
328 let partition_id = PartitionId::new(topic, start_idx + i as u32);
329 let mut partition_state = PartitionState::new(partition_id, replicas);
330 partition_state.elect_leader();
331 topic_state.partitions.push(partition_state);
332 }
333
334 topic_state.config.partitions = topic_state.partitions.len() as u32;
335 self.epoch += 1;
336
337 MetadataResponse::Success
338 } else {
339 MetadataResponse::Error {
340 message: format!("Topic {} not found", topic),
341 }
342 }
343 }
344
345 fn update_partition_leader(
347 &mut self,
348 partition: &PartitionId,
349 leader: NodeId,
350 epoch: u64,
351 ) -> MetadataResponse {
352 if let Some(topic) = self.topics.get_mut(&partition.topic) {
353 if let Some(p) = topic.partition_mut(partition.partition) {
354 if epoch > p.leader_epoch {
356 p.leader = Some(leader.clone());
357 p.leader_epoch = epoch;
358 p.online = true;
359 return MetadataResponse::PartitionLeaderUpdated {
360 partition: partition.clone(),
361 leader,
362 };
363 }
364 }
365 }
366
367 MetadataResponse::Error {
368 message: format!("Partition {} not found or stale epoch", partition),
369 }
370 }
371
372 fn update_partition_isr(
374 &mut self,
375 partition: &PartitionId,
376 isr: Vec<NodeId>,
377 leader_epoch: u64,
378 ) -> MetadataResponse {
379 if let Some(topic) = self.topics.get_mut(&partition.topic) {
380 if let Some(p) = topic.partition_mut(partition.partition) {
381 if leader_epoch < p.leader_epoch {
383 return MetadataResponse::Error {
384 message: format!(
385 "Stale leader epoch {} < current {} for partition {}",
386 leader_epoch, p.leader_epoch, partition
387 ),
388 };
389 }
390
391 p.isr = isr.iter().cloned().collect();
392 p.under_replicated = p.isr.len() < p.replicas.len();
393
394 for replica in &mut p.replicas {
396 if isr.contains(&replica.node_id) {
397 replica.state = crate::partition::ReplicaState::InSync;
398 } else {
399 replica.state = crate::partition::ReplicaState::CatchingUp;
400 }
401 }
402
403 return MetadataResponse::IsrUpdated {
404 partition: partition.clone(),
405 isr,
406 };
407 }
408 }
409
410 MetadataResponse::Error {
411 message: format!("Partition {} not found", partition),
412 }
413 }
414
415 fn reassign_partition(
417 &mut self,
418 partition: &PartitionId,
419 replicas: Vec<NodeId>,
420 ) -> MetadataResponse {
421 if let Some(topic) = self.topics.get_mut(&partition.topic) {
422 if let Some(p) = topic.partition_mut(partition.partition) {
423 p.replicas = replicas
425 .iter()
426 .map(|n| crate::partition::ReplicaInfo::new(n.clone()))
427 .collect();
428
429 p.isr.clear();
434 if let Some(leader) = &p.leader {
435 if replicas.iter().any(|r| r == leader) {
436 p.isr.insert(leader.clone());
437 } else {
438 p.leader = replicas.first().cloned();
440 if let Some(new_leader) = &p.leader {
441 p.isr.insert(new_leader.clone());
442 }
443 }
444 }
445
446 p.under_replicated = true;
447 self.epoch += 1;
448
449 return MetadataResponse::Success;
450 }
451 }
452
453 MetadataResponse::Error {
454 message: format!("Partition {} not found", partition),
455 }
456 }
457
458 fn register_node(&mut self, info: NodeInfo) -> MetadataResponse {
460 let node_id = info.id.clone();
461 self.nodes.insert(node_id.clone(), info);
462 self.epoch += 1;
463 MetadataResponse::NodeRegistered { node_id }
464 }
465
466 fn deregister_node(&mut self, node_id: &NodeId) -> MetadataResponse {
468 if self.nodes.remove(node_id).is_some() {
469 self.epoch += 1;
470 MetadataResponse::NodeDeregistered {
471 node_id: node_id.clone(),
472 }
473 } else {
474 MetadataResponse::Error {
475 message: format!("Node {} not found", node_id),
476 }
477 }
478 }
479
480 fn update_node(&mut self, node_id: &NodeId, info: NodeInfo) -> MetadataResponse {
482 if self.nodes.contains_key(node_id) {
483 self.nodes.insert(node_id.clone(), info);
484 MetadataResponse::Success
485 } else {
486 MetadataResponse::Error {
487 message: format!("Node {} not found", node_id),
488 }
489 }
490 }
491
492 fn update_cluster_config(&mut self, config: HashMap<String, String>) -> MetadataResponse {
494 self.config.extend(config);
495 self.epoch += 1;
496 MetadataResponse::Success
497 }
498
499 fn update_consumer_group(&mut self, group: ConsumerGroup) -> MetadataResponse {
501 let group_id = group.group_id.clone();
502 self.consumer_groups.insert(group_id.clone(), group);
503 MetadataResponse::ConsumerGroupUpdated { group_id }
504 }
505
506 fn delete_consumer_group(&mut self, group_id: &GroupId) -> MetadataResponse {
508 if self.consumer_groups.remove(group_id).is_some() {
509 MetadataResponse::ConsumerGroupDeleted {
510 group_id: group_id.clone(),
511 }
512 } else {
513 MetadataResponse::Error {
514 message: format!("Consumer group {} not found", group_id),
515 }
516 }
517 }
518
519 pub fn get_topic(&self, name: &str) -> Option<&TopicState> {
523 self.topics.get(name)
524 }
525
526 pub fn get_partition(&self, id: &PartitionId) -> Option<&PartitionState> {
528 self.topics.get(&id.topic)?.partition(id.partition)
529 }
530
531 pub fn get_node(&self, node_id: &NodeId) -> Option<&NodeInfo> {
533 self.nodes.get(node_id)
534 }
535
536 pub fn topic_names(&self) -> Vec<&str> {
538 self.topics.keys().map(|s| s.as_str()).collect()
539 }
540
541 pub fn node_ids(&self) -> Vec<&NodeId> {
543 self.nodes.keys().collect()
544 }
545
546 pub fn get_consumer_group(&self, group_id: &GroupId) -> Option<&ConsumerGroup> {
548 self.consumer_groups.get(group_id)
549 }
550
551 pub fn consumer_group_ids(&self) -> Vec<&GroupId> {
553 self.consumer_groups.keys().collect()
554 }
555
556 pub fn find_leader(&self, topic: &str, partition: u32) -> Option<&NodeId> {
558 self.topics
559 .get(topic)?
560 .partition(partition)?
561 .leader
562 .as_ref()
563 }
564
565 pub fn partitions_led_by(&self, node_id: &NodeId) -> Vec<PartitionId> {
567 let mut result = vec![];
568 for (topic_name, topic) in &self.topics {
569 for (i, partition) in topic.partitions.iter().enumerate() {
570 if partition.leader.as_ref() == Some(node_id) {
571 result.push(PartitionId::new(topic_name, i as u32));
572 }
573 }
574 }
575 result
576 }
577
578 pub fn partitions_on_node(&self, node_id: &NodeId) -> Vec<PartitionId> {
580 let mut result = vec![];
581 for (topic_name, topic) in &self.topics {
582 for (i, partition) in topic.partitions.iter().enumerate() {
583 if partition.is_replica(node_id) {
584 result.push(PartitionId::new(topic_name, i as u32));
585 }
586 }
587 }
588 result
589 }
590
591 pub fn under_replicated_partitions(&self) -> Vec<PartitionId> {
593 let mut result = vec![];
594 for (topic_name, topic) in &self.topics {
595 for (i, partition) in topic.partitions.iter().enumerate() {
596 if partition.under_replicated {
597 result.push(PartitionId::new(topic_name, i as u32));
598 }
599 }
600 }
601 result
602 }
603
604 pub fn offline_partitions(&self) -> Vec<PartitionId> {
606 let mut result = vec![];
607 for (topic_name, topic) in &self.topics {
608 for (i, partition) in topic.partitions.iter().enumerate() {
609 if !partition.online {
610 result.push(PartitionId::new(topic_name, i as u32));
611 }
612 }
613 }
614 result
615 }
616
617 pub fn serialize(&self) -> Result<Vec<u8>> {
619 postcard::to_allocvec(self).map_err(|e| ClusterError::Serialization(e.to_string()))
620 }
621
622 pub fn deserialize(data: &[u8]) -> Result<Self> {
624 postcard::from_bytes(data).map_err(|e| ClusterError::Deserialization(e.to_string()))
625 }
626}
627
/// Shared handle around a [`ClusterMetadata`] guarded by a Tokio `RwLock`.
pub struct MetadataStore {
    /// The guarded metadata; all access goes through the async lock.
    metadata: Arc<RwLock<ClusterMetadata>>,
}
632
633impl MetadataStore {
634 pub fn new() -> Self {
635 Self {
636 metadata: Arc::new(RwLock::new(ClusterMetadata::new())),
637 }
638 }
639
640 pub fn from_metadata(metadata: ClusterMetadata) -> Self {
641 Self {
642 metadata: Arc::new(RwLock::new(metadata)),
643 }
644 }
645
646 pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, ClusterMetadata> {
647 self.metadata.read().await
648 }
649
650 pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, ClusterMetadata> {
651 self.metadata.write().await
652 }
653
654 pub async fn apply(&self, index: u64, cmd: MetadataCommand) -> MetadataResponse {
655 self.metadata.write().await.apply(index, cmd)
656 }
657
658 pub async fn get_topic(&self, name: &str) -> Option<TopicState> {
659 self.metadata.read().await.topics.get(name).cloned()
660 }
661
662 pub async fn get_partition(&self, id: &PartitionId) -> Option<PartitionState> {
663 let meta = self.metadata.read().await;
664 meta.topics.get(&id.topic)?.partition(id.partition).cloned()
665 }
666
667 pub async fn epoch(&self) -> u64 {
668 self.metadata.read().await.epoch
669 }
670}
671
672impl Default for MetadataStore {
673 fn default() -> Self {
674 Self::new()
675 }
676}
677
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one partition's replica list from node names.
    fn replicas(nodes: &[&str]) -> Vec<String> {
        nodes.iter().map(|node| (*node).to_string()).collect()
    }

    /// Creating a topic reports its name and partition count and stores one
    /// partition per assignment.
    #[test]
    fn test_create_topic() {
        let mut metadata = ClusterMetadata::new();

        let result = metadata.apply(
            1,
            MetadataCommand::CreateTopic {
                config: TopicConfig::new("test-topic", 3, 2),
                partition_assignments: vec![
                    replicas(&["node-1", "node-2"]),
                    replicas(&["node-2", "node-3"]),
                    replicas(&["node-3", "node-1"]),
                ],
            },
        );

        assert!(
            matches!(
                &result,
                MetadataResponse::TopicCreated { name, partitions }
                    if name == "test-topic" && *partitions == 3
            ),
            "Expected TopicCreated"
        );

        let topic = metadata
            .topics
            .get("test-topic")
            .expect("topic should be stored after creation");
        assert_eq!(topic.partitions.len(), 3);
    }

    /// A leader update with a newer epoch replaces the partition leader.
    #[test]
    fn test_update_partition_leader() {
        let mut metadata = ClusterMetadata::new();
        metadata.apply(
            1,
            MetadataCommand::CreateTopic {
                config: TopicConfig::new("test", 1, 2),
                partition_assignments: vec![replicas(&["node-1", "node-2"])],
            },
        );

        let target = PartitionId::new("test", 0);
        let new_leader = "node-2".to_string();
        let result = metadata.apply(
            2,
            MetadataCommand::UpdatePartitionLeader {
                partition: target.clone(),
                leader: new_leader.clone(),
                epoch: 2,
            },
        );

        assert!(matches!(
            result,
            MetadataResponse::PartitionLeaderUpdated { .. }
        ));
        let stored = metadata.topics["test"].partition(0).unwrap();
        assert_eq!(stored.leader, Some(new_leader));
    }

    /// Registering adds the node to the map; deregistering removes it again.
    #[test]
    fn test_register_deregister_node() {
        let mut metadata = ClusterMetadata::new();

        let info = NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        );
        let registered = metadata.apply(1, MetadataCommand::RegisterNode { info });
        assert!(matches!(
            registered,
            MetadataResponse::NodeRegistered { .. }
        ));
        assert!(metadata.nodes.contains_key("node-1"));

        let deregistered = metadata.apply(
            2,
            MetadataCommand::DeregisterNode {
                node_id: "node-1".to_string(),
            },
        );
        assert!(matches!(
            deregistered,
            MetadataResponse::NodeDeregistered { .. }
        ));
        assert!(!metadata.nodes.contains_key("node-1"));
    }

    /// A serialize/deserialize round trip keeps the topic count and the last
    /// applied index.
    #[test]
    fn test_serialization() {
        let mut metadata = ClusterMetadata::new();
        metadata.apply(
            1,
            MetadataCommand::CreateTopic {
                config: TopicConfig::new("test", 2, 1),
                partition_assignments: vec![replicas(&["node-1"]), replicas(&["node-1"])],
            },
        );

        let bytes = metadata.serialize().unwrap();
        let restored = ClusterMetadata::deserialize(&bytes).unwrap();

        assert_eq!(restored.topics.len(), metadata.topics.len());
        assert_eq!(restored.last_applied_index, metadata.last_applied_index);
    }
}