use crate::error::{RaftError, RaftResult};
use crate::placement::PlacementAction;
use crate::shard::ShardId;
use crate::types::NodeId;
// Tag bytes for the encoded form of `ClusterCommand`.
//
// `ClusterCommand::encode` writes exactly one of these as the first byte of its
// output, and `ClusterCommand::decode` rejects any leading byte that is not in
// this list. The values fall into three numeric ranges: 0x0_ for the data
// commands, 0x1_ for the placement commands and 0x2_ for the membership
// commands.

/// Tag byte reported by `ClusterCommand::tag` for the `DataPut` variant.
pub const TAG_DATA_PUT: u8 = 0x01;
/// Tag byte reported by `ClusterCommand::tag` for the `DataDelete` variant.
pub const TAG_DATA_DELETE: u8 = 0x02;
/// Tag byte reported by `ClusterCommand::tag` for the `PlaceSplit` variant.
pub const TAG_PLACE_SPLIT: u8 = 0x10;
/// Tag byte reported by `ClusterCommand::tag` for the `PlaceMerge` variant.
pub const TAG_PLACE_MERGE: u8 = 0x11;
/// Tag byte reported by `ClusterCommand::tag` for the `PlaceTransfer` variant.
pub const TAG_PLACE_TRANSFER: u8 = 0x12;
/// Tag byte reported by `ClusterCommand::tag` for the `MembershipAdd` variant.
pub const TAG_MEMBERSHIP_ADD: u8 = 0x20;
/// Tag byte reported by `ClusterCommand::tag` for the `MembershipRemove` variant.
pub const TAG_MEMBERSHIP_REMOVE: u8 = 0x21;
/// A command that can be framed as bytes with `ClusterCommand::encode` and
/// parsed back with `ClusterCommand::decode`.
///
/// The variants fall into three families — data (`Data*`), placement
/// (`Place*`) and membership (`Membership*`) — and each variant has its own
/// tag byte, returned by `ClusterCommand::tag`.
///
/// The derived `Serialize`/`Deserialize` impls produce the JSON body used by
/// `encode`/`decode`, so variant and field names are part of the encoded
/// bytes; renaming them changes what previously encoded data decodes to.
///
/// NOTE(review): presumably these are the entries replicated through the Raft
/// log — confirm against the callers; this file only shows the codec.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ClusterCommand {
/// Data command carrying a key together with a value (tag `TAG_DATA_PUT`).
DataPut {
/// Raw key bytes.
key: Vec<u8>,
/// Raw value bytes.
value: Vec<u8>,
},
/// Data command identifying a key (tag `TAG_DATA_DELETE`).
DataDelete {
/// Raw key bytes.
key: Vec<u8>,
},
/// Placement command produced from `PlacementAction::Split`
/// (tag `TAG_PLACE_SPLIT`).
PlaceSplit {
/// Shard named by the split action.
shard_id: ShardId,
/// Byte form of the split key.
split_key: Vec<u8>,
},
/// Placement command produced from `PlacementAction::Merge`
/// (tag `TAG_PLACE_MERGE`).
PlaceMerge {
/// First shard of the pair named by the merge action.
left_shard_id: ShardId,
/// Second shard of the pair named by the merge action.
right_shard_id: ShardId,
},
/// Placement command produced from `PlacementAction::Transfer`
/// (tag `TAG_PLACE_TRANSFER`).
PlaceTransfer {
/// Shard named by the transfer action.
shard_id: ShardId,
/// Presumably the node the shard is moved away from — TODO confirm.
from_node: NodeId,
/// Presumably the node the shard is moved to — TODO confirm.
to_node: NodeId,
},
/// Membership command naming a node and its address
/// (tag `TAG_MEMBERSHIP_ADD`).
MembershipAdd {
/// Identifier of the node.
node_id: NodeId,
/// Address string for the node; its format is not validated in this file.
address: String,
},
/// Membership command naming a node (tag `TAG_MEMBERSHIP_REMOVE`).
MembershipRemove {
/// Identifier of the node.
node_id: NodeId,
},
}
impl ClusterCommand {
    /// Returns the one-byte tag that identifies this command's variant.
    ///
    /// This is the byte [`encode`](Self::encode) writes first and the byte
    /// [`decode`](Self::decode) compares against the parsed variant.
    pub fn tag(&self) -> u8 {
        match self {
            Self::DataPut { .. } => TAG_DATA_PUT,
            Self::DataDelete { .. } => TAG_DATA_DELETE,
            Self::PlaceSplit { .. } => TAG_PLACE_SPLIT,
            Self::PlaceMerge { .. } => TAG_PLACE_MERGE,
            Self::PlaceTransfer { .. } => TAG_PLACE_TRANSFER,
            Self::MembershipAdd { .. } => TAG_MEMBERSHIP_ADD,
            Self::MembershipRemove { .. } => TAG_MEMBERSHIP_REMOVE,
        }
    }

