// nodedb_cluster/metadata_group.rs

//! Replicated metadata Raft group for cluster-wide state.
//!
//! This module defines a dedicated Raft group (ID 0, kept separate from the
//! 1024 data vShards) that replicates cluster metadata across all nodes:
//!
//! - RoutingTable (vShard → node mapping)
//! - DDL schemas (collection definitions, index declarations)
//! - Security catalog (users, roles, permissions)
//!
//! All nodes participate in this group. The leader serves authoritative
//! reads; followers cache locally and subscribe to the leader's change
//! stream for updates.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

/// Raft group ID reserved for the cluster metadata group.
///
/// Data vShard groups are numbered from 1 upward, so ID `0` can never
/// collide with one of them.
pub const METADATA_GROUP_ID: u64 = 0;

23/// Types of metadata entries replicated via the metadata Raft group.
24#[derive(
25    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
26)]
27pub enum MetadataEntry {
28    /// Routing table update (vShard assignment change).
29    RoutingUpdate {
30        vshard_id: u16,
31        new_node_id: u64,
32        new_group_id: u64,
33    },
34    /// Collection DDL (create/drop/alter).
35    CollectionDdl {
36        tenant_id: u32,
37        collection: String,
38        action: DdlAction,
39    },
40    /// Security policy change (user/role/permission).
41    SecurityChange {
42        tenant_id: u32,
43        change: SecurityChangeType,
44    },
45    /// Node membership change (join/leave/decommission).
46    MembershipChange {
47        node_id: u64,
48        action: MembershipAction,
49    },
50}
51
52#[derive(
53    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
54)]
55pub enum DdlAction {
56    Create { fields: Vec<(String, String)> },
57    Drop,
58    AlterAddField { name: String, field_type: String },
59}
60
61#[derive(
62    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
63)]
64pub enum SecurityChangeType {
65    CreateUser { username: String },
66    DropUser { username: String },
67    GrantPermission { role: String, resource: String },
68    RevokePermission { role: String, resource: String },
69}
70
71#[derive(
72    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
73)]
74pub enum MembershipAction {
75    Join { addr: String },
76    Leave,
77    Decommission,
78    PromoteToVoter,
79}
80
/// Node-local view of cluster metadata, built by applying entries from the
/// metadata Raft log.
///
/// Every node keeps one. The leader writes to it directly; followers fold in
/// committed entries from the log via `MetadataCache::apply` to stay
/// consistent.
#[derive(Debug, Clone, Default)]
pub struct MetadataCache {
    /// Index of the last applied log entry; everything up to and including
    /// it is reflected here.
    pub applied_index: u64,
    /// Version counter for the cached routing table.
    pub routing_version: u64,
    /// Schemas keyed by `(tenant_id, collection_name)`; each value is the
    /// ordered list of `(field_name, field_type)` pairs.
    pub collections: HashMap<(u32, String), Vec<(String, String)>>,
    /// Known cluster members: `node_id` → network address.
    pub members: HashMap<u64, String>,
}

