// rivven_cluster/metadata.rs
//! Cluster metadata and Raft state machine
//!
//! This module implements the Raft state machine for cluster metadata consensus.
//! Unlike systems like Redpanda that use Raft per-partition, we use a single
//! Raft group for all metadata, which is simpler and more memory efficient.
//!
//! Metadata managed by Raft:
//! - Topic configurations
//! - Partition assignments (which nodes host which partitions)
//! - In-Sync Replica (ISR) state
//! - Node registry (known nodes and their capabilities)

use crate::error::{ClusterError, Result};
use crate::node::{NodeId, NodeInfo};
use crate::partition::{PartitionId, PartitionState, TopicConfig, TopicState};
use rivven_core::consumer_group::{ConsumerGroup, GroupId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
22/// Raft log entry types for metadata operations
/// Raft log entry types for metadata operations.
///
/// Each variant is one state-machine transition applied via
/// [`ClusterMetadata::apply`].
///
/// NOTE(review): these commands are serde-serialized for the Raft log; if
/// the log encoding is non-self-describing (snapshots use `postcard`, which
/// is), variant order must stay stable across releases — confirm before
/// reordering or inserting variants.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MetadataCommand {
    // ==================== Topic Operations ====================
    /// Create a new topic
    CreateTopic {
        config: TopicConfig,
        /// One replica list per partition; outer index = partition number.
        partition_assignments: Vec<Vec<NodeId>>,
    },

    /// Delete a topic
    DeleteTopic { name: String },

    /// Update topic configuration (key/value overrides are merged in)
    UpdateTopicConfig {
        name: String,
        config: HashMap<String, String>,
    },

    /// Add partitions to existing topic
    AddPartitions {
        topic: String,
        /// Replica lists for the partitions being appended.
        new_assignments: Vec<Vec<NodeId>>,
    },

    // ==================== Partition Operations ====================
    /// Update partition leader (rejected unless `epoch` is strictly newer)
    UpdatePartitionLeader {
        partition: PartitionId,
        leader: NodeId,
        epoch: u64,
    },

    /// Update partition ISR (includes leader epoch for fencing)
    UpdatePartitionIsr {
        partition: PartitionId,
        isr: Vec<NodeId>,
        /// Leader epoch of the proposer — stale-epoch proposals are rejected
        leader_epoch: u64,
    },

    /// Reassign partition replicas
    ReassignPartition {
        partition: PartitionId,
        replicas: Vec<NodeId>,
    },

    // ==================== Node Operations ====================
    /// Register a new node (re-registering overwrites the previous entry)
    RegisterNode { info: NodeInfo },

    /// Deregister a node
    DeregisterNode { node_id: NodeId },

    /// Update node metadata
    UpdateNode { node_id: NodeId, info: NodeInfo },

    // ==================== Cluster Operations ====================
    /// Update cluster-wide configuration (merged into existing config)
    UpdateClusterConfig { config: HashMap<String, String> },

    // ==================== Consumer Group Operations ====================
    /// Update consumer group state (full snapshot)
    UpdateConsumerGroup { group: ConsumerGroup },

    /// Delete consumer group
    DeleteConsumerGroup { group_id: GroupId },

    /// No-op (for heartbeats/leader election)
    Noop,

    /// Batch of commands applied in a single Raft consensus round.
    ///
    /// Semantics: **best-effort** — each command is applied independently.
    /// Some commands may succeed while others fail. The batch is atomic at
    /// the Raft log level (single entry, single fsync), but individual
    /// commands can return errors without rolling back prior successes.
    ///
    /// Used by `propose_batch` for higher throughput.
    Batch(Vec<MetadataCommand>),
}
103
104/// Result of applying a metadata command
/// Result of applying a metadata command.
///
/// Errors are reported in-band as [`MetadataResponse::Error`] rather than
/// via `Result`, because the Raft apply path must always produce a response
/// for the log entry even when the command is rejected.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum MetadataResponse {
    /// Generic success for commands with no interesting payload.
    Success,
    TopicCreated {
        name: String,
        partitions: u32,
    },
    TopicDeleted {
        name: String,
    },
    PartitionLeaderUpdated {
        partition: PartitionId,
        leader: NodeId,
    },
    IsrUpdated {
        partition: PartitionId,
        isr: Vec<NodeId>,
    },
    NodeRegistered {
        node_id: NodeId,
    },
    NodeDeregistered {
        node_id: NodeId,
    },
    ConsumerGroupUpdated {
        group_id: GroupId,
    },
    ConsumerGroupDeleted {
        group_id: GroupId,
    },
    /// Per-command responses from a Batch command, in submission order.
    BatchResponses(Vec<MetadataResponse>),
    /// Command was rejected; state may still have been partially modified
    /// for Batch sub-commands that succeeded earlier.
    Error {
        message: String,
    },
}
141
142/// Cluster metadata state (Raft state machine)
/// Cluster metadata state (Raft state machine).
///
/// NOTE: snapshots are encoded with `postcard` (see `serialize`), a compact
/// non-self-describing format — field order here must stay stable or old
/// snapshots will fail to decode.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterMetadata {
    /// Topics and their states
    pub topics: HashMap<String, TopicState>,

    /// Registered nodes
    pub nodes: HashMap<NodeId, NodeInfo>,

    /// Consumer groups
    pub consumer_groups: HashMap<GroupId, ConsumerGroup>,

    /// Cluster-wide configuration
    pub config: HashMap<String, String>,

    /// Last applied Raft index (set unconditionally in `apply`, even when
    /// the command itself returns an error)
    pub last_applied_index: u64,

    /// Cluster epoch (increments on significant changes, e.g. topic
    /// create/delete, node register/deregister, replica reassignment)
    pub epoch: u64,
}
163
164impl ClusterMetadata {
    /// Create new empty metadata
    pub fn new() -> Self {
        Self::default()
    }

    /// Apply a command to the state machine at the given Raft log index.
    ///
    /// `last_applied_index` is recorded unconditionally (before execution),
    /// so the index advances even when the command returns an error — a
    /// rejected command is still a consumed log entry.
    pub fn apply(&mut self, index: u64, cmd: MetadataCommand) -> MetadataResponse {
        self.last_applied_index = index;
        self.apply_cmd(cmd)
    }
175
    /// Execute a single command without updating `last_applied_index`.
    ///
    /// Batch sub-commands must NOT go through `apply()` because
    /// the entire batch shares a single Raft log index.  Calling `apply()`
    /// recursively would redundantly re-set `last_applied_index` and, more
    /// critically, a nested `Batch` inside a `Batch` would hide the shared-
    /// index semantics.  By routing through `apply_cmd()` instead, the index
    /// is set exactly once at the top-level `apply()` call.
    fn apply_cmd(&mut self, cmd: MetadataCommand) -> MetadataResponse {
        match cmd {
            MetadataCommand::CreateTopic {
                config,
                partition_assignments,
            } => self.create_topic(config, partition_assignments),
            MetadataCommand::DeleteTopic { name } => self.delete_topic(&name),
            MetadataCommand::UpdateTopicConfig { name, config } => {
                self.update_topic_config(&name, config)
            }
            MetadataCommand::AddPartitions {
                topic,
                new_assignments,
            } => self.add_partitions(&topic, new_assignments),
            MetadataCommand::UpdatePartitionLeader {
                partition,
                leader,
                epoch,
            } => self.update_partition_leader(&partition, leader, epoch),
            MetadataCommand::UpdatePartitionIsr {
                partition,
                isr,
                leader_epoch,
            } => self.update_partition_isr(&partition, isr, leader_epoch),
            MetadataCommand::ReassignPartition {
                partition,
                replicas,
            } => self.reassign_partition(&partition, replicas),
            MetadataCommand::RegisterNode { info } => self.register_node(info),
            MetadataCommand::DeregisterNode { node_id } => self.deregister_node(&node_id),
            MetadataCommand::UpdateNode { node_id, info } => self.update_node(&node_id, info),
            MetadataCommand::UpdateClusterConfig { config } => self.update_cluster_config(config),
            MetadataCommand::UpdateConsumerGroup { group } => self.update_consumer_group(group),
            MetadataCommand::DeleteConsumerGroup { group_id } => {
                self.delete_consumer_group(&group_id)
            }
            MetadataCommand::Noop => MetadataResponse::Success,
            MetadataCommand::Batch(commands) => {
                // Apply commands individually within a single state machine transition.
                // Best-effort: each command is applied independently — some may
                // succeed while others fail. The entire batch is a single Raft
                // log entry (atomic at the consensus level), but individual
                // command failures do NOT roll back prior successes.
                //
                // Sub-commands are dispatched via `apply_cmd` (not
                // `apply`) so that `last_applied_index` is set exactly once for
                // the entire batch rather than redundantly per sub-command.
                let mut responses = Vec::with_capacity(commands.len());
                for cmd in commands {
                    let resp = self.apply_cmd(cmd);
                    responses.push(resp);
                }
                // Return per-command responses so callers can inspect each result
                MetadataResponse::BatchResponses(responses)
            }
        }
    }
241
242    /// Create a new topic
243    fn create_topic(
244        &mut self,
245        config: TopicConfig,
246        partition_assignments: Vec<Vec<NodeId>>,
247    ) -> MetadataResponse {
248        // Validate topic name
249        if let Err(msg) = Self::validate_topic_name(&config.name) {
250            return MetadataResponse::Error { message: msg };
251        }
252        if self.topics.contains_key(&config.name) {
253            return MetadataResponse::Error {
254                message: format!("Topic {} already exists", config.name),
255            };
256        }
257
258        let name = config.name.clone();
259        let partitions = config.partitions;
260        let topic_state = TopicState::new(config, partition_assignments);
261
262        self.topics.insert(name.clone(), topic_state);
263        self.epoch += 1;
264
265        MetadataResponse::TopicCreated { name, partitions }
266    }
267
268    /// Validate a topic name for safety and compatibility.
269    fn validate_topic_name(name: &str) -> std::result::Result<(), String> {
270        if name.is_empty() {
271            return Err("Topic name cannot be empty".into());
272        }
273        if name.len() > 249 {
274            return Err(format!(
275                "Topic name too long: {} chars (max 249)",
276                name.len()
277            ));
278        }
279        if name == "." || name == ".." {
280            return Err("Topic name cannot be '.' or '..'".into());
281        }
282        if name.contains(|c: char| c == '/' || c == '\\' || c == '\0' || c.is_control()) {
283            return Err("Topic name contains invalid characters".into());
284        }
285        Ok(())
286    }
287
288    /// Delete a topic
289    fn delete_topic(&mut self, name: &str) -> MetadataResponse {
290        if self.topics.remove(name).is_some() {
291            self.epoch += 1;
292            MetadataResponse::TopicDeleted {
293                name: name.to_string(),
294            }
295        } else {
296            MetadataResponse::Error {
297                message: format!("Topic {} not found", name),
298            }
299        }
300    }
301
302    /// Update topic configuration
303    fn update_topic_config(
304        &mut self,
305        name: &str,
306        config: HashMap<String, String>,
307    ) -> MetadataResponse {
308        if let Some(topic) = self.topics.get_mut(name) {
309            topic.config.config.extend(config);
310            MetadataResponse::Success
311        } else {
312            MetadataResponse::Error {
313                message: format!("Topic {} not found", name),
314            }
315        }
316    }
317
318    /// Add partitions to a topic
319    fn add_partitions(
320        &mut self,
321        topic: &str,
322        new_assignments: Vec<Vec<NodeId>>,
323    ) -> MetadataResponse {
324        if let Some(topic_state) = self.topics.get_mut(topic) {
325            let start_idx = topic_state.partitions.len() as u32;
326
327            for (i, replicas) in new_assignments.into_iter().enumerate() {
328                let partition_id = PartitionId::new(topic, start_idx + i as u32);
329                let mut partition_state = PartitionState::new(partition_id, replicas);
330                partition_state.elect_leader();
331                topic_state.partitions.push(partition_state);
332            }
333
334            topic_state.config.partitions = topic_state.partitions.len() as u32;
335            self.epoch += 1;
336
337            MetadataResponse::Success
338        } else {
339            MetadataResponse::Error {
340                message: format!("Topic {} not found", topic),
341            }
342        }
343    }
344
345    /// Update partition leader
346    fn update_partition_leader(
347        &mut self,
348        partition: &PartitionId,
349        leader: NodeId,
350        epoch: u64,
351    ) -> MetadataResponse {
352        if let Some(topic) = self.topics.get_mut(&partition.topic) {
353            if let Some(p) = topic.partition_mut(partition.partition) {
354                // Only update if epoch is higher
355                if epoch > p.leader_epoch {
356                    p.leader = Some(leader.clone());
357                    p.leader_epoch = epoch;
358                    p.online = true;
359                    return MetadataResponse::PartitionLeaderUpdated {
360                        partition: partition.clone(),
361                        leader,
362                    };
363                }
364            }
365        }
366
367        MetadataResponse::Error {
368            message: format!("Partition {} not found or stale epoch", partition),
369        }
370    }
371
    /// Update partition ISR (epoch-fenced)
    ///
    /// Applies a full ISR replacement proposed by a partition leader.
    /// Proposals whose `leader_epoch` is lower than the partition's current
    /// epoch are rejected, so a deposed leader cannot shrink or grow the
    /// ISR after a newer leader was installed. Equal epochs are accepted:
    /// the current leader re-proposes under its own epoch.
    ///
    /// NOTE(review): `isr` entries are not validated against the replica
    /// set — a node id outside `p.replicas` would be stored in the ISR
    /// while the replica-state loop below silently skips it. Confirm
    /// whether proposers guarantee ISR ⊆ replicas.
    fn update_partition_isr(
        &mut self,
        partition: &PartitionId,
        isr: Vec<NodeId>,
        leader_epoch: u64,
    ) -> MetadataResponse {
        if let Some(topic) = self.topics.get_mut(&partition.topic) {
            if let Some(p) = topic.partition_mut(partition.partition) {
                // reject stale-epoch ISR updates to prevent split-brain
                if leader_epoch < p.leader_epoch {
                    return MetadataResponse::Error {
                        message: format!(
                            "Stale leader epoch {} < current {} for partition {}",
                            leader_epoch, p.leader_epoch, partition
                        ),
                    };
                }

                p.isr = isr.iter().cloned().collect();
                // Any missing replica means the partition is under-replicated.
                p.under_replicated = p.isr.len() < p.replicas.len();

                // Update replica states to mirror ISR membership.
                for replica in &mut p.replicas {
                    if isr.contains(&replica.node_id) {
                        replica.state = crate::partition::ReplicaState::InSync;
                    } else {
                        replica.state = crate::partition::ReplicaState::CatchingUp;
                    }
                }

                return MetadataResponse::IsrUpdated {
                    partition: partition.clone(),
                    isr,
                };
            }
        }

        MetadataResponse::Error {
            message: format!("Partition {} not found", partition),
        }
    }
414
415    /// Reassign partition replicas
416    fn reassign_partition(
417        &mut self,
418        partition: &PartitionId,
419        replicas: Vec<NodeId>,
420    ) -> MetadataResponse {
421        if let Some(topic) = self.topics.get_mut(&partition.topic) {
422            if let Some(p) = topic.partition_mut(partition.partition) {
423                // Create new replica info
424                p.replicas = replicas
425                    .iter()
426                    .map(|n| crate::partition::ReplicaInfo::new(n.clone()))
427                    .collect();
428
429                // Reset ISR to leader only if leader IS in the new
430                // replica set. Otherwise, ISR starts empty and the first replica
431                // to catch up will join. This prevents a removed leader from
432                // lingering in ISR as a non-replica.
433                p.isr.clear();
434                if let Some(leader) = &p.leader {
435                    if replicas.iter().any(|r| r == leader) {
436                        p.isr.insert(leader.clone());
437                    } else {
438                        // Leader is not in new replica set — elect first new replica
439                        p.leader = replicas.first().cloned();
440                        if let Some(new_leader) = &p.leader {
441                            p.isr.insert(new_leader.clone());
442                        }
443                    }
444                }
445
446                p.under_replicated = true;
447                self.epoch += 1;
448
449                return MetadataResponse::Success;
450            }
451        }
452
453        MetadataResponse::Error {
454            message: format!("Partition {} not found", partition),
455        }
456    }
457
458    /// Register a new node
459    fn register_node(&mut self, info: NodeInfo) -> MetadataResponse {
460        let node_id = info.id.clone();
461        self.nodes.insert(node_id.clone(), info);
462        self.epoch += 1;
463        MetadataResponse::NodeRegistered { node_id }
464    }
465
466    /// Deregister a node
467    fn deregister_node(&mut self, node_id: &NodeId) -> MetadataResponse {
468        if self.nodes.remove(node_id).is_some() {
469            self.epoch += 1;
470            MetadataResponse::NodeDeregistered {
471                node_id: node_id.clone(),
472            }
473        } else {
474            MetadataResponse::Error {
475                message: format!("Node {} not found", node_id),
476            }
477        }
478    }
479
480    /// Update node information
481    fn update_node(&mut self, node_id: &NodeId, info: NodeInfo) -> MetadataResponse {
482        if self.nodes.contains_key(node_id) {
483            self.nodes.insert(node_id.clone(), info);
484            MetadataResponse::Success
485        } else {
486            MetadataResponse::Error {
487                message: format!("Node {} not found", node_id),
488            }
489        }
490    }
491
492    /// Update cluster configuration
493    fn update_cluster_config(&mut self, config: HashMap<String, String>) -> MetadataResponse {
494        self.config.extend(config);
495        self.epoch += 1;
496        MetadataResponse::Success
497    }
498
499    /// Update consumer group state
500    fn update_consumer_group(&mut self, group: ConsumerGroup) -> MetadataResponse {
501        let group_id = group.group_id.clone();
502        self.consumer_groups.insert(group_id.clone(), group);
503        MetadataResponse::ConsumerGroupUpdated { group_id }
504    }
505
506    /// Delete consumer group
507    fn delete_consumer_group(&mut self, group_id: &GroupId) -> MetadataResponse {
508        if self.consumer_groups.remove(group_id).is_some() {
509            MetadataResponse::ConsumerGroupDeleted {
510                group_id: group_id.clone(),
511            }
512        } else {
513            MetadataResponse::Error {
514                message: format!("Consumer group {} not found", group_id),
515            }
516        }
517    }
518
    // ==================== Query Methods ====================
    // Read-only accessors; all return borrows into the state machine so
    // callers holding a read lock avoid clones.

    /// Get topic by name
    pub fn get_topic(&self, name: &str) -> Option<&TopicState> {
        self.topics.get(name)
    }

    /// Get partition by ID (`None` if the topic or index is unknown)
    pub fn get_partition(&self, id: &PartitionId) -> Option<&PartitionState> {
        self.topics.get(&id.topic)?.partition(id.partition)
    }

    /// Get node by ID
    pub fn get_node(&self, node_id: &NodeId) -> Option<&NodeInfo> {
        self.nodes.get(node_id)
    }

    /// Get all topic names (unordered — HashMap iteration order)
    pub fn topic_names(&self) -> Vec<&str> {
        self.topics.keys().map(|s| s.as_str()).collect()
    }

    /// Get all node IDs (unordered)
    pub fn node_ids(&self) -> Vec<&NodeId> {
        self.nodes.keys().collect()
    }

    /// Get consumer group by ID
    pub fn get_consumer_group(&self, group_id: &GroupId) -> Option<&ConsumerGroup> {
        self.consumer_groups.get(group_id)
    }

    /// Get all consumer group IDs (unordered)
    pub fn consumer_group_ids(&self) -> Vec<&GroupId> {
        self.consumer_groups.keys().collect()
    }
555
556    /// Find partition leader
557    pub fn find_leader(&self, topic: &str, partition: u32) -> Option<&NodeId> {
558        self.topics
559            .get(topic)?
560            .partition(partition)?
561            .leader
562            .as_ref()
563    }
564
565    /// Get partitions led by a node
566    pub fn partitions_led_by(&self, node_id: &NodeId) -> Vec<PartitionId> {
567        let mut result = vec![];
568        for (topic_name, topic) in &self.topics {
569            for (i, partition) in topic.partitions.iter().enumerate() {
570                if partition.leader.as_ref() == Some(node_id) {
571                    result.push(PartitionId::new(topic_name, i as u32));
572                }
573            }
574        }
575        result
576    }
577
578    /// Get partitions hosted by a node (leader or replica)
579    pub fn partitions_on_node(&self, node_id: &NodeId) -> Vec<PartitionId> {
580        let mut result = vec![];
581        for (topic_name, topic) in &self.topics {
582            for (i, partition) in topic.partitions.iter().enumerate() {
583                if partition.is_replica(node_id) {
584                    result.push(PartitionId::new(topic_name, i as u32));
585                }
586            }
587        }
588        result
589    }
590
591    /// Get under-replicated partitions
592    pub fn under_replicated_partitions(&self) -> Vec<PartitionId> {
593        let mut result = vec![];
594        for (topic_name, topic) in &self.topics {
595            for (i, partition) in topic.partitions.iter().enumerate() {
596                if partition.under_replicated {
597                    result.push(PartitionId::new(topic_name, i as u32));
598                }
599            }
600        }
601        result
602    }
603
604    /// Get offline partitions
605    pub fn offline_partitions(&self) -> Vec<PartitionId> {
606        let mut result = vec![];
607        for (topic_name, topic) in &self.topics {
608            for (i, partition) in topic.partitions.iter().enumerate() {
609                if !partition.online {
610                    result.push(PartitionId::new(topic_name, i as u32));
611                }
612            }
613        }
614        result
615    }
616
    /// Serialize the full metadata state for a Raft snapshot.
    ///
    /// Uses `postcard`, a compact non-self-describing format: struct field
    /// order and enum variant order must remain stable across versions or
    /// old snapshots will fail to decode.
    pub fn serialize(&self) -> Result<Vec<u8>> {
        postcard::to_allocvec(self).map_err(|e| ClusterError::Serialization(e.to_string()))
    }

    /// Deserialize metadata state from a snapshot produced by `serialize`.
    pub fn deserialize(data: &[u8]) -> Result<Self> {
        postcard::from_bytes(data).map_err(|e| ClusterError::Deserialization(e.to_string()))
    }
626}
627
/// Thread-safe metadata store wrapper.
///
/// Wraps the Raft state machine in `Arc<RwLock<…>>` so many readers can
/// query concurrently while `apply` takes the exclusive write lock.
pub struct MetadataStore {
    metadata: Arc<RwLock<ClusterMetadata>>,
}
632
impl MetadataStore {
    /// Create a store wrapping empty cluster metadata.
    pub fn new() -> Self {
        Self {
            metadata: Arc::new(RwLock::new(ClusterMetadata::new())),
        }
    }