    /// Serialises the command as `[tag byte][JSON body]`.
    ///
    /// The JSON body is the `serde_json` rendering of the whole enum value,
    /// so the variant is recorded both in the tag byte and in the body.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json` fails to serialise the command.
    pub fn encode(&self) -> Vec<u8> {
        let body = serde_json::to_vec(self)
            .expect("ClusterCommand serialization must not fail for well-formed data");

        // One extra byte up front for the tag.
        let mut framed = Vec::with_capacity(body.len() + 1);
        framed.push(self.tag());
        framed.extend(body);
        framed
    }

    /// Parses bytes produced by [`encode`](Self::encode).
    ///
    /// # Errors
    ///
    /// Returns `RaftError::Other` when:
    /// - `bytes` is empty,
    /// - the first byte is not one of the `TAG_*` constants,
    /// - the remaining bytes are not valid JSON for a `ClusterCommand`, or
    /// - the parsed variant's tag differs from the first byte.
    pub fn decode(bytes: &[u8]) -> RaftResult<Self> {
        // Every failure in this function is reported through the same variant.
        let reject = |message: String| RaftError::Other { message };

        let (header, body) = match bytes.split_first() {
            Some((&first, rest)) => (first, rest),
            None => {
                return Err(reject(
                    "ClusterCommand::decode: empty byte slice".to_owned(),
                ));
            }
        };

        // Refuse unknown tags before spending any time on JSON parsing.
        let is_known_tag = matches!(
            header,
            TAG_DATA_PUT
                | TAG_DATA_DELETE
                | TAG_PLACE_SPLIT
                | TAG_PLACE_MERGE
                | TAG_PLACE_TRANSFER
                | TAG_MEMBERSHIP_ADD
                | TAG_MEMBERSHIP_REMOVE
        );
        if !is_known_tag {
            return Err(reject(format!(
                "ClusterCommand::decode: unknown tag byte 0x{:02x}",
                header
            )));
        }

        let parsed: Self = serde_json::from_slice(body).map_err(|err| {
            reject(format!(
                "ClusterCommand::decode: JSON deserialisation failed: {}",
                err
            ))
        })?;

        // The body names its own variant; it must agree with the header byte.
        let parsed_tag = parsed.tag();
        if parsed_tag == header {
            Ok(parsed)
        } else {
            Err(reject(format!(
                "ClusterCommand::decode: tag mismatch — header byte 0x{:02x} but JSON \
                 deserialised to variant with tag 0x{:02x}",
                header, parsed_tag,
            )))
        }
    }

    /// Builds the `Place*` command that corresponds to a placement action.
    ///
    /// `Split` maps to `PlaceSplit` (with the split key copied out as bytes),
    /// `Merge` to `PlaceMerge` and `Transfer` to `PlaceTransfer`; all ids are
    /// copied unchanged.
    pub fn from_placement_action(action: &PlacementAction) -> Self {
        match action {
            PlacementAction::Split {
                shard_id: shard,
                split_key: key,
            } => Self::PlaceSplit {
                shard_id: *shard,
                split_key: key.as_bytes().to_vec(),
            },
            PlacementAction::Merge {
                left_shard_id: left,
                right_shard_id: right,
            } => Self::PlaceMerge {
                left_shard_id: *left,
                right_shard_id: *right,
            },
            PlacementAction::Transfer {
                shard_id: shard,
                from_node: source,
                to_node: target,
            } => Self::PlaceTransfer {
                shard_id: *shard,
                from_node: *source,
                to_node: *target,
            },
        }
    }
}
impl TryFrom<&[u8]> for ClusterCommand {
type Error = RaftError;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
ClusterCommand::decode(bytes)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use amaters_core::Key;

    /// Encodes `cmd` and checks that the first byte is `cmd.tag()` and that
    /// both `decode` and `TryFrom<&[u8]>` reproduce the original command.
    fn assert_round_trip(cmd: ClusterCommand) {
        let encoded = cmd.encode();
        assert_eq!(
            encoded[0],
            cmd.tag(),
            "leading tag byte must match cmd.tag()"
        );
        let decoded = ClusterCommand::decode(&encoded)
            .expect("decode must succeed for a freshly encoded command");
        assert_eq!(cmd, decoded, "decoded command must equal original");
        let via_try_from = ClusterCommand::try_from(encoded.as_slice())
            .expect("TryFrom must succeed for a freshly encoded command");
        assert_eq!(cmd, via_try_from, "TryFrom result must equal original");
    }

