1use std::collections::HashMap;
15
16use serde::{Deserialize, Serialize};
17use tracing::{debug, info};
18
/// Group ID reserved for metadata.
///
/// NOTE(review): judging by the name, this is presumably the consensus group
/// that carries [`MetadataEntry`] records. Nothing in this file reads the
/// constant, so confirm against the callers.
pub const METADATA_GROUP_ID: u64 = 0;
22
/// One cluster-metadata change that [`MetadataCache::apply`] folds into the
/// in-memory cache.
///
/// Entries are turned into bytes and back by
/// [`MetadataCache::serialize_entry`] and [`MetadataCache::deserialize_entry`].
// NOTE(review): variant and field order may be part of the encoded form —
// confirm with the zerompk derive before reordering anything here.
#[derive(
    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub enum MetadataEntry {
    /// A routing change for one vshard.
    ///
    /// Applying it only bumps [`MetadataCache::routing_version`] and logs the
    /// update; the cache in this file does not store the new target.
    RoutingUpdate {
        /// The vshard the update refers to.
        vshard_id: u16,
        /// Node named as the new target (logged on apply).
        new_node_id: u64,
        /// Group named as the new target (not read by `apply` in this file).
        new_group_id: u64,
    },
    /// A schema change for one collection of one tenant.
    CollectionDdl {
        /// Tenant owning the collection; first half of the cache key.
        tenant_id: u32,
        /// Collection name; second half of the cache key.
        collection: String,
        /// What to do with the collection.
        action: DdlAction,
    },
    /// A security-related change.
    ///
    /// Applying it only emits a debug log; the cache keeps no security state.
    SecurityChange {
        /// Tenant the change belongs to.
        tenant_id: u32,
        /// The specific change.
        change: SecurityChangeType,
    },
    /// A change to the set of known nodes.
    MembershipChange {
        /// Node the change refers to; key of [`MetadataCache::members`].
        node_id: u64,
        /// What happened to the node.
        action: MembershipAction,
    },
}
51
/// The operation carried by a [`MetadataEntry::CollectionDdl`] entry.
// NOTE(review): variant order may be part of the encoded form — confirm with
// the zerompk derive before reordering.
#[derive(
    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub enum DdlAction {
    /// Register the collection with the given `(field name, field type)`
    /// pairs. On apply this overwrites any schema already cached under the
    /// same `(tenant_id, collection)` key.
    Create { fields: Vec<(String, String)> },
    /// Remove the collection from the cache (a no-op if it is not cached).
    Drop,
    /// Append `(name, field_type)` to the cached field list of an existing
    /// collection. If the collection is not cached, the cached schemas are
    /// left unchanged.
    AlterAddField { name: String, field_type: String },
}
60
/// The specific change carried by a [`MetadataEntry::SecurityChange`] entry.
///
/// Nothing in this file interprets these values beyond logging them with
/// `Debug` formatting.
// NOTE(review): the per-variant meanings are implied by the names only —
// confirm against the code that enforces users and permissions.
#[derive(
    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub enum SecurityChangeType {
    /// Carries the name of a user to create.
    CreateUser { username: String },
    /// Carries the name of a user to drop.
    DropUser { username: String },
    /// Carries a role and the resource it is granted on.
    GrantPermission { role: String, resource: String },
    /// Carries a role and the resource it is revoked on.
    RevokePermission { role: String, resource: String },
}
70
/// The operation carried by a [`MetadataEntry::MembershipChange`] entry.
// NOTE(review): variant order may be part of the encoded form — confirm with
// the zerompk derive before reordering.
#[derive(
    Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub enum MembershipAction {
    /// The node joins; on apply `addr` is stored in
    /// [`MetadataCache::members`] under the node's ID, replacing any
    /// previous address.
    Join { addr: String },
    /// The node is removed from [`MetadataCache::members`] on apply.
    Leave,
    /// Handled exactly like [`MembershipAction::Leave`] by the cache: the
    /// node is removed from [`MetadataCache::members`].
    Decommission,
    /// Only logged on apply; the cache keeps no voter/non-voter state.
    PromoteToVoter,
}
80
/// In-memory view of cluster metadata, built by feeding [`MetadataEntry`]
/// records one at a time to [`MetadataCache::apply`].
///
/// The `Default` value is an empty cache with both counters at zero.
#[derive(Debug, Clone, Default)]
pub struct MetadataCache {
    /// Index passed to the most recent [`MetadataCache::apply`] call.
    pub applied_index: u64,
    /// Incremented by one each time a [`MetadataEntry::RoutingUpdate`] is
    /// applied.
    pub routing_version: u64,
    /// Collection schemas keyed by `(tenant_id, collection name)`; each value
    /// is the list of `(field name, field type)` pairs.
    pub collections: HashMap<(u32, String), Vec<(String, String)>>,
    /// Known nodes: node ID mapped to the address supplied when it joined.
    pub members: HashMap<u64, String>,
}
96
97impl MetadataCache {
98 pub fn new() -> Self {
99 Self::default()
100 }
101
102 pub fn apply(&mut self, index: u64, entry: &MetadataEntry) {
107 self.applied_index = index;
108 match entry {
109 MetadataEntry::RoutingUpdate {
110 vshard_id,
111 new_node_id,
112 ..
113 } => {
114 debug!(
115 vshard_id,
116 new_node_id, index, "metadata: routing update applied"
117 );
118 self.routing_version += 1;
119 }
120 MetadataEntry::CollectionDdl {
121 tenant_id,
122 collection,
123 action,
124 } => match action {
125 DdlAction::Create { fields } => {
126 self.collections
127 .insert((*tenant_id, collection.clone()), fields.clone());
128 info!(tenant_id, collection, index, "metadata: collection created");
129 }
130 DdlAction::Drop => {
131 self.collections.remove(&(*tenant_id, collection.clone()));
132 info!(tenant_id, collection, index, "metadata: collection dropped");
133 }
134 DdlAction::AlterAddField { name, field_type } => {
135 if let Some(fields) =
136 self.collections.get_mut(&(*tenant_id, collection.clone()))
137 {
138 fields.push((name.clone(), field_type.clone()));
139 }
140 }
141 },
142 MetadataEntry::SecurityChange { tenant_id, change } => {
143 debug!(tenant_id, ?change, index, "metadata: security change");
144 }
145 MetadataEntry::MembershipChange { node_id, action } => match action {
146 MembershipAction::Join { addr } => {
147 self.members.insert(*node_id, addr.clone());
148 info!(node_id, addr, "metadata: node joined");
149 }
150 MembershipAction::Leave | MembershipAction::Decommission => {
151 self.members.remove(node_id);
152 info!(node_id, "metadata: node left");
153 }
154 MembershipAction::PromoteToVoter => {
155 debug!(node_id, "metadata: node promoted to voter");
156 }
157 },
158 }
159 }
160
161 pub fn serialize_entry(entry: &MetadataEntry) -> crate::Result<Vec<u8>> {
163 zerompk::to_msgpack_vec(entry).map_err(|e| crate::ClusterError::Codec {
164 detail: format!("metadata serialize: {e}"),
165 })
166 }
167
168 pub fn deserialize_entry(data: &[u8]) -> crate::Result<MetadataEntry> {
170 zerompk::from_msgpack(data).map_err(|e| crate::ClusterError::Codec {
171 detail: format!("metadata deserialize: {e}"),
172 })
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Create` entry for tenant 1's `users` collection with a
    /// single `name: VARCHAR` field.
    fn create_users() -> MetadataEntry {
        MetadataEntry::CollectionDdl {
            tenant_id: 1,
            collection: "users".into(),
            action: DdlAction::Create {
                fields: vec![("name".into(), "VARCHAR".into())],
            },
        }
    }

    /// Builds a `Join` entry for `node_id` at `addr`.
    fn join(node_id: u64, addr: &str) -> MetadataEntry {
        MetadataEntry::MembershipChange {
            node_id,
            action: MembershipAction::Join { addr: addr.into() },
        }
    }

    #[test]
    fn apply_routing_update() {
        let mut cache = MetadataCache::new();
        let entry = MetadataEntry::RoutingUpdate {
            vshard_id: 42,
            new_node_id: 3,
            new_group_id: 10,
        };
        cache.apply(1, &entry);
        assert_eq!(cache.applied_index, 1);
        assert_eq!(cache.routing_version, 1);

        // Every routing update bumps the version, even a repeated one.
        cache.apply(2, &entry);
        assert_eq!(cache.applied_index, 2);
        assert_eq!(cache.routing_version, 2);
    }

    #[test]
    fn apply_collection_ddl() {
        let mut cache = MetadataCache::new();
        cache.apply(1, &create_users());
        assert!(cache.collections.contains_key(&(1, "users".into())));

        cache.apply(
            2,
            &MetadataEntry::CollectionDdl {
                tenant_id: 1,
                collection: "users".into(),
                action: DdlAction::Drop,
            },
        );
        assert!(!cache.collections.contains_key(&(1, "users".into())));
    }

    #[test]
    fn apply_alter_add_field_appends_to_existing_collection() {
        let mut cache = MetadataCache::new();
        cache.apply(1, &create_users());
        cache.apply(
            2,
            &MetadataEntry::CollectionDdl {
                tenant_id: 1,
                collection: "users".into(),
                action: DdlAction::AlterAddField {
                    name: "age".into(),
                    field_type: "INT".into(),
                },
            },
        );

        let expected = vec![
            ("name".to_string(), "VARCHAR".to_string()),
            ("age".to_string(), "INT".to_string()),
        ];
        assert_eq!(cache.collections.get(&(1, "users".to_string())), Some(&expected));
        assert_eq!(cache.applied_index, 2);
    }

    #[test]
    fn apply_alter_add_field_on_unknown_collection_changes_no_schema() {
        let mut cache = MetadataCache::new();
        cache.apply(
            7,
            &MetadataEntry::CollectionDdl {
                tenant_id: 1,
                collection: "ghost".into(),
                action: DdlAction::AlterAddField {
                    name: "age".into(),
                    field_type: "INT".into(),
                },
            },
        );

        assert!(cache.collections.is_empty());
        // The index still advances even though nothing else changed.
        assert_eq!(cache.applied_index, 7);
    }

    #[test]
    fn apply_security_change_only_advances_index() {
        let mut cache = MetadataCache::new();
        cache.apply(
            3,
            &MetadataEntry::SecurityChange {
                tenant_id: 1,
                change: SecurityChangeType::CreateUser {
                    username: "alice".into(),
                },
            },
        );

        assert_eq!(cache.applied_index, 3);
        assert_eq!(cache.routing_version, 0);
        assert!(cache.collections.is_empty());
        assert!(cache.members.is_empty());
    }

    #[test]
    fn apply_membership() {
        let mut cache = MetadataCache::new();
        cache.apply(1, &join(5, "10.0.0.5:9000"));
        assert_eq!(cache.members.get(&5), Some(&"10.0.0.5:9000".to_string()));

        cache.apply(
            2,
            &MetadataEntry::MembershipChange {
                node_id: 5,
                action: MembershipAction::Decommission,
            },
        );
        assert!(!cache.members.contains_key(&5));
    }

    #[test]
    fn apply_membership_leave_and_promote() {
        let mut cache = MetadataCache::new();
        cache.apply(1, &join(5, "10.0.0.5:9000"));
        cache.apply(2, &join(6, "10.0.0.6:9000"));

        // A promotion must not touch the member table.
        cache.apply(
            3,
            &MetadataEntry::MembershipChange {
                node_id: 5,
                action: MembershipAction::PromoteToVoter,
            },
        );
        assert_eq!(cache.members.len(), 2);

        cache.apply(
            4,
            &MetadataEntry::MembershipChange {
                node_id: 5,
                action: MembershipAction::Leave,
            },
        );
        assert!(!cache.members.contains_key(&5));
        assert_eq!(cache.members.get(&6), Some(&"10.0.0.6:9000".to_string()));
        assert_eq!(cache.applied_index, 4);
    }

    #[test]
    fn serialize_roundtrip() {
        let entry = MetadataEntry::RoutingUpdate {
            vshard_id: 100,
            new_node_id: 2,
            new_group_id: 50,
        };
        let bytes = MetadataCache::serialize_entry(&entry).unwrap();
        let decoded = MetadataCache::deserialize_entry(&bytes).unwrap();
        match decoded {
            MetadataEntry::RoutingUpdate {
                vshard_id,
                new_node_id,
                new_group_id,
            } => {
                assert_eq!(vshard_id, 100);
                assert_eq!(new_node_id, 2);
                assert_eq!(new_group_id, 50);
            }
            _ => panic!("wrong variant"),
        }
    }
}