    /// Wrap existing metadata (e.g. state restored from a snapshot).
    pub fn from_metadata(metadata: ClusterMetadata) -> Self {
        Self {
            metadata: Arc::new(RwLock::new(metadata)),
        }
    }

    /// Acquire a shared read guard on the metadata.
    pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, ClusterMetadata> {
        self.metadata.read().await
    }

    /// Acquire an exclusive write guard on the metadata.
    pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, ClusterMetadata> {
        self.metadata.write().await
    }

    /// Apply a Raft command under the write lock.
    pub async fn apply(&self, index: u64, cmd: MetadataCommand) -> MetadataResponse {
        self.metadata.write().await.apply(index, cmd)
    }

    /// Clone out a topic's state, if present (clone avoids holding the lock).
    pub async fn get_topic(&self, name: &str) -> Option<TopicState> {
        self.metadata.read().await.topics.get(name).cloned()
    }

    /// Clone out a partition's state, if present.
    pub async fn get_partition(&self, id: &PartitionId) -> Option<PartitionState> {
        let meta = self.metadata.read().await;
        meta.topics.get(&id.topic)?.partition(id.partition).cloned()
    }

    /// Read the current cluster epoch.
    pub async fn epoch(&self) -> u64 {
        self.metadata.read().await.epoch
    }
}
671
/// `Default` delegates to `new()`: a store over empty metadata.
impl Default for MetadataStore {
    fn default() -> Self {
        Self::new()
    }
}
677
#[cfg(test)]
mod tests {
    use super::*;

