d_engine_core/command.rs
//! # Why this module exists
//!
//! `EntryPayload.command` is stored as raw `bytes` in the Raft log — intentionally.
//! The replication path (Leader → Followers → WAL) must be zero-copy: Followers store
//! and forward bytes without ever decoding them. Only the *apply* step needs to know
//! what the command means.
//!
//! Before this module, every `StateMachine` implementor called
//! `WriteCommand::decode(&data[..])` themselves, duplicating the same proto decode
//! logic and forcing a `d-engine-proto` dependency on anyone writing a custom SM.
//!
//! This module moves the decode to one place — `DefaultStateMachineHandler` — and
//! exposes a clean Rust-native type (`Command`) at the `StateMachine::apply_chunk`
//! boundary. `WriteCommand` (proto, wire format) and `Command` (Rust enum, execution
//! interface) are intentionally two separate types — anti-corruption layer — so that
//! proto schema changes never break the SM trait.
17use bytes::Bytes;
18use d_engine_proto::client::WriteCommand;
19use d_engine_proto::client::write_command::Operation;
20use d_engine_proto::common::Entry;
21use d_engine_proto::common::entry_payload::Payload;
22use prost::Message;
23
24use crate::Error;
25use crate::StorageError;
26
27/// Decoded KV operation — the unit of work delivered to a `StateMachine`.
28///
29/// Replaces raw proto `Entry` at the `StateMachine::apply_chunk` boundary.
30/// The framework decodes wire bytes exactly once (in `DefaultStateMachineHandler`)
31/// before dispatching; implementors never touch prost or proto types.
32#[derive(Debug, Clone, PartialEq)]
33pub enum Command {
34 /// Raft-internal no-op written by a newly elected leader.
35 /// SM must return `ApplyResult::success(entry.index)` and otherwise ignore it.
36 /// Cannot be filtered before reaching SM: omitting it would create gaps in
37 /// `last_applied`, breaking the index-tracking invariant in `StateMachineWorker`.
38 Noop,
39
40 Insert {
41 key: Bytes,
42 value: Bytes,
43 /// `None` = no expiration. Proto encodes this as `ttl_secs = 0`; we convert
44 /// to `Option` here so SM implementors use idiomatic Rust instead of magic zeros.
45 ttl_secs: Option<u64>,
46 },
47
48 Delete {
49 key: Bytes,
50 },
51
52 CompareAndSwap {
53 key: Bytes,
54 /// `None` means the key must not exist for the swap to succeed.
55 expected: Option<Bytes>,
56 value: Bytes,
57 },
58}
59
60/// A decoded Raft log entry ready for state machine application.
61///
62/// Pairs `index` / `term` with the decoded `Command` so that state machines can
63/// produce `ApplyResult { index, .. }` without re-reading the original `Entry`.
64#[derive(Debug, Clone, PartialEq)]
65pub struct ApplyEntry {
66 pub index: u64,
67 pub term: u64,
68 pub command: Command,
69}
70
71/// Single coupling point between the proto schema and the SM trait.
72///
73/// When a new operation variant is added to `WriteCommand`, the compiler forces
74/// an update here (non-exhaustive `match`), preventing silent omissions downstream.
75impl TryFrom<WriteCommand> for Command {
76 type Error = Error;
77
78 fn try_from(wc: WriteCommand) -> Result<Self, Error> {
79 match wc.operation {
80 Some(Operation::Insert(i)) => Ok(Command::Insert {
81 key: i.key,
82 value: i.value,
83 ttl_secs: if i.ttl_secs == 0 {
84 None
85 } else {
86 Some(i.ttl_secs)
87 },
88 }),
89 Some(Operation::Delete(d)) => Ok(Command::Delete { key: d.key }),
90 Some(Operation::CompareAndSwap(c)) => Ok(Command::CompareAndSwap {
91 key: c.key,
92 expected: c.expected_value,
93 value: c.new_value,
94 }),
95 None => {
96 Err(StorageError::StateMachineError("WriteCommand has no operation".into()).into())
97 }
98 }
99 }
100}
101
102/// Decode a batch of raw Raft `Entry` values into `ApplyEntry` values.
103///
104/// - `Config` entries become `Command::Noop` — membership is handled by the Raft layer, but
105/// the index must still reach SM so `last_applied` stays contiguous; dropping them would
106/// leave `last_applied` stuck below the config index, breaking ReadIndex drain.
107/// - `Noop` entries become `Command::Noop`.
108/// - `Command` bytes are decoded via `TryFrom<WriteCommand>`.
109/// - Any decode failure returns `Err` immediately.
110pub fn decode_entries(entries: Vec<Entry>) -> Result<Vec<ApplyEntry>, Error> {
111 let mut result = Vec::with_capacity(entries.len());
112
113 for entry in entries {
114 let index = entry.index;
115 let term = entry.term;
116
117 let payload = entry.payload.and_then(|p| p.payload).ok_or_else(|| {
118 StorageError::StateMachineError(format!("Entry at index {index} has no payload"))
119 })?;
120
121 match payload {
122 Payload::Noop(_) => {
123 result.push(ApplyEntry {
124 index,
125 term,
126 command: Command::Noop,
127 });
128 }
129 Payload::Config(_) => {
130 // Membership changes are handled by the membership layer.
131 // Convert to Noop so sm.last_applied advances through this index —
132 // dropping the entry would leave last_applied stuck, breaking ReadIndex.
133 result.push(ApplyEntry {
134 index,
135 term,
136 command: Command::Noop,
137 });
138 }
139 Payload::Command(data) => {
140 let wc = WriteCommand::decode(&data[..]).map_err(|e| {
141 StorageError::SerializationError(format!(
142 "Failed to decode WriteCommand at index {index}: {e}"
143 ))
144 })?;
145 result.push(ApplyEntry {
146 index,
147 term,
148 command: Command::try_from(wc)?,
149 });
150 }
151 }
152 }
153
154 Ok(result)
155}
156
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use d_engine_proto::common::EntryPayload;
    use d_engine_proto::common::MembershipChange;
    use d_engine_proto::common::Noop;
    use d_engine_proto::common::entry_payload::Payload;

    use super::*;

    /// Builds an `Entry` at `index` wrapping `payload`; every fixture uses term 1.
    fn entry_with(
        index: u64,
        payload: Payload,
    ) -> Entry {
        Entry {
            index,
            term: 1,
            payload: Some(EntryPayload {
                payload: Some(payload),
            }),
        }
    }

    /// Fixture: a Raft no-op entry.
    fn noop_entry(index: u64) -> Entry {
        entry_with(index, Payload::Noop(Noop {}))
    }

    /// Fixture: a membership/config entry with an empty change.
    fn config_entry(index: u64) -> Entry {
        entry_with(index, Payload::Config(MembershipChange { change: None }))
    }

    /// Fixture: a command entry holding a prost-encoded insert of `key` → `value`
    /// with `ttl_secs = 0`.
    fn insert_entry(
        index: u64,
        key: &str,
        value: &str,
    ) -> Entry {
        use d_engine_proto::client::WriteCommand;
        use d_engine_proto::client::write_command::{Insert, Operation};
        use prost::Message;

        let insert = Insert {
            key: Bytes::from(key.to_owned()),
            value: Bytes::from(value.to_owned()),
            ttl_secs: 0,
        };
        let wc = WriteCommand {
            operation: Some(Operation::Insert(insert)),
        };

        let mut encoded = Vec::new();
        wc.encode(&mut encoded).unwrap();

        entry_with(index, Payload::Command(Bytes::from(encoded)))
    }

    /// A Config entry must come out as `Command::Noop` rather than being dropped;
    /// dropping it would leave sm.last_applied stuck and break ReadIndex drain.
    #[test]
    fn config_entry_becomes_noop() {
        let decoded = decode_entries(vec![config_entry(5)]).unwrap();

        assert_eq!(decoded.len(), 1);
        let only = &decoded[0];
        assert_eq!(only.index, 5);
        assert_eq!(only.command, Command::Noop);
    }

    /// last_applied continuity: Insert → Config → Insert must decode to three
    /// `ApplyEntry` values whose indices have no gap.
    #[test]
    fn config_entry_preserves_index_continuity() {
        let batch = vec![
            insert_entry(10, "k1", "v1"),
            config_entry(11), // membership change
            insert_entry(12, "k2", "v2"),
        ];

        let decoded = decode_entries(batch).unwrap();

        let indices: Vec<u64> = decoded.iter().map(|e| e.index).collect();
        assert_eq!(indices, vec![10, 11, 12]);
        assert!(matches!(decoded[1].command, Command::Noop));
    }

    /// A chunk made only of Config entries still yields one Noop per entry, so the
    /// SM can advance last_applied up to the highest config index.
    #[test]
    fn all_config_chunk_produces_noops() {
        let batch: Vec<Entry> = vec![config_entry(3), config_entry(4), config_entry(5)];

        let decoded = decode_entries(batch).unwrap();

        assert_eq!(decoded.len(), 3);
        for applied in &decoded {
            assert_eq!(applied.command, Command::Noop);
        }
        assert_eq!(decoded.last().map(|e| e.index), Some(5));
    }

    /// A Raft no-op entry decodes to exactly one `Command::Noop`.
    #[test]
    fn noop_entry_produces_noop() {
        let decoded = decode_entries(vec![noop_entry(1)]).unwrap();

        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0].command, Command::Noop);
    }
}