Skip to main content

amaters_cluster/
cluster_command.rs

1//! Typed cluster command encoding for Raft log entries.
2//!
3//! All commands written into the Raft log by AmateRS components (data writes,
4//! placement actions, membership changes) are encoded as a [`ClusterCommand`]
5//! and stored in `Command::data`.  The encoding is:
6//!
7//! ```text
8//! [tag: u8] [json: UTF-8 bytes]
9//! ```
10//!
11//! The tag byte allows O(1) dispatch without deserialising the payload.
12//!
13//! # Variant tags
14//!
15//! | Tag  | Variant                    |
16//! |------|---------------------------|
17//! | 0x01 | `DataPut`                 |
18//! | 0x02 | `DataDelete`              |
19//! | 0x10 | `PlaceSplit`              |
20//! | 0x11 | `PlaceMerge`              |
21//! | 0x12 | `PlaceTransfer`           |
22//! | 0x20 | `MembershipAdd`           |
23//! | 0x21 | `MembershipRemove`        |
24//!
25//! # State machine boundary
26//!
27//! This module is responsible for *encoding* commands into the Raft log only.
28//! The state machine that *applies* committed `PlaceSplit`, `PlaceMerge`, and
29//! `PlaceTransfer` entries — updating the [`crate::shard::ShardRegistry`] and
30//! migrating data — is a separate concern implemented in future phases.
31
32use crate::error::{RaftError, RaftResult};
33use crate::placement::PlacementAction;
34use crate::shard::ShardId;
35use crate::types::NodeId;
36
37// ── Tag constants ─────────────────────────────────────────────────────────────
38
39/// Raft log tag for a KV put (data plane).
40pub const TAG_DATA_PUT: u8 = 0x01;
41/// Raft log tag for a KV delete (data plane).
42pub const TAG_DATA_DELETE: u8 = 0x02;
43/// Raft log tag for a shard split action.
44pub const TAG_PLACE_SPLIT: u8 = 0x10;
45/// Raft log tag for a shard merge action.
46pub const TAG_PLACE_MERGE: u8 = 0x11;
47/// Raft log tag for a shard transfer action.
48pub const TAG_PLACE_TRANSFER: u8 = 0x12;
49/// Raft log tag for adding a cluster member.
50pub const TAG_MEMBERSHIP_ADD: u8 = 0x20;
51/// Raft log tag for removing a cluster member.
52pub const TAG_MEMBERSHIP_REMOVE: u8 = 0x21;
53
54// ── ClusterCommand ────────────────────────────────────────────────────────────
55
56/// A typed, serialisable view over the raw `Command::data` bytes in a Raft log
57/// entry.
58///
59/// Use [`ClusterCommand::encode`] to produce bytes suitable for
60/// [`crate::log::Command::new`], and [`ClusterCommand::decode`] (or the
61/// `TryFrom<&[u8]>` impl) to reconstruct the command on the receiver side.
62#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
63pub enum ClusterCommand {
64    /// A KV put (data plane, future use).
65    DataPut {
66        /// The key to insert or overwrite.
67        key: Vec<u8>,
68        /// The value to associate with the key.
69        value: Vec<u8>,
70    },
71    /// A KV delete (data plane, future use).
72    DataDelete {
73        /// The key to delete.
74        key: Vec<u8>,
75    },
76    /// Split a hot shard at `split_key`.
77    PlaceSplit {
78        /// The shard to split.
79        shard_id: ShardId,
80        /// The key at which to split, serialised as raw bytes.
81        split_key: Vec<u8>,
82    },
83    /// Merge two adjacent cold shards into one.
84    PlaceMerge {
85        /// The left (lower-range) shard.
86        left_shard_id: ShardId,
87        /// The right (higher-range) shard.
88        right_shard_id: ShardId,
89    },
90    /// Transfer a shard from one node to another to rebalance load.
91    PlaceTransfer {
92        /// The shard to move.
93        shard_id: ShardId,
94        /// The node currently hosting the shard.
95        from_node: NodeId,
96        /// The node that should receive the shard.
97        to_node: NodeId,
98    },
99    /// Add a cluster member (membership plane).
100    MembershipAdd {
101        /// The node ID to admit.
102        node_id: NodeId,
103        /// The network address of the new node.
104        address: String,
105    },
106    /// Remove a cluster member (membership plane).
107    MembershipRemove {
108        /// The node ID to evict.
109        node_id: NodeId,
110    },
111}
112
113impl ClusterCommand {
114    /// Return the single-byte tag that identifies this variant.
115    ///
116    /// The tag is stored as the first byte of the encoded form and enables
117    /// O(1) dispatch without deserialising the JSON payload.
118    pub fn tag(&self) -> u8 {
119        match self {
120            ClusterCommand::DataPut { .. } => TAG_DATA_PUT,
121            ClusterCommand::DataDelete { .. } => TAG_DATA_DELETE,
122            ClusterCommand::PlaceSplit { .. } => TAG_PLACE_SPLIT,
123            ClusterCommand::PlaceMerge { .. } => TAG_PLACE_MERGE,
124            ClusterCommand::PlaceTransfer { .. } => TAG_PLACE_TRANSFER,
125            ClusterCommand::MembershipAdd { .. } => TAG_MEMBERSHIP_ADD,
126            ClusterCommand::MembershipRemove { .. } => TAG_MEMBERSHIP_REMOVE,
127        }
128    }
129
130    /// Encode the command as `[tag_byte][json_bytes]`.
131    ///
132    /// The result is suitable for use as the `data` field of a
133    /// [`crate::log::Command`].
134    pub fn encode(&self) -> Vec<u8> {
135        let tag = self.tag();
136        let json = serde_json::to_vec(self)
137            .expect("ClusterCommand serialization must not fail for well-formed data");
138        let mut out = Vec::with_capacity(1 + json.len());
139        out.push(tag);
140        out.extend_from_slice(&json);
141        out
142    }
143
144    /// Decode a command from `[tag_byte][json_bytes]`.
145    ///
146    /// Returns an error if the byte slice is empty, the tag is unknown, or the
147    /// JSON body cannot be deserialised.
148    pub fn decode(bytes: &[u8]) -> RaftResult<Self> {
149        let (&tag, json_tail) = bytes.split_first().ok_or_else(|| RaftError::Other {
150            message: "ClusterCommand::decode: empty byte slice".to_owned(),
151        })?;
152
153        // Verify that the tag byte is one we know about before paying the cost
154        // of JSON deserialisation.
155        match tag {
156            TAG_DATA_PUT
157            | TAG_DATA_DELETE
158            | TAG_PLACE_SPLIT
159            | TAG_PLACE_MERGE
160            | TAG_PLACE_TRANSFER
161            | TAG_MEMBERSHIP_ADD
162            | TAG_MEMBERSHIP_REMOVE => {}
163            other => {
164                return Err(RaftError::Other {
165                    message: format!("ClusterCommand::decode: unknown tag byte 0x{:02x}", other),
166                });
167            }
168        }
169
170        let cmd: ClusterCommand =
171            serde_json::from_slice(json_tail).map_err(|e| RaftError::Other {
172                message: format!("ClusterCommand::decode: JSON deserialisation failed: {}", e),
173            })?;
174
175        // Consistency check: the tag embedded in the JSON must match the
176        // leading byte.  A mismatch indicates data corruption.
177        if cmd.tag() != tag {
178            return Err(RaftError::Other {
179                message: format!(
180                    "ClusterCommand::decode: tag mismatch — header byte 0x{:02x} but JSON \
181                     deserialised to variant with tag 0x{:02x}",
182                    tag,
183                    cmd.tag(),
184                ),
185            });
186        }
187
188        Ok(cmd)
189    }
190
191    /// Convert a [`crate::placement::PlacementAction`] into the corresponding [`ClusterCommand`].
192    ///
193    /// | `PlacementAction` variant | `ClusterCommand` variant |
194    /// |--------------------------|--------------------------|
195    /// | `Split`                  | `PlaceSplit`             |
196    /// | `Merge`                  | `PlaceMerge`             |
197    /// | `Transfer`               | `PlaceTransfer`          |
198    pub fn from_placement_action(action: &PlacementAction) -> Self {
199        match action {
200            PlacementAction::Split {
201                shard_id,
202                split_key,
203            } => ClusterCommand::PlaceSplit {
204                shard_id: *shard_id,
205                // Serialise the `Key` as its raw byte representation.
206                split_key: split_key.as_bytes().to_vec(),
207            },
208            PlacementAction::Merge {
209                left_shard_id,
210                right_shard_id,
211            } => ClusterCommand::PlaceMerge {
212                left_shard_id: *left_shard_id,
213                right_shard_id: *right_shard_id,
214            },
215            PlacementAction::Transfer {
216                shard_id,
217                from_node,
218                to_node,
219            } => ClusterCommand::PlaceTransfer {
220                shard_id: *shard_id,
221                from_node: *from_node,
222                to_node: *to_node,
223            },
224        }
225    }
226}
227
228// ── TryFrom<&[u8]> ───────────────────────────────────────────────────────────
229
230impl TryFrom<&[u8]> for ClusterCommand {
231    type Error = RaftError;
232
233    /// Delegate to [`ClusterCommand::decode`].
234    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
235        ClusterCommand::decode(bytes)
236    }
237}
238
239// ── Tests ─────────────────────────────────────────────────────────────────────
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244    use crate::shard::KeyRange;
245    use amaters_core::Key;
246
247    // ── round-trip helpers ────────────────────────────────────────────────────
248
249    fn assert_round_trip(cmd: ClusterCommand) {
250        let encoded = cmd.encode();
251        // Verify the leading tag byte matches.
252        assert_eq!(
253            encoded[0],
254            cmd.tag(),
255            "leading tag byte must match cmd.tag()"
256        );
257        // Decode and compare.
258        let decoded = ClusterCommand::decode(&encoded)
259            .expect("decode must succeed for a freshly encoded command");
260        assert_eq!(cmd, decoded, "decoded command must equal original");
261        // Also test TryFrom.
262        let via_try_from = ClusterCommand::try_from(encoded.as_slice())
263            .expect("TryFrom must succeed for a freshly encoded command");
264        assert_eq!(cmd, via_try_from, "TryFrom result must equal original");
265    }
266
267    // ── individual variant round-trips ────────────────────────────────────────
268
269    #[test]
270    fn test_encode_decode_place_split() {
271        let cmd = ClusterCommand::PlaceSplit {
272            shard_id: 42,
273            split_key: vec![0x80, 0x00, 0xFF],
274        };
275        assert_round_trip(cmd);
276    }
277
278    #[test]
279    fn test_encode_decode_place_merge() {
280        let cmd = ClusterCommand::PlaceMerge {
281            left_shard_id: 7,
282            right_shard_id: 8,
283        };
284        assert_round_trip(cmd);
285    }
286
287    #[test]
288    fn test_encode_decode_place_transfer() {
289        let cmd = ClusterCommand::PlaceTransfer {
290            shard_id: 99,
291            from_node: 1,
292            to_node: 3,
293        };
294        assert_round_trip(cmd);
295    }
296
297    #[test]
298    fn test_encode_decode_data_put() {
299        let cmd = ClusterCommand::DataPut {
300            key: b"hello".to_vec(),
301            value: b"world".to_vec(),
302        };
303        assert_round_trip(cmd);
304    }
305
306    #[test]
307    fn test_encode_decode_data_delete() {
308        let cmd = ClusterCommand::DataDelete {
309            key: b"goodbye".to_vec(),
310        };
311        assert_round_trip(cmd);
312    }
313
314    #[test]
315    fn test_encode_decode_membership_add() {
316        let cmd = ClusterCommand::MembershipAdd {
317            node_id: 5,
318            address: "192.168.1.10:7878".to_owned(),
319        };
320        assert_round_trip(cmd);
321    }
322
323    #[test]
324    fn test_encode_decode_membership_remove() {
325        let cmd = ClusterCommand::MembershipRemove { node_id: 5 };
326        assert_round_trip(cmd);
327    }
328
329    // ── from_placement_action — all three variants ─────────────────────────────
330
331    #[test]
332    fn test_from_placement_action_all_variants() {
333        // Split
334        let split_key = Key::from_slice(&[0x80u8]);
335        let split_action = PlacementAction::Split {
336            shard_id: 11,
337            split_key: split_key.clone(),
338        };
339        let split_cmd = ClusterCommand::from_placement_action(&split_action);
340        assert_eq!(
341            split_cmd,
342            ClusterCommand::PlaceSplit {
343                shard_id: 11,
344                split_key: split_key.as_bytes().to_vec(),
345            },
346            "Split action must map to PlaceSplit with key bytes"
347        );
348        // Must also encode/decode cleanly.
349        assert_round_trip(split_cmd);
350
351        // Merge
352        let merge_action = PlacementAction::Merge {
353            left_shard_id: 3,
354            right_shard_id: 4,
355        };
356        let merge_cmd = ClusterCommand::from_placement_action(&merge_action);
357        assert_eq!(
358            merge_cmd,
359            ClusterCommand::PlaceMerge {
360                left_shard_id: 3,
361                right_shard_id: 4,
362            },
363            "Merge action must map to PlaceMerge"
364        );
365        assert_round_trip(merge_cmd);
366
367        // Transfer
368        let transfer_action = PlacementAction::Transfer {
369            shard_id: 17,
370            from_node: 2,
371            to_node: 5,
372        };
373        let transfer_cmd = ClusterCommand::from_placement_action(&transfer_action);
374        assert_eq!(
375            transfer_cmd,
376            ClusterCommand::PlaceTransfer {
377                shard_id: 17,
378                from_node: 2,
379                to_node: 5,
380            },
381            "Transfer action must map to PlaceTransfer"
382        );
383        assert_round_trip(transfer_cmd);
384    }
385
386    // ── error handling ────────────────────────────────────────────────────────
387
388    #[test]
389    fn test_decode_empty_bytes_is_error() {
390        let result = ClusterCommand::decode(&[]);
391        assert!(result.is_err(), "decoding empty bytes must return an error");
392    }
393
394    #[test]
395    fn test_decode_unknown_tag_is_error() {
396        // Tag 0xFF is not assigned to any variant.
397        let bytes = [0xFF, b'{', b'}'];
398        let result = ClusterCommand::decode(&bytes);
399        assert!(result.is_err(), "unknown tag byte must return an error");
400    }
401
402    #[test]
403    fn test_tag_bytes_are_unique() {
404        // Construct one of every variant and verify tags are all distinct.
405        let range =
406            KeyRange::new(Key::from_slice(&[0u8]), Key::from_slice(&[255u8])).expect("valid range");
407        let _ = range; // used to prove KeyRange construction; actual tags come from enum
408
409        let variants: Vec<ClusterCommand> = vec![
410            ClusterCommand::DataPut {
411                key: vec![1],
412                value: vec![2],
413            },
414            ClusterCommand::DataDelete { key: vec![3] },
415            ClusterCommand::PlaceSplit {
416                shard_id: 1,
417                split_key: vec![0x80],
418            },
419            ClusterCommand::PlaceMerge {
420                left_shard_id: 1,
421                right_shard_id: 2,
422            },
423            ClusterCommand::PlaceTransfer {
424                shard_id: 1,
425                from_node: 1,
426                to_node: 2,
427            },
428            ClusterCommand::MembershipAdd {
429                node_id: 1,
430                address: "a:1".to_owned(),
431            },
432            ClusterCommand::MembershipRemove { node_id: 1 },
433        ];
434
435        let mut tags = std::collections::HashSet::new();
436        for v in &variants {
437            let inserted = tags.insert(v.tag());
438            assert!(
439                inserted,
440                "duplicate tag 0x{:02x} found for {:?}",
441                v.tag(),
442                v
443            );
444        }
445        assert_eq!(
446            tags.len(),
447            variants.len(),
448            "each variant must have a unique tag"
449        );
450    }
451}