    /// Creating a topic yields `TopicCreated` and populates partition state.
    #[test]
    fn test_create_topic() {
        let mut metadata = ClusterMetadata::new();

        let config = TopicConfig::new("test-topic", 3, 2);
        let assignments = vec![
            vec!["node-1".to_string(), "node-2".to_string()],
            vec!["node-2".to_string(), "node-3".to_string()],
            vec!["node-3".to_string(), "node-1".to_string()],
        ];

        let cmd = MetadataCommand::CreateTopic {
            config,
            partition_assignments: assignments,
        };
        let result = metadata.apply(1, cmd);

        match result {
            MetadataResponse::TopicCreated { name, partitions } => {
                assert_eq!(name, "test-topic");
                assert_eq!(partitions, 3);
            }
            _ => panic!("Expected TopicCreated"),
        }

        assert!(metadata.topics.contains_key("test-topic"));
        assert_eq!(metadata.topics["test-topic"].partitions.len(), 3);
    }

    /// A leader update with a strictly newer epoch is accepted and recorded.
    #[test]
    fn test_update_partition_leader() {
        let mut metadata = ClusterMetadata::new();

        // Create topic first
        let config = TopicConfig::new("test", 1, 2);
        let assignments = vec![vec!["node-1".to_string(), "node-2".to_string()]];
        metadata.apply(
            1,
            MetadataCommand::CreateTopic {
                config,
                partition_assignments: assignments,
            },
        );

        // Initial leader was elected in TopicState::new with epoch 1
        // Update leader with higher epoch
        let partition = PartitionId::new("test", 0);
        let result = metadata.apply(
            2,
            MetadataCommand::UpdatePartitionLeader {
                partition: partition.clone(),
                leader: "node-2".to_string(),
                epoch: 2, // Must be higher than current epoch (1)
            },
        );

        assert!(matches!(
            result,
            MetadataResponse::PartitionLeaderUpdated { .. }
        ));
        assert_eq!(
            metadata.topics["test"].partition(0).unwrap().leader,
            Some("node-2".to_string())
        );
    }