    #[test]
    fn test_encode_decode_place_split() {
        let cmd = ClusterCommand::PlaceSplit {
            shard_id: 42,
            // Includes bytes that are not valid UTF-8 on their own.
            split_key: vec![0x80, 0x00, 0xFF],
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_place_merge() {
        let cmd = ClusterCommand::PlaceMerge {
            left_shard_id: 7,
            right_shard_id: 8,
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_place_transfer() {
        let cmd = ClusterCommand::PlaceTransfer {
            shard_id: 99,
            from_node: 1,
            to_node: 3,
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_data_put() {
        let cmd = ClusterCommand::DataPut {
            key: b"hello".to_vec(),
            value: b"world".to_vec(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_data_delete() {
        let cmd = ClusterCommand::DataDelete {
            key: b"goodbye".to_vec(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_membership_add() {
        let cmd = ClusterCommand::MembershipAdd {
            node_id: 5,
            address: "192.168.1.10:7878".to_owned(),
        };
        assert_round_trip(cmd);
    }

    #[test]
    fn test_encode_decode_membership_remove() {
        let cmd = ClusterCommand::MembershipRemove { node_id: 5 };
        assert_round_trip(cmd);
    }

    /// Each `PlacementAction` variant must map to its `Place*` counterpart
    /// and the resulting command must survive an encode/decode round trip.
    #[test]
    fn test_from_placement_action_all_variants() {
        let split_key = Key::from_slice(&[0x80u8]);
        let split_action = PlacementAction::Split {
            shard_id: 11,
            split_key: split_key.clone(),
        };
        let split_cmd = ClusterCommand::from_placement_action(&split_action);
        assert_eq!(
            split_cmd,
            ClusterCommand::PlaceSplit {
                shard_id: 11,
                split_key: split_key.as_bytes().to_vec(),
            },
            "Split action must map to PlaceSplit with key bytes"
        );
        assert_round_trip(split_cmd);

        let merge_action = PlacementAction::Merge {
            left_shard_id: 3,
            right_shard_id: 4,
        };
        let merge_cmd = ClusterCommand::from_placement_action(&merge_action);
        assert_eq!(
            merge_cmd,
            ClusterCommand::PlaceMerge {
                left_shard_id: 3,
                right_shard_id: 4,
            },
            "Merge action must map to PlaceMerge"
        );
        assert_round_trip(merge_cmd);

        let transfer_action = PlacementAction::Transfer {
            shard_id: 17,
            from_node: 2,
            to_node: 5,
        };
        let transfer_cmd = ClusterCommand::from_placement_action(&transfer_action);
        assert_eq!(
            transfer_cmd,
            ClusterCommand::PlaceTransfer {
                shard_id: 17,
                from_node: 2,
                to_node: 5,
            },
            "Transfer action must map to PlaceTransfer"
        );
        assert_round_trip(transfer_cmd);
    }

    #[test]
    fn test_decode_empty_bytes_is_error() {
        let result = ClusterCommand::decode(&[]);
        assert!(result.is_err(), "decoding empty bytes must return an error");
    }

    #[test]
    fn test_decode_unknown_tag_is_error() {
        let bytes = [0xFF, b'{', b'}'];
        let result = ClusterCommand::decode(&bytes);
        assert!(result.is_err(), "unknown tag byte must return an error");
    }

    /// A valid tag followed by a body that is not parseable JSON (or by no
    /// body at all) must be rejected rather than panic.
    #[test]
    fn test_decode_malformed_json_is_error() {
        let truncated = [TAG_DATA_PUT, b'{'];
        assert!(
            ClusterCommand::decode(&truncated).is_err(),
            "truncated JSON body must return an error"
        );

        let tag_only = [TAG_DATA_PUT];
        assert!(
            ClusterCommand::decode(&tag_only).is_err(),
            "a tag byte with no JSON body must return an error"
        );
    }

    /// The header byte and the variant named in the JSON body must agree:
    /// a well-formed body under the wrong (but known) tag is an error.
    #[test]
    fn test_decode_tag_mismatch_is_error() {
        let mut bytes = ClusterCommand::PlaceMerge {
            left_shard_id: 1,
            right_shard_id: 2,
        }
        .encode();
        // Overwrite the PlaceMerge tag with another known tag.
        bytes[0] = TAG_DATA_PUT;

        match ClusterCommand::decode(&bytes) {
            Err(RaftError::Other { message }) => assert!(
                message.contains("tag mismatch"),
                "unexpected error message: {}",
                message
            ),
            other => panic!("expected a tag-mismatch error, got {:?}", other),
        }
    }

    /// No two variants may report the same tag byte.
    #[test]
    fn test_tag_bytes_are_unique() {
        let variants: Vec<ClusterCommand> = vec![
            ClusterCommand::DataPut {
                key: vec![1],
                value: vec![2],
            },
            ClusterCommand::DataDelete { key: vec![3] },
            ClusterCommand::PlaceSplit {
                shard_id: 1,
                split_key: vec![0x80],
            },
            ClusterCommand::PlaceMerge {
                left_shard_id: 1,
                right_shard_id: 2,
            },
            ClusterCommand::PlaceTransfer {
                shard_id: 1,
                from_node: 1,
                to_node: 2,
            },
            ClusterCommand::MembershipAdd {
                node_id: 1,
                address: "a:1".to_owned(),
            },
            ClusterCommand::MembershipRemove { node_id: 1 },
        ];
        let mut tags = std::collections::HashSet::new();
        for v in &variants {
            let inserted = tags.insert(v.tag());
            assert!(
                inserted,
                "duplicate tag 0x{:02x} found for {:?}",
                v.tag(),
                v
            );
        }
        assert_eq!(
            tags.len(),
            variants.len(),
            "each variant must have a unique tag"
        );
    }
}