Skip to main content

d_engine_core/
command.rs

//! # Why this module exists
//!
//! `EntryPayload.command` is stored as raw `bytes` in the Raft log — intentionally.
//! The replication path (Leader → Followers → WAL) must be zero-copy: Followers store
//! and forward bytes without ever decoding them.  Only the *apply* step needs to know
//! what the command means.
//!
//! Before this module, every `StateMachine` implementor called
//! `WriteCommand::decode(&data[..])` themselves, duplicating the same proto decode
//! logic and forcing a `d-engine-proto` dependency on anyone writing a custom SM.
//!
//! This module moves the decode to one place — `DefaultStateMachineHandler` — and
//! exposes a clean Rust-native type (`Command`) at the `StateMachine::apply_chunk`
//! boundary.  `WriteCommand` (proto, wire format) and `Command` (Rust enum, execution
//! interface) are intentionally two separate types — anti-corruption layer — so that
//! proto schema changes never break the SM trait.
17use bytes::Bytes;
18use d_engine_proto::client::WriteCommand;
19use d_engine_proto::client::write_command::Operation;
20use d_engine_proto::common::Entry;
21use d_engine_proto::common::entry_payload::Payload;
22use prost::Message;
23
24use crate::Error;
25use crate::StorageError;
26
27/// Decoded KV operation — the unit of work delivered to a `StateMachine`.
28///
29/// Replaces raw proto `Entry` at the `StateMachine::apply_chunk` boundary.
30/// The framework decodes wire bytes exactly once (in `DefaultStateMachineHandler`)
31/// before dispatching; implementors never touch prost or proto types.
32#[derive(Debug, Clone, PartialEq)]
33pub enum Command {
34    /// Raft-internal no-op written by a newly elected leader.
35    /// SM must return `ApplyResult::success(entry.index)` and otherwise ignore it.
36    /// Cannot be filtered before reaching SM: omitting it would create gaps in
37    /// `last_applied`, breaking the index-tracking invariant in `StateMachineWorker`.
38    Noop,
39
40    Insert {
41        key: Bytes,
42        value: Bytes,
43        /// `None` = no expiration.  Proto encodes this as `ttl_secs = 0`; we convert
44        /// to `Option` here so SM implementors use idiomatic Rust instead of magic zeros.
45        ttl_secs: Option<u64>,
46    },
47
48    Delete {
49        key: Bytes,
50    },
51
52    CompareAndSwap {
53        key: Bytes,
54        /// `None` means the key must not exist for the swap to succeed.
55        expected: Option<Bytes>,
56        value: Bytes,
57    },
58}
59
60/// A decoded Raft log entry ready for state machine application.
61///
62/// Pairs `index` / `term` with the decoded `Command` so that state machines can
63/// produce `ApplyResult { index, .. }` without re-reading the original `Entry`.
64#[derive(Debug, Clone, PartialEq)]
65pub struct ApplyEntry {
66    pub index: u64,
67    pub term: u64,
68    pub command: Command,
69}
70
71/// Single coupling point between the proto schema and the SM trait.
72///
73/// When a new operation variant is added to `WriteCommand`, the compiler forces
74/// an update here (non-exhaustive `match`), preventing silent omissions downstream.
75impl TryFrom<WriteCommand> for Command {
76    type Error = Error;
77
78    fn try_from(wc: WriteCommand) -> Result<Self, Error> {
79        match wc.operation {
80            Some(Operation::Insert(i)) => Ok(Command::Insert {
81                key: i.key,
82                value: i.value,
83                ttl_secs: if i.ttl_secs == 0 {
84                    None
85                } else {
86                    Some(i.ttl_secs)
87                },
88            }),
89            Some(Operation::Delete(d)) => Ok(Command::Delete { key: d.key }),
90            Some(Operation::CompareAndSwap(c)) => Ok(Command::CompareAndSwap {
91                key: c.key,
92                expected: c.expected_value,
93                value: c.new_value,
94            }),
95            None => {
96                Err(StorageError::StateMachineError("WriteCommand has no operation".into()).into())
97            }
98        }
99    }
100}
101
102/// Decode a batch of raw Raft `Entry` values into `ApplyEntry` values.
103///
104/// - `Config` entries become `Command::Noop` — membership is handled by the Raft layer, but
105///   the index must still reach SM so `last_applied` stays contiguous; dropping them would
106///   leave `last_applied` stuck below the config index, breaking ReadIndex drain.
107/// - `Noop` entries become `Command::Noop`.
108/// - `Command` bytes are decoded via `TryFrom<WriteCommand>`.
109/// - Any decode failure returns `Err` immediately.
110pub fn decode_entries(entries: Vec<Entry>) -> Result<Vec<ApplyEntry>, Error> {
111    let mut result = Vec::with_capacity(entries.len());
112
113    for entry in entries {
114        let index = entry.index;
115        let term = entry.term;
116
117        let payload = entry.payload.and_then(|p| p.payload).ok_or_else(|| {
118            StorageError::StateMachineError(format!("Entry at index {index} has no payload"))
119        })?;
120
121        match payload {
122            Payload::Noop(_) => {
123                result.push(ApplyEntry {
124                    index,
125                    term,
126                    command: Command::Noop,
127                });
128            }
129            Payload::Config(_) => {
130                // Membership changes are handled by the membership layer.
131                // Convert to Noop so sm.last_applied advances through this index —
132                // dropping the entry would leave last_applied stuck, breaking ReadIndex.
133                result.push(ApplyEntry {
134                    index,
135                    term,
136                    command: Command::Noop,
137                });
138            }
139            Payload::Command(data) => {
140                let wc = WriteCommand::decode(&data[..]).map_err(|e| {
141                    StorageError::SerializationError(format!(
142                        "Failed to decode WriteCommand at index {index}: {e}"
143                    ))
144                })?;
145                result.push(ApplyEntry {
146                    index,
147                    term,
148                    command: Command::try_from(wc)?,
149                });
150            }
151        }
152    }
153
154    Ok(result)
155}
156
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use d_engine_proto::common::EntryPayload;
    use d_engine_proto::common::MembershipChange;
    use d_engine_proto::common::Noop;
    use d_engine_proto::common::entry_payload::Payload;

    use super::*;

    /// Wraps `payload` in an `Entry` at `index`, always with term 1.
    fn entry_with(
        index: u64,
        payload: Payload,
    ) -> Entry {
        Entry {
            index,
            term: 1,
            payload: Some(EntryPayload {
                payload: Some(payload),
            }),
        }
    }

    /// Entry carrying a Raft no-op payload.
    fn noop_entry(index: u64) -> Entry {
        entry_with(index, Payload::Noop(Noop {}))
    }

    /// Entry carrying an (empty) membership-change payload.
    fn config_entry(index: u64) -> Entry {
        entry_with(index, Payload::Config(MembershipChange { change: None }))
    }

    /// Entry carrying an encoded `Insert` write command with the given raw `ttl_secs`.
    fn insert_entry_with_ttl(
        index: u64,
        key: &str,
        value: &str,
        ttl_secs: u64,
    ) -> Entry {
        use d_engine_proto::client::WriteCommand;
        use d_engine_proto::client::write_command::{Insert, Operation};
        use prost::Message;

        let wc = WriteCommand {
            operation: Some(Operation::Insert(Insert {
                key: Bytes::from(key.to_owned()),
                value: Bytes::from(value.to_owned()),
                ttl_secs,
            })),
        };
        let mut buf = Vec::new();
        wc.encode(&mut buf).unwrap();
        entry_with(index, Payload::Command(Bytes::from(buf)))
    }

    /// Entry carrying an encoded `Insert` write command without expiration (`ttl_secs = 0`).
    fn insert_entry(
        index: u64,
        key: &str,
        value: &str,
    ) -> Entry {
        insert_entry_with_ttl(index, key, value, 0)
    }

    /// Config entries must produce Command::Noop, not be dropped.
    /// Dropping them leaves sm.last_applied stuck, breaking ReadIndex drain.
    #[test]
    fn config_entry_becomes_noop() {
        let entries = vec![config_entry(5)];
        let result = decode_entries(entries).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].index, 5);
        assert_eq!(result[0].command, Command::Noop);
    }

    /// last_applied continuity: a mixed batch of Insert→Config→Insert must yield
    /// three ApplyEntry values with no index gaps.
    #[test]
    fn config_entry_preserves_index_continuity() {
        let entries = vec![
            insert_entry(10, "k1", "v1"),
            config_entry(11), // membership change
            insert_entry(12, "k2", "v2"),
        ];
        let result = decode_entries(entries).unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].index, 10);
        assert_eq!(result[1].index, 11);
        assert!(matches!(result[1].command, Command::Noop));
        assert_eq!(result[2].index, 12);
    }

    /// A chunk that is entirely Config entries must still produce one Noop per entry,
    /// so the SM can advance last_applied to the highest config index.
    #[test]
    fn all_config_chunk_produces_noops() {
        let entries = vec![config_entry(3), config_entry(4), config_entry(5)];
        let result = decode_entries(entries).unwrap();
        assert_eq!(result.len(), 3);
        assert!(result.iter().all(|e| e.command == Command::Noop));
        assert_eq!(result.last().unwrap().index, 5);
    }

    #[test]
    fn noop_entry_produces_noop() {
        let result = decode_entries(vec![noop_entry(1)]).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].command, Command::Noop);
    }

    /// An empty batch is valid input and decodes to an empty result.
    #[test]
    fn empty_batch_yields_empty_result() {
        let result = decode_entries(Vec::new()).unwrap();
        assert!(result.is_empty());
    }

    /// Insert payloads round-trip key and value, keep index/term, and map the
    /// proto's `ttl_secs = 0` to `None`.
    #[test]
    fn insert_entry_decodes_key_value_and_no_ttl() {
        let result = decode_entries(vec![insert_entry(7, "k", "v")]).unwrap();
        assert_eq!(
            result,
            vec![ApplyEntry {
                index: 7,
                term: 1,
                command: Command::Insert {
                    key: Bytes::from_static(b"k"),
                    value: Bytes::from_static(b"v"),
                    ttl_secs: None,
                },
            }]
        );
    }

    /// A non-zero proto TTL must surface as `Some(ttl)`.
    #[test]
    fn nonzero_ttl_is_preserved() {
        let result = decode_entries(vec![insert_entry_with_ttl(8, "k", "v", 30)]).unwrap();
        assert_eq!(
            result[0].command,
            Command::Insert {
                key: Bytes::from_static(b"k"),
                value: Bytes::from_static(b"v"),
                ttl_secs: Some(30),
            }
        );
    }

    /// Both "no `EntryPayload` at all" and "`EntryPayload` with no inner payload"
    /// are decode errors.
    #[test]
    fn entry_without_payload_is_rejected() {
        let missing_outer = Entry {
            index: 9,
            term: 1,
            payload: None,
        };
        assert!(decode_entries(vec![missing_outer]).is_err());

        let missing_inner = Entry {
            index: 9,
            term: 1,
            payload: Some(EntryPayload { payload: None }),
        };
        assert!(decode_entries(vec![missing_inner]).is_err());
    }

    /// An empty buffer decodes to a default `WriteCommand`, whose `operation` is
    /// `None`; the conversion to `Command` must reject it.
    #[test]
    fn command_without_operation_is_rejected() {
        let entry = entry_with(4, Payload::Command(Bytes::new()));
        assert!(decode_entries(vec![entry]).is_err());
    }

    /// A lone 0xff byte is a truncated varint, so the protobuf decode itself fails.
    #[test]
    fn malformed_command_bytes_are_rejected() {
        let entry = entry_with(4, Payload::Command(Bytes::from_static(&[0xff])));
        assert!(decode_entries(vec![entry]).is_err());
    }

    /// One undecodable entry fails the whole batch, even when surrounded by valid ones.
    #[test]
    fn one_bad_entry_fails_the_whole_batch() {
        let entries = vec![
            insert_entry(1, "k1", "v1"),
            entry_with(2, Payload::Command(Bytes::from_static(&[0xff]))),
            noop_entry(3),
        ];
        assert!(decode_entries(entries).is_err());
    }
}
259}