    /// Register then deregister round-trips the node registry.
    #[test]
    fn test_register_deregister_node() {
        let mut metadata = ClusterMetadata::new();

        let info = NodeInfo::new(
            "node-1",
            "127.0.0.1:9092".parse().unwrap(),
            "127.0.0.1:9093".parse().unwrap(),
        );

        // Register
        let result = metadata.apply(1, MetadataCommand::RegisterNode { info });
        assert!(matches!(result, MetadataResponse::NodeRegistered { .. }));
        assert!(metadata.nodes.contains_key("node-1"));

        // Deregister
        let result = metadata.apply(
            2,
            MetadataCommand::DeregisterNode {
                node_id: "node-1".to_string(),
            },
        );
        assert!(matches!(result, MetadataResponse::NodeDeregistered { .. }));
        assert!(!metadata.nodes.contains_key("node-1"));
    }

    /// Snapshot serialize/deserialize preserves topics and the applied index.
    #[test]
    fn test_serialization() {
        let mut metadata = ClusterMetadata::new();
        metadata.apply(
            1,
            MetadataCommand::CreateTopic {
                config: TopicConfig::new("test", 2, 1),
                partition_assignments: vec![vec!["node-1".to_string()], vec!["node-1".to_string()]],
            },
        );

        let bytes = metadata.serialize().unwrap();
        let restored = ClusterMetadata::deserialize(&bytes).unwrap();

        assert_eq!(restored.topics.len(), metadata.topics.len());
        assert_eq!(restored.last_applied_index, metadata.last_applied_index);
    }
}