97impl MetadataCache {
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Apply a metadata entry from the Raft log.
103    ///
104    /// Called by the Raft commit applier when a metadata entry is committed.
105    /// Updates the local cache to reflect the change.
106    pub fn apply(&mut self, index: u64, entry: &MetadataEntry) {
107        self.applied_index = index;
108        match entry {
109            MetadataEntry::RoutingUpdate {
110                vshard_id,
111                new_node_id,
112                ..
113            } => {
114                debug!(
115                    vshard_id,
116                    new_node_id, index, "metadata: routing update applied"
117                );
118                self.routing_version += 1;
119            }
120            MetadataEntry::CollectionDdl {
121                tenant_id,
122                collection,
123                action,
124            } => match action {
125                DdlAction::Create { fields } => {
126                    self.collections
127                        .insert((*tenant_id, collection.clone()), fields.clone());
128                    info!(tenant_id, collection, index, "metadata: collection created");
129                }
130                DdlAction::Drop => {
131                    self.collections.remove(&(*tenant_id, collection.clone()));
132                    info!(tenant_id, collection, index, "metadata: collection dropped");
133                }
134                DdlAction::AlterAddField { name, field_type } => {
135                    if let Some(fields) =
136                        self.collections.get_mut(&(*tenant_id, collection.clone()))
137                    {
138                        fields.push((name.clone(), field_type.clone()));
139                    }
140                }
141            },
142            MetadataEntry::SecurityChange { tenant_id, change } => {
143                debug!(tenant_id, ?change, index, "metadata: security change");
144            }
145            MetadataEntry::MembershipChange { node_id, action } => match action {
146                MembershipAction::Join { addr } => {
147                    self.members.insert(*node_id, addr.clone());
148                    info!(node_id, addr, "metadata: node joined");
149                }
150                MembershipAction::Leave | MembershipAction::Decommission => {
151                    self.members.remove(node_id);
152                    info!(node_id, "metadata: node left");
153                }
154                MembershipAction::PromoteToVoter => {
155                    debug!(node_id, "metadata: node promoted to voter");
156                }
157            },
158        }
159    }
160
161    /// Serialize a metadata entry for Raft proposal.
162    pub fn serialize_entry(entry: &MetadataEntry) -> crate::Result<Vec<u8>> {
163        zerompk::to_msgpack_vec(entry).map_err(|e| crate::ClusterError::Codec {
164            detail: format!("metadata serialize: {e}"),
165        })
166    }
167
168    /// Deserialize a metadata entry from Raft log data.
169    pub fn deserialize_entry(data: &[u8]) -> crate::Result<MetadataEntry> {
170        zerompk::from_msgpack(data).map_err(|e| crate::ClusterError::Codec {
171            detail: format!("metadata deserialize: {e}"),
172        })
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `CollectionDdl` entry for tenant 1.
    fn ddl(collection: &str, action: DdlAction) -> MetadataEntry {
        MetadataEntry::CollectionDdl {
            tenant_id: 1,
            collection: collection.into(),
            action,
        }
    }

    #[test]
    fn apply_routing_update() {
        let mut cache = MetadataCache::new();
        let entry = MetadataEntry::RoutingUpdate {
            vshard_id: 42,
            new_node_id: 3,
            new_group_id: 10,
        };
        cache.apply(1, &entry);
        assert_eq!(cache.applied_index, 1);
        assert_eq!(cache.routing_version, 1);

        cache.apply(2, &entry);
        assert_eq!(cache.applied_index, 2);
        assert_eq!(cache.routing_version, 2);
    }

    #[test]
    fn apply_collection_ddl() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &ddl(
                "users",
                DdlAction::Create {
                    fields: vec![("name".into(), "VARCHAR".into())],
                },
            ),
        );
        assert!(cache.collections.contains_key(&(1, "users".into())));

        cache.apply(2, &ddl("users", DdlAction::Drop));
        assert!(!cache.collections.contains_key(&(1, "users".into())));
    }

    #[test]
    fn alter_add_field_appends_to_existing_collection() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &ddl(
                "users",
                DdlAction::Create {
                    fields: vec![("name".into(), "VARCHAR".into())],
                },
            ),
        );
        cache.apply(
            2,
            &ddl(
                "users",
                DdlAction::AlterAddField {
                    name: "age".into(),
                    field_type: "INT".into(),
                },
            ),
        );
        assert_eq!(cache.applied_index, 2);
        assert_eq!(
            cache.collections[&(1, "users".to_string())],
            vec![
                ("name".to_string(), "VARCHAR".to_string()),
                ("age".to_string(), "INT".to_string()),
            ]
        );
    }

    #[test]
    fn alter_add_field_on_missing_collection_is_ignored() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &ddl(
                "ghost",
                DdlAction::AlterAddField {
                    name: "age".into(),
                    field_type: "INT".into(),
                },
            ),
        );
        assert_eq!(cache.applied_index, 1);
        assert!(cache.collections.is_empty());
    }

    #[test]
    fn apply_membership() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &MetadataEntry::MembershipChange {
                node_id: 5,
                action: MembershipAction::Join {
                    addr: "10.0.0.5:9000".into(),
                },
            },
        );
        assert_eq!(cache.members.get(&5), Some(&"10.0.0.5:9000".to_string()));

        cache.apply(
            2,
            &MetadataEntry::MembershipChange {
                node_id: 5,
                action: MembershipAction::Decommission,
            },
        );
        assert!(!cache.members.contains_key(&5));
    }

    #[test]
    fn apply_membership_leave() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &MetadataEntry::MembershipChange {
                node_id: 6,
                action: MembershipAction::Join {
                    addr: "10.0.0.6:9000".into(),
                },
            },
        );
        cache.apply(
            2,
            &MetadataEntry::MembershipChange {
                node_id: 6,
                action: MembershipAction::Leave,
            },
        );
        assert!(cache.members.is_empty());
    }

    #[test]
    fn promote_and_security_change_do_not_alter_state() {
        let mut cache = MetadataCache::new();
        cache.apply(
            1,
            &MetadataEntry::MembershipChange {
                node_id: 7,
                action: MembershipAction::Join {
                    addr: "10.0.0.7:9000".into(),
                },
            },
        );
        cache.apply(
            2,
            &MetadataEntry::MembershipChange {
                node_id: 7,
                action: MembershipAction::PromoteToVoter,
            },
        );
        cache.apply(
            3,
            &MetadataEntry::SecurityChange {
                tenant_id: 1,
                change: SecurityChangeType::CreateUser {
                    username: "alice".into(),
                },
            },
        );
        assert_eq!(cache.applied_index, 3);
        assert_eq!(cache.members.get(&7), Some(&"10.0.0.7:9000".to_string()));
        assert_eq!(cache.routing_version, 0);
        assert!(cache.collections.is_empty());
    }

    #[test]
    fn serialize_roundtrip() {
        let entry = MetadataEntry::RoutingUpdate {
            vshard_id: 100,
            new_node_id: 2,
            new_group_id: 50,
        };
        let bytes = MetadataCache::serialize_entry(&entry).unwrap();
        let decoded = MetadataCache::deserialize_entry(&bytes).unwrap();
        match decoded {
            MetadataEntry::RoutingUpdate {
                vshard_id,
                new_node_id,
                ..
            } => {
                assert_eq!(vshard_id, 100);
                assert_eq!(new_node_id, 2);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn serialize_roundtrip_nested_enum() {
        let entry = MetadataEntry::MembershipChange {
            node_id: 9,
            action: MembershipAction::Join {
                addr: "10.0.0.9:9000".into(),
            },
        };
        let bytes = MetadataCache::serialize_entry(&entry).unwrap();
        match MetadataCache::deserialize_entry(&bytes).unwrap() {
            MetadataEntry::MembershipChange {
                node_id,
                action: MembershipAction::Join { addr },
            } => {
                assert_eq!(node_id, 9);
                assert_eq!(addr, "10.0.0.9:9000");
            }
            other => panic!("wrong variant: {other:?}"),
        }
    }

    #[test]
    fn deserialize_empty_input_is_error() {
        assert!(MetadataCache::deserialize_entry(&[]).is_err());
    }
}
260            }
261            _ => panic!("wrong variant"),
262        }
263    }
264}