use bytes::Bytes;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::entry_payload::Payload;
use prost::Message;
use crate::Error;
use crate::StorageError;
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
Noop,
Insert {
key: Bytes,
value: Bytes,
ttl_secs: Option<u64>,
},
Delete {
key: Bytes,
},
CompareAndSwap {
key: Bytes,
expected: Option<Bytes>,
value: Bytes,
},
}
#[derive(Debug, Clone, PartialEq)]
pub struct ApplyEntry {
pub index: u64,
pub term: u64,
pub command: Command,
}
impl TryFrom<WriteCommand> for Command {
type Error = Error;
fn try_from(wc: WriteCommand) -> Result<Self, Error> {
match wc.operation {
Some(Operation::Insert(i)) => Ok(Command::Insert {
key: i.key,
value: i.value,
ttl_secs: if i.ttl_secs == 0 {
None
} else {
Some(i.ttl_secs)
},
}),
Some(Operation::Delete(d)) => Ok(Command::Delete { key: d.key }),
Some(Operation::CompareAndSwap(c)) => Ok(Command::CompareAndSwap {
key: c.key,
expected: c.expected_value,
value: c.new_value,
}),
None => {
Err(StorageError::StateMachineError("WriteCommand has no operation".into()).into())
}
}
}
}
pub fn decode_entries(entries: Vec<Entry>) -> Result<Vec<ApplyEntry>, Error> {
let mut result = Vec::with_capacity(entries.len());
for entry in entries {
let index = entry.index;
let term = entry.term;
let payload = entry.payload.and_then(|p| p.payload).ok_or_else(|| {
StorageError::StateMachineError(format!("Entry at index {index} has no payload"))
})?;
match payload {
Payload::Noop(_) => {
result.push(ApplyEntry {
index,
term,
command: Command::Noop,
});
}
Payload::Config(_) => {
result.push(ApplyEntry {
index,
term,
command: Command::Noop,
});
}
Payload::Command(data) => {
let wc = WriteCommand::decode(&data[..]).map_err(|e| {
StorageError::SerializationError(format!(
"Failed to decode WriteCommand at index {index}: {e}"
))
})?;
result.push(ApplyEntry {
index,
term,
command: Command::try_from(wc)?,
});
}
}
}
Ok(result)
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::common::MembershipChange;
use d_engine_proto::common::Noop;
use d_engine_proto::common::entry_payload::Payload;
use super::*;
fn noop_entry(index: u64) -> Entry {
Entry {
index,
term: 1,
payload: Some(EntryPayload {
payload: Some(Payload::Noop(Noop {})),
}),
}
}
fn config_entry(index: u64) -> Entry {
Entry {
index,
term: 1,
payload: Some(EntryPayload {
payload: Some(Payload::Config(MembershipChange { change: None })),
}),
}
}
fn insert_entry(
index: u64,
key: &str,
value: &str,
) -> Entry {
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::{Insert, Operation};
use prost::Message;
let wc = WriteCommand {
operation: Some(Operation::Insert(Insert {
key: Bytes::from(key.to_owned()),
value: Bytes::from(value.to_owned()),
ttl_secs: 0,
})),
};
let mut buf = Vec::new();
wc.encode(&mut buf).unwrap();
Entry {
index,
term: 1,
payload: Some(EntryPayload {
payload: Some(Payload::Command(Bytes::from(buf))),
}),
}
}
#[test]
fn config_entry_becomes_noop() {
let entries = vec![config_entry(5)];
let result = decode_entries(entries).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].index, 5);
assert_eq!(result[0].command, Command::Noop);
}
#[test]
fn config_entry_preserves_index_continuity() {
let entries = vec![
insert_entry(10, "k1", "v1"),
config_entry(11), insert_entry(12, "k2", "v2"),
];
let result = decode_entries(entries).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].index, 10);
assert_eq!(result[1].index, 11);
assert!(matches!(result[1].command, Command::Noop));
assert_eq!(result[2].index, 12);
}
#[test]
fn all_config_chunk_produces_noops() {
let entries = vec![config_entry(3), config_entry(4), config_entry(5)];
let result = decode_entries(entries).unwrap();
assert_eq!(result.len(), 3);
assert!(result.iter().all(|e| e.command == Command::Noop));
assert_eq!(result.last().unwrap().index, 5);
}
#[test]
fn noop_entry_produces_noop() {
let result = decode_entries(vec![noop_entry(1)]).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].command, Command::Noop);
}
}