1use crate::error::{RaftError, RaftResult};
33use crate::placement::PlacementAction;
34use crate::shard::ShardId;
35use crate::types::NodeId;
36
// Wire tag bytes.
//
// Each encoded `ClusterCommand` starts with exactly one of these bytes,
// followed by the JSON payload. The high nibble groups the commands by family:
// `0x0_` data, `0x1_` placement, `0x2_` membership.

/// Leading tag byte of an encoded `ClusterCommand::DataPut`.
pub const TAG_DATA_PUT: u8 = 0x01;
/// Leading tag byte of an encoded `ClusterCommand::DataDelete`.
pub const TAG_DATA_DELETE: u8 = 0x02;
/// Leading tag byte of an encoded `ClusterCommand::PlaceSplit`.
pub const TAG_PLACE_SPLIT: u8 = 0x10;
/// Leading tag byte of an encoded `ClusterCommand::PlaceMerge`.
pub const TAG_PLACE_MERGE: u8 = 0x11;
/// Leading tag byte of an encoded `ClusterCommand::PlaceTransfer`.
pub const TAG_PLACE_TRANSFER: u8 = 0x12;
/// Leading tag byte of an encoded `ClusterCommand::MembershipAdd`.
pub const TAG_MEMBERSHIP_ADD: u8 = 0x20;
/// Leading tag byte of an encoded `ClusterCommand::MembershipRemove`.
pub const TAG_MEMBERSHIP_REMOVE: u8 = 0x21;
53
54#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
63pub enum ClusterCommand {
64 DataPut {
66 key: Vec<u8>,
68 value: Vec<u8>,
70 },
71 DataDelete {
73 key: Vec<u8>,
75 },
76 PlaceSplit {
78 shard_id: ShardId,
80 split_key: Vec<u8>,
82 },
83 PlaceMerge {
85 left_shard_id: ShardId,
87 right_shard_id: ShardId,
89 },
90 PlaceTransfer {
92 shard_id: ShardId,
94 from_node: NodeId,
96 to_node: NodeId,
98 },
99 MembershipAdd {
101 node_id: NodeId,
103 address: String,
105 },
106 MembershipRemove {
108 node_id: NodeId,
110 },
111}
112
113impl ClusterCommand {
114 pub fn tag(&self) -> u8 {
119 match self {
120 ClusterCommand::DataPut { .. } => TAG_DATA_PUT,
121 ClusterCommand::DataDelete { .. } => TAG_DATA_DELETE,
122 ClusterCommand::PlaceSplit { .. } => TAG_PLACE_SPLIT,
123 ClusterCommand::PlaceMerge { .. } => TAG_PLACE_MERGE,
124 ClusterCommand::PlaceTransfer { .. } => TAG_PLACE_TRANSFER,
125 ClusterCommand::MembershipAdd { .. } => TAG_MEMBERSHIP_ADD,
126 ClusterCommand::MembershipRemove { .. } => TAG_MEMBERSHIP_REMOVE,
127 }
128 }
129
130 pub fn encode(&self) -> Vec<u8> {
135 let tag = self.tag();
136 let json = serde_json::to_vec(self)
137 .expect("ClusterCommand serialization must not fail for well-formed data");
138 let mut out = Vec::with_capacity(1 + json.len());
139 out.push(tag);
140 out.extend_from_slice(&json);
141 out
142 }
143
144 pub fn decode(bytes: &[u8]) -> RaftResult<Self> {
149 let (&tag, json_tail) = bytes.split_first().ok_or_else(|| RaftError::Other {
150 message: "ClusterCommand::decode: empty byte slice".to_owned(),
151 })?;
152
153 match tag {
156 TAG_DATA_PUT
157 | TAG_DATA_DELETE
158 | TAG_PLACE_SPLIT
159 | TAG_PLACE_MERGE
160 | TAG_PLACE_TRANSFER
161 | TAG_MEMBERSHIP_ADD
162 | TAG_MEMBERSHIP_REMOVE => {}
163 other => {
164 return Err(RaftError::Other {
165 message: format!("ClusterCommand::decode: unknown tag byte 0x{:02x}", other),
166 });
167 }
168 }
169
170 let cmd: ClusterCommand =
171 serde_json::from_slice(json_tail).map_err(|e| RaftError::Other {
172 message: format!("ClusterCommand::decode: JSON deserialisation failed: {}", e),
173 })?;
174
175 if cmd.tag() != tag {
178 return Err(RaftError::Other {
179 message: format!(
180 "ClusterCommand::decode: tag mismatch — header byte 0x{:02x} but JSON \
181 deserialised to variant with tag 0x{:02x}",
182 tag,
183 cmd.tag(),
184 ),
185 });
186 }
187
188 Ok(cmd)
189 }
190
191 pub fn from_placement_action(action: &PlacementAction) -> Self {
199 match action {
200 PlacementAction::Split {
201 shard_id,
202 split_key,
203 } => ClusterCommand::PlaceSplit {
204 shard_id: *shard_id,
205 split_key: split_key.as_bytes().to_vec(),
207 },
208 PlacementAction::Merge {
209 left_shard_id,
210 right_shard_id,
211 } => ClusterCommand::PlaceMerge {
212 left_shard_id: *left_shard_id,
213 right_shard_id: *right_shard_id,
214 },
215 PlacementAction::Transfer {
216 shard_id,
217 from_node,
218 to_node,
219 } => ClusterCommand::PlaceTransfer {
220 shard_id: *shard_id,
221 from_node: *from_node,
222 to_node: *to_node,
223 },
224 }
225 }
226}
227
228impl TryFrom<&[u8]> for ClusterCommand {
231 type Error = RaftError;
232
233 fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
235 ClusterCommand::decode(bytes)
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use amaters_core::Key;

    /// Encodes `cmd`, checks the leading tag byte, then checks that both
    /// `decode` and `TryFrom<&[u8]>` give back an equal command.
    fn assert_round_trip(cmd: ClusterCommand) {
        let encoded = cmd.encode();
        assert_eq!(
            encoded[0],
            cmd.tag(),
            "leading tag byte must match cmd.tag()"
        );
        let decoded = ClusterCommand::decode(&encoded)
            .expect("decode must succeed for a freshly encoded command");
        assert_eq!(cmd, decoded, "decoded command must equal original");
        let via_try_from = ClusterCommand::try_from(encoded.as_slice())
            .expect("TryFrom must succeed for a freshly encoded command");
        assert_eq!(cmd, via_try_from, "TryFrom result must equal original");
    }

    #[test]
    fn test_encode_decode_place_split() {
        let cmd = ClusterCommand::PlaceSplit {
            shard_id: 42,
            split_key: vec![0x80, 0x00, 0xFF],
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_place_merge() {
        let cmd = ClusterCommand::PlaceMerge {
            left_shard_id: 7,
            right_shard_id: 8,
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_place_transfer() {
        let cmd = ClusterCommand::PlaceTransfer {
            shard_id: 99,
            from_node: 1,
            to_node: 3,
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_data_put() {
        let cmd = ClusterCommand::DataPut {
            key: b"hello".to_vec(),
            value: b"world".to_vec(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_data_delete() {
        let cmd = ClusterCommand::DataDelete {
            key: b"goodbye".to_vec(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_membership_add() {
        let cmd = ClusterCommand::MembershipAdd {
            node_id: 5,
            address: "192.168.1.10:7878".to_owned(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_membership_remove() {
        let cmd = ClusterCommand::MembershipRemove { node_id: 5 };
        assert_round_trip(cmd);
    }

    /// Every `PlacementAction` variant must map to its `Place*` command and
    /// that command must survive an encode/decode round trip.
    #[test]
    fn test_from_placement_action_all_variants() {
        let split_key = Key::from_slice(&[0x80u8]);
        let split_action = PlacementAction::Split {
            shard_id: 11,
            split_key: split_key.clone(),
        };
        let split_cmd = ClusterCommand::from_placement_action(&split_action);
        assert_eq!(
            split_cmd,
            ClusterCommand::PlaceSplit {
                shard_id: 11,
                split_key: split_key.as_bytes().to_vec(),
            },
            "Split action must map to PlaceSplit with key bytes"
        );
        assert_round_trip(split_cmd);

        let merge_action = PlacementAction::Merge {
            left_shard_id: 3,
            right_shard_id: 4,
        };
        let merge_cmd = ClusterCommand::from_placement_action(&merge_action);
        assert_eq!(
            merge_cmd,
            ClusterCommand::PlaceMerge {
                left_shard_id: 3,
                right_shard_id: 4,
            },
            "Merge action must map to PlaceMerge"
        );
        assert_round_trip(merge_cmd);

        let transfer_action = PlacementAction::Transfer {
            shard_id: 17,
            from_node: 2,
            to_node: 5,
        };
        let transfer_cmd = ClusterCommand::from_placement_action(&transfer_action);
        assert_eq!(
            transfer_cmd,
            ClusterCommand::PlaceTransfer {
                shard_id: 17,
                from_node: 2,
                to_node: 5,
            },
            "Transfer action must map to PlaceTransfer"
        );
        assert_round_trip(transfer_cmd);
    }

    #[test]
    fn test_decode_empty_bytes_is_error() {
        let result = ClusterCommand::decode(&[]);
        assert!(result.is_err(), "decoding empty bytes must return an error");
    }

    #[test]
    fn test_decode_unknown_tag_is_error() {
        let bytes = [0xFF, b'{', b'}'];
        let result = ClusterCommand::decode(&bytes);
        assert!(result.is_err(), "unknown tag byte must return an error");
    }

    /// A valid header byte that disagrees with the JSON payload's variant
    /// must be rejected.
    #[test]
    fn test_decode_tag_mismatch_is_error() {
        let mut bytes = ClusterCommand::DataDelete { key: vec![1] }.encode();
        // Keep the DataDelete payload but claim it is a DataPut.
        bytes[0] = TAG_DATA_PUT;
        let result = ClusterCommand::decode(&bytes);
        assert!(
            result.is_err(),
            "header tag that disagrees with the payload variant must return an error"
        );
    }

    /// A known tag followed by a broken or missing JSON payload must be
    /// rejected.
    #[test]
    fn test_decode_malformed_json_is_error() {
        let mut truncated = ClusterCommand::MembershipRemove { node_id: 5 }.encode();
        // Dropping the final byte leaves the JSON document unterminated.
        truncated.pop();
        assert!(
            ClusterCommand::decode(&truncated).is_err(),
            "truncated JSON payload must return an error"
        );

        assert!(
            ClusterCommand::decode(&[TAG_DATA_PUT]).is_err(),
            "a tag byte with no payload must return an error"
        );
    }

    #[test]
    fn test_tag_bytes_are_unique() {
        let variants: Vec<ClusterCommand> = vec![
            ClusterCommand::DataPut {
                key: vec![1],
                value: vec![2],
            },
            ClusterCommand::DataDelete { key: vec![3] },
            ClusterCommand::PlaceSplit {
                shard_id: 1,
                split_key: vec![0x80],
            },
            ClusterCommand::PlaceMerge {
                left_shard_id: 1,
                right_shard_id: 2,
            },
            ClusterCommand::PlaceTransfer {
                shard_id: 1,
                from_node: 1,
                to_node: 2,
            },
            ClusterCommand::MembershipAdd {
                node_id: 1,
                address: "a:1".to_owned(),
            },
            ClusterCommand::MembershipRemove { node_id: 1 },
        ];

        let mut tags = std::collections::HashSet::new();
        for v in &variants {
            let inserted = tags.insert(v.tag());
            assert!(
                inserted,
                "duplicate tag 0x{:02x} found for {:?}",
                v.tag(),
                v
            );
        }
        assert_eq!(
            tags.len(),
            variants.len(),
            "each variant must have a unique tag"
        );
    }
}