use std::collections::HashMap;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::SystemTime;
use bytes::Bytes;
use d_engine_core::ApplyResult;
use d_engine_core::Error;
use d_engine_core::Lease;
use d_engine_core::StateMachine;
use d_engine_core::StorageError;
use d_engine_proto::client::WriteCommand;
use d_engine_proto::client::write_command::CompareAndSwap;
use d_engine_proto::client::write_command::Delete;
use d_engine_proto::client::write_command::Insert;
use d_engine_proto::client::write_command::Operation;
use d_engine_proto::common::Entry;
use d_engine_proto::common::LogId;
use d_engine_proto::common::entry_payload::Payload;
use d_engine_proto::server::storage::SnapshotMetadata;
use parking_lot::RwLock;
use prost::Message;
use tokio::fs;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::time::Instant;
use tonic::async_trait;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use crate::storage::DefaultLease;
/// In-memory KV store guarded by a `parking_lot::RwLock`:
/// key -> (value, term of the log entry that last wrote it).
type FileStateMachineDataType = RwLock<HashMap<Bytes, (Bytes, u64)>>;
/// Operation tag stored as a single byte in each WAL record.
/// The discriminants are part of the on-disk format — do not renumber.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WalOpCode {
    Noop = 0,
    Insert = 1,
    Delete = 2,
    Config = 3,
    CompareAndSwap = 4,
}

impl WalOpCode {
    /// Maps an operation name (as used in `apply_chunk`) to its opcode.
    /// Unknown names degrade to `Noop`.
    fn from_str(op: &str) -> Self {
        match op {
            "CAS" => Self::CompareAndSwap,
            "CONFIG" => Self::Config,
            "DELETE" => Self::Delete,
            "INSERT" => Self::Insert,
            _ => Self::Noop,
        }
    }

    /// Decodes an opcode byte read back from the WAL.
    /// Unknown bytes degrade to `Noop`.
    fn from_u8(raw: u8) -> Self {
        match raw {
            4 => Self::CompareAndSwap,
            3 => Self::Config,
            2 => Self::Delete,
            1 => Self::Insert,
            _ => Self::Noop,
        }
    }
}
/// File-backed state machine: an in-memory KV map made durable through a
/// write-ahead log (`wal.log`) plus periodic checkpoints (`state.data`,
/// `metadata.bin`) stored under `data_dir`.
#[derive(Debug)]
pub struct FileStateMachine {
    // key -> (value, term) map holding the applied state.
    data: FileStateMachineDataType,
    // TTL bookkeeping handle; `None` unless `set_lease` was called.
    lease: Option<Arc<DefaultLease>>,
    // Set together with `lease` in `set_lease`; checked on INSERTs carrying a TTL.
    lease_enabled: bool,
    // Last applied log position. Stored as two independent atomics, so the
    // (index, term) pair is not updated atomically as a unit.
    last_applied_index: AtomicU64,
    last_applied_term: AtomicU64,
    // Metadata of the most recent snapshot, if any (in-memory only).
    last_snapshot_metadata: RwLock<Option<SnapshotMetadata>>,
    // Toggled by `start` / `stop`.
    running: AtomicBool,
    // Checkpoint trigger: number of WAL records appended since last checkpoint.
    wal_entries_since_checkpoint: AtomicU64,
    // Checkpoint trigger: time of the last checkpoint.
    last_checkpoint: Mutex<Instant>,
    // Directory containing wal.log, state.data, metadata.bin, ttl_state.bin.
    data_dir: PathBuf,
}
impl FileStateMachine {
/// Creates the state machine rooted at `data_dir` (created if missing) and
/// restores any persisted state (metadata, data file, WAL tail) from disk.
pub async fn new(data_dir: PathBuf) -> Result<Self, Error> {
    fs::create_dir_all(&data_dir).await?;
    let sm = Self {
        data: RwLock::new(HashMap::new()),
        lease: None,
        lease_enabled: false,
        last_applied_index: AtomicU64::new(0),
        last_applied_term: AtomicU64::new(0),
        last_snapshot_metadata: RwLock::new(None),
        running: AtomicBool::new(true),
        wal_entries_since_checkpoint: AtomicU64::new(0),
        last_checkpoint: Mutex::new(Instant::now()),
        data_dir,
    };
    sm.load_from_disk().await?;
    Ok(sm)
}
/// Installs the TTL lease handle and marks the TTL feature enabled.
/// Must be called before `start` for persisted TTL state to be restored.
pub fn set_lease(
    &mut self,
    lease: Arc<DefaultLease>,
) {
    self.lease = Some(lease);
    self.lease_enabled = true;
}
/// Restores state on startup: metadata first, then the checkpointed data
/// file, then TTL state (currently a no-op stub), and finally any WAL tail
/// written after the last checkpoint. Order matters: WAL entries must be
/// replayed on top of the checkpointed data.
async fn load_from_disk(&self) -> Result<(), Error> {
    self.load_metadata().await?;
    self.load_data().await?;
    self.load_ttl_data().await?;
    self.replay_wal().await?;
    info!("Loaded state machine data from disk");
    Ok(())
}
/// Placeholder kept for the `load_from_disk` sequence; TTL state is actually
/// restored by `load_lease_data`, which `start` invokes once the lease
/// handle is known to exist.
async fn load_ttl_data(&self) -> Result<(), Error> {
    Ok(())
}
pub async fn load_lease_data(&self) -> Result<(), Error> {
let Some(ref lease) = self.lease else {
return Ok(()); };
let ttl_path = self.data_dir.join("ttl_state.bin");
if !ttl_path.exists() {
debug!("No TTL state file found");
return Ok(());
}
let ttl_data = tokio::fs::read(&ttl_path).await?;
lease.reload(&ttl_data)?;
info!("Loaded TTL state from disk: {} active TTLs", lease.len());
Ok(())
}
/// Loads the last-applied (index, term) pair from `metadata.bin`
/// (two big-endian u64s). A missing or short/corrupt file is tolerated
/// and leaves the counters at their defaults (best-effort recovery).
async fn load_metadata(&self) -> Result<(), Error> {
    let metadata_path = self.data_dir.join("metadata.bin");
    if !metadata_path.exists() {
        return Ok(());
    }
    let mut file = File::open(metadata_path).await?;
    let mut buffer = [0u8; 16];
    // Best-effort: silently skip a file that is too short to hold both fields.
    if file.read_exact(&mut buffer).await.is_ok() {
        // `try_into` on an 8-byte sub-slice cannot fail; this replaces the
        // previous hand-rolled 8-element array construction for readability
        // and consistency with `replay_wal`.
        let index = u64::from_be_bytes(buffer[0..8].try_into().expect("slice is 8 bytes"));
        let term = u64::from_be_bytes(buffer[8..16].try_into().expect("slice is 8 bytes"));
        self.last_applied_index.store(index, Ordering::SeqCst);
        self.last_applied_term.store(term, Ordering::SeqCst);
    }
    Ok(())
}
/// Loads the checkpointed KV map from `state.data`.
///
/// Record layout (all integers big-endian):
/// `[key_len u64][key][value_len u64][value][term u64]`, repeated.
/// A trailing incomplete record (e.g. after a crash mid-write) is ignored.
async fn load_data(&self) -> Result<(), Error> {
    let data_path = self.data_dir.join("state.data");
    if !data_path.exists() {
        return Ok(());
    }
    let mut file = File::open(data_path).await?;
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer).await?;
    let mut pos = 0;
    let mut data = self.data.write();
    while pos < buffer.len() {
        // Replaced the previous hand-rolled 8-element array constructions
        // with `try_into`, consistent with the style used in `replay_wal`.
        if pos + 8 > buffer.len() {
            break;
        }
        let key_len =
            u64::from_be_bytes(buffer[pos..pos + 8].try_into().expect("slice is 8 bytes"))
                as usize;
        pos += 8;
        if pos + key_len > buffer.len() {
            break;
        }
        let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
        pos += key_len;
        if pos + 8 > buffer.len() {
            break;
        }
        let value_len =
            u64::from_be_bytes(buffer[pos..pos + 8].try_into().expect("slice is 8 bytes"))
                as usize;
        pos += 8;
        if pos + value_len > buffer.len() {
            break;
        }
        let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
        pos += value_len;
        if pos + 8 > buffer.len() {
            break;
        }
        let term =
            u64::from_be_bytes(buffer[pos..pos + 8].try_into().expect("slice is 8 bytes"));
        pos += 8;
        data.insert(key, (value, term));
    }
    Ok(())
}
/// Replays `wal.log` into the in-memory map after a restart, then clears it.
///
/// WAL record layout (all integers big-endian):
/// `[index u64][term u64][op u8][key_len u64][key][value_len u64][value]`
/// `[expire_at_unix_secs u64]` — `value_len == 0` means "no value",
/// `expire_at == 0` means "no TTL".
///
/// NOTE(review): `apply_chunk` appends CAS records to the WAL *before*
/// evaluating the comparison, yet replay applies CAS writes unconditionally —
/// a CAS that originally failed may be re-applied here. Confirm intent.
/// NOTE(review): the WAL is cleared below without first persisting the
/// replayed state to `state.data`; a crash before the next checkpoint would
/// lose these operations. Confirm intent.
async fn replay_wal(&self) -> Result<(), Error> {
    let wal_path = self.data_dir.join("wal.log");
    if !wal_path.exists() {
        debug!("No WAL file found, skipping replay");
        return Ok(());
    }
    let mut file = File::open(wal_path).await?;
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer).await?;
    if buffer.is_empty() {
        debug!("WAL file is empty, skipping replay");
        return Ok(());
    }
    let mut pos = 0;
    let mut operations = Vec::new();
    let mut replayed_count = 0;
    // Phase 1: parse records until the buffer is exhausted or truncated.
    // The 17-byte header is index (unused on replay) + term + opcode.
    while pos + 17 < buffer.len() {
        let _index = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
        pos += 8;
        let term = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
        pos += 8;
        let op_code = WalOpCode::from_u8(buffer[pos]);
        pos += 1;
        if pos + 8 > buffer.len() {
            warn!("Incomplete key length at position {}, stopping replay", pos);
            break;
        }
        let key_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
        pos += 8;
        if pos + key_len > buffer.len() {
            warn!(
                "Incomplete key data at position {} (need {} bytes, have {})",
                pos,
                key_len,
                buffer.len() - pos
            );
            break;
        }
        let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
        pos += key_len;
        if pos + 8 > buffer.len() {
            warn!(
                "Incomplete value length at position {}, stopping replay",
                pos
            );
            break;
        }
        let value_len = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap()) as usize;
        pos += 8;
        // A zero value_len encodes "no value" (e.g. DELETE records).
        let value = if value_len > 0 {
            if pos + value_len > buffer.len() {
                warn!("Incomplete value data at position {}, stopping replay", pos);
                break;
            }
            let value_data = Bytes::from(buffer[pos..pos + value_len].to_vec());
            pos += value_len;
            Some(value_data)
        } else {
            None
        };
        // Truncated records may lack the trailing expiry field; tolerate that.
        let expire_at_secs = if pos + 8 <= buffer.len() {
            let secs = u64::from_be_bytes(buffer[pos..pos + 8].try_into().unwrap());
            pos += 8;
            if secs > 0 { Some(secs) } else { None }
        } else {
            debug!(
                "No expiration time field at position {}, assuming no TTL (incomplete WAL entry)",
                pos
            );
            None
        };
        operations.push((op_code, key, value, term, expire_at_secs));
        replayed_count += 1;
    }
    info!(
        "Parsed {} WAL operations, applying to memory",
        operations.len()
    );
    let mut applied_count = 0;
    let mut skipped_expired = 0;
    let now = std::time::SystemTime::now();
    // Phase 2: apply parsed operations under a single write lock.
    {
        let mut data = self.data.write();
        for (op_code, key, value, term, expire_at_secs) in operations {
            match op_code {
                WalOpCode::Insert => {
                    if let Some(value_data) = value {
                        // Skip keys whose TTL elapsed while the node was down.
                        let is_expired = if let Some(secs) = expire_at_secs {
                            let expire_at =
                                std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs);
                            now >= expire_at
                        } else {
                            false
                        };
                        if is_expired {
                            debug!("Skipped expired key during WAL replay: key={:?}", key);
                            skipped_expired += 1;
                            continue;
                        }
                        data.insert(key.clone(), (value_data, term));
                        if let Some(secs) = expire_at_secs {
                            // Re-register the remaining TTL with the lease, if present.
                            if let Some(ref lease) = self.lease {
                                let expire_at = std::time::UNIX_EPOCH
                                    + std::time::Duration::from_secs(secs);
                                let remaining = expire_at
                                    .duration_since(now)
                                    .map(|d| d.as_secs())
                                    .unwrap_or(0);
                                if remaining > 0 {
                                    lease.register(key.clone(), remaining);
                                    debug!(
                                        "Replayed INSERT with TTL: key={:?}, remaining={}s",
                                        key, remaining
                                    );
                                }
                            }
                        } else {
                            debug!("Replayed INSERT: key={:?}", key);
                        }
                        applied_count += 1;
                    } else {
                        warn!("INSERT operation without value");
                    }
                }
                WalOpCode::Delete => {
                    data.remove(&key);
                    if let Some(ref lease) = self.lease {
                        lease.unregister(&key);
                    }
                    applied_count += 1;
                    debug!("Replayed DELETE: key={:?}", key);
                }
                WalOpCode::CompareAndSwap => {
                    if let Some(new_value) = value {
                        data.insert(key.clone(), (new_value, term));
                        applied_count += 1;
                        debug!("Replayed CAS: key={:?}", key);
                    } else {
                        warn!("CAS operation without new_value in WAL");
                    }
                }
                WalOpCode::Noop | WalOpCode::Config => {
                    applied_count += 1;
                    debug!("Replayed {:?} operation", op_code);
                }
            }
        }
    }
    info!(
        "WAL replay complete: {} operations replayed, {} applied, {} expired keys skipped",
        replayed_count, applied_count, skipped_expired
    );
    // Clear the WAL so the same records are not replayed twice.
    if applied_count > 0 {
        self.clear_wal_async().await?;
        debug!(
            "Cleared WAL after successful replay of {} operations",
            applied_count
        );
    }
    Ok(())
}
/// Synchronously rewrites `state.data` with the full in-memory map.
/// Record layout: `[key_len u64 BE][key][value_len u64 BE][value][term u64 BE]`.
fn persist_data(&self) -> Result<(), Error> {
    // Clone the map under a short read lock; serialize outside the lock.
    let snapshot: HashMap<Bytes, (Bytes, u64)> = {
        let guard = self.data.read();
        guard.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
    };
    let path = self.data_dir.join("state.data");
    // Pre-size the buffer: three length/term fields plus the payloads per record.
    let mut capacity = 0usize;
    for (k, (v, _)) in &snapshot {
        capacity += 8 + k.len() + 8 + v.len() + 8;
    }
    let mut out = Vec::with_capacity(capacity);
    for (key, (value, term)) in &snapshot {
        out.extend_from_slice(&(key.len() as u64).to_be_bytes());
        out.extend_from_slice(key);
        out.extend_from_slice(&(value.len() as u64).to_be_bytes());
        out.extend_from_slice(value);
        out.extend_from_slice(&term.to_be_bytes());
    }
    std::fs::write(path, out)?;
    Ok(())
}
/// Async counterpart of `persist_data`: rewrites `state.data` with the full
/// in-memory map.
/// NOTE(review): the file is truncated in place (no temp-file + rename), so a
/// crash mid-write can leave a partially written data file — confirm intent.
async fn persist_data_async(&self) -> Result<(), Error> {
    // Clone the map under a short read lock; serialize outside the lock.
    let data_copy: HashMap<Bytes, (Bytes, u64)> = {
        let data = self.data.read();
        data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
    };
    let data_path = self.data_dir.join("state.data");
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(data_path)
        .await?;
    // Record layout: [key_len u64 BE][key][value_len u64 BE][value][term u64 BE].
    let estimated: usize =
        data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum();
    let mut buf = Vec::with_capacity(estimated);
    for (key, (value, term)) in &data_copy {
        buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
        buf.extend_from_slice(key);
        buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
        buf.extend_from_slice(value);
        buf.extend_from_slice(&term.to_be_bytes());
    }
    file.write_all(&buf).await?;
    file.flush().await?;
    Ok(())
}
/// Synchronously rewrites `metadata.bin` with the last-applied (index, term)
/// pair as two big-endian u64s — the format `load_metadata` reads back.
fn persist_metadata(&self) -> Result<(), Error> {
    let path = self.data_dir.join("metadata.bin");
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    let (index, term) = (
        self.last_applied_index.load(Ordering::SeqCst),
        self.last_applied_term.load(Ordering::SeqCst),
    );
    file.write_all(&index.to_be_bytes())?;
    file.write_all(&term.to_be_bytes())?;
    file.flush()?;
    Ok(())
}
/// Async counterpart of `persist_metadata`: rewrites `metadata.bin` with the
/// last-applied (index, term) pair as two big-endian u64s.
async fn persist_metadata_async(&self) -> Result<(), Error> {
    let metadata_path = self.data_dir.join("metadata.bin");
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(metadata_path)
        .await?;
    let index = self.last_applied_index.load(Ordering::SeqCst);
    let term = self.last_applied_term.load(Ordering::SeqCst);
    file.write_all(&index.to_be_bytes()).await?;
    file.write_all(&term.to_be_bytes()).await?;
    file.flush().await?;
    Ok(())
}
#[allow(unused)]
/// Synchronously truncates `wal.log` to zero length (creating it if absent).
fn clear_wal(&self) -> Result<(), Error> {
    let wal_path = self.data_dir.join("wal.log");
    // `truncate(true)` already empties the file at open time; the previous
    // explicit `set_len(0)` call was redundant and has been removed.
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(wal_path)?;
    file.flush()?;
    Ok(())
}
/// Truncates `wal.log` to zero length (creating it if absent).
async fn clear_wal_async(&self) -> Result<(), Error> {
    let wal_path = self.data_dir.join("wal.log");
    // `truncate(true)` already empties the file at open time; the previous
    // explicit `set_len(0)` call was redundant and has been removed.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(wal_path)
        .await?;
    file.flush().await?;
    Ok(())
}
/// Decides whether a checkpoint is due: either enough WAL records have
/// accumulated, or enough wall-clock time has passed since the last one.
/// Note: the time-based trigger fires even when no WAL records are pending.
fn should_checkpoint(&self) -> bool {
    const WAL_ENTRY_THRESHOLD: u64 = 1000;
    const TIME_THRESHOLD_SECS: u64 = 10;
    let pending = self.wal_entries_since_checkpoint.load(Ordering::Relaxed);
    if pending >= WAL_ENTRY_THRESHOLD {
        return true;
    }
    // A poisoned mutex suppresses the time-based trigger.
    match self.last_checkpoint.lock() {
        Ok(last) => last.elapsed().as_secs() >= TIME_THRESHOLD_SECS,
        Err(_) => false,
    }
}
/// Makes the in-memory state durable (data + metadata) and truncates the WAL,
/// then resets both checkpoint triggers.
pub(crate) async fn checkpoint(&self) -> Result<(), Error> {
    self.persist_data_async().await?;
    self.persist_metadata_async().await?;
    // Only safe once the two persists above succeeded: the WAL records are
    // now covered by the checkpoint files.
    self.clear_wal_async().await?;
    self.wal_entries_since_checkpoint.store(0, Ordering::Relaxed);
    if let Ok(mut last) = self.last_checkpoint.lock() {
        *last = Instant::now();
    }
    debug!("Checkpoint complete");
    Ok(())
}
/// Wipes all state: the in-memory map, the applied-position counters, the
/// snapshot metadata, and every on-disk artifact (data, metadata, WAL).
pub async fn reset(&self) -> Result<(), Error> {
    info!("Resetting state machine");
    // Guards are temporaries here, so each lock is released immediately.
    self.data.write().clear();
    self.last_applied_index.store(0, Ordering::SeqCst);
    self.last_applied_term.store(0, Ordering::SeqCst);
    *self.last_snapshot_metadata.write() = None;
    self.clear_data_file().await?;
    self.clear_metadata_file().await?;
    self.clear_wal_async().await?;
    info!("State machine reset completed");
    Ok(())
}
/// Truncates `state.data` to zero length (creating it if absent).
async fn clear_data_file(&self) -> Result<(), Error> {
    let data_path = self.data_dir.join("state.data");
    // `truncate(true)` already empties the file at open time; the previous
    // explicit `set_len(0)` call was redundant and has been removed.
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(data_path)
        .await?;
    file.flush().await?;
    Ok(())
}
/// Rewrites `metadata.bin` with a zeroed (index, term) pair, i.e. sixteen
/// zero bytes — the same on-disk bytes as two zero big-endian u64s.
async fn clear_metadata_file(&self) -> Result<(), Error> {
    let metadata_path = self.data_dir.join("metadata.bin");
    let mut file = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(metadata_path)
        .await?;
    file.write_all(&[0u8; 16]).await?;
    file.flush().await?;
    Ok(())
}
/// Appends a batch of operations to `wal.log` in a single buffered write.
///
/// Each tuple is `(entry, operation name, key, value, ttl_secs)`. The record
/// layout written here must stay in sync with what `replay_wal` parses:
/// `[index u64][term u64][op u8][key_len u64][key][value_len u64][value]`
/// `[expire_at_unix_secs u64]` (all integers big-endian).
pub(crate) async fn append_to_wal(
    &self,
    entries: Vec<(Entry, String, Bytes, Option<Bytes>, u64)>,
) -> Result<(), Error> {
    if entries.is_empty() {
        return Ok(());
    }
    let wal_path = self.data_dir.join("wal.log");
    let mut file =
        OpenOptions::new().write(true).create(true).append(true).open(&wal_path).await?;
    // Pre-size the buffer: fixed header/length fields plus variable payloads.
    let estimated_size: usize = entries
        .iter()
        .map(|(_, _, key, value, _)| {
            8 + 8 + 1 + 8 + key.len() + 8 + value.as_ref().map_or(0, |v| v.len()) + 8
        })
        .sum();
    let mut batch_buffer = Vec::with_capacity(estimated_size);
    for (entry, operation, key, value, ttl_secs) in entries {
        batch_buffer.extend_from_slice(&entry.index.to_be_bytes());
        batch_buffer.extend_from_slice(&entry.term.to_be_bytes());
        let op_code = WalOpCode::from_str(&operation);
        batch_buffer.push(op_code as u8);
        batch_buffer.extend_from_slice(&(key.len() as u64).to_be_bytes());
        batch_buffer.extend_from_slice(&key);
        if let Some(value_data) = value {
            batch_buffer.extend_from_slice(&(value_data.len() as u64).to_be_bytes());
            batch_buffer.extend_from_slice(&value_data);
        } else {
            // value_len == 0 encodes "no value" (e.g. DELETE records).
            batch_buffer.extend_from_slice(&0u64.to_be_bytes());
        }
        // TTL is stored as an absolute unix expiry (0 = no TTL) so replay
        // can compute the remaining lifetime after a restart.
        let expire_at_secs = if ttl_secs > 0 {
            let expire_at =
                std::time::SystemTime::now() + std::time::Duration::from_secs(ttl_secs);
            expire_at
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0)
        } else {
            0
        };
        batch_buffer.extend_from_slice(&expire_at_secs.to_be_bytes());
    }
    file.write_all(&batch_buffer).await?;
    file.flush().await?;
    Ok(())
}
}
impl Drop for FileStateMachine {
fn drop(&mut self) {
let timer = Instant::now();
match self.save_hard_state() {
Ok(_) => debug!("StateMachine saved in {:?}", timer.elapsed()),
Err(e) => error!("Failed to save StateMachine: {}", e),
}
}
}
#[async_trait]
impl StateMachine for FileStateMachine {
/// Marks the machine running and, when a lease handle is configured,
/// restores persisted TTL state via `load_lease_data`.
async fn start(&self) -> Result<(), Error> {
    self.running.store(true, Ordering::SeqCst);
    if self.lease.is_some() {
        self.load_lease_data().await?;
        debug!("Lease data loaded during state machine initialization");
    }
    info!("File state machine started");
    Ok(())
}
/// Marks the machine stopped and writes the current TTL state to
/// `ttl_state.bin` so `load_lease_data` can restore it on the next start.
fn stop(&self) -> Result<(), Error> {
    self.running.store(false, Ordering::SeqCst);
    if let Some(ref lease) = self.lease {
        let ttl_snapshot = lease.to_snapshot();
        let ttl_path = self.data_dir.join("ttl_state.bin");
        std::fs::write(&ttl_path, ttl_snapshot)
            .map_err(d_engine_core::StorageError::IoError)?;
        debug!("Persisted TTL state on shutdown");
    }
    info!("File state machine stopped");
    Ok(())
}
/// Returns the running flag toggled by `start` / `stop`.
fn is_running(&self) -> bool {
    self.running.load(Ordering::SeqCst)
}
/// Looks up `key_buffer` in the in-memory map.
/// Returns a clone of the stored value (cheap: `Bytes` is refcounted),
/// or `None` when the key is absent. Never returns `Err`.
fn get(
    &self,
    key_buffer: &[u8],
) -> Result<Option<Bytes>, Error> {
    // The read guard is a temporary, so the lock is released on this line.
    let found = self.data.read().get(key_buffer).map(|(value, _term)| value.clone());
    Ok(found)
}
fn entry_term(
    &self,
    entry_id: u64,
) -> Option<u64> {
    // NOTE(review): the map stores key -> (value, term). This scans stored
    // values for a *term* equal to `entry_id` (the binding named `index`
    // below is actually the term slot of the tuple) and then returns that
    // same term — i.e. it returns `Some(entry_id)` iff any stored value was
    // written at term `entry_id`. That looks unlikely to be the intended
    // "term of log entry `entry_id`" semantics; confirm against the
    // `StateMachine` trait contract before relying on this.
    let data = self.data.read();
    data.values().find(|(_, index)| *index == entry_id).map(|(_, term)| *term)
}
/// Applies a chunk of committed log entries in three phases:
/// 1. decode every entry into a batch tuple and record an `ApplyResult` for
///    every non-CAS entry (CAS results are decided during phase 3);
/// 2. durably append the whole batch to the WAL;
/// 3. mutate the in-memory map and finish CAS results.
///
/// Fixes over the previous version: CONFIG entries, `WriteCommand`s without
/// an operation, and CAS entries whose payload fails to re-decode previously
/// pushed no `ApplyResult` at all, desynchronizing the returned vector from
/// the input chunk. The `unsafe { unwrap_unchecked }` on the lease was also
/// replaced with a safe `expect`.
///
/// NOTE(review): CAS results are appended after all phase-1 results, so the
/// returned vector is not in entry order when CAS is mixed with other ops —
/// confirm whether callers match results to entries by index or by position.
async fn apply_chunk(
    &self,
    chunk: Vec<Entry>,
) -> Result<Vec<ApplyResult>, Error> {
    let chunk_len = chunk.len();
    let mut highest_index_entry: Option<LogId> = None;
    let mut batch_operations = Vec::new();
    let mut results = Vec::with_capacity(chunk_len);
    // Phase 1: decode.
    for entry in chunk {
        let entry_index = entry.index;
        assert!(entry.payload.is_some(), "Entry payload should not be None!");
        // Entries must arrive in strictly increasing index order.
        if let Some(prev) = &highest_index_entry {
            assert!(
                entry.index > prev.index,
                "apply_chunk: received unordered entry at index {} (prev={})",
                entry.index,
                prev.index
            );
        }
        highest_index_entry = Some(LogId {
            index: entry.index,
            term: entry.term,
        });
        match entry.payload.as_ref().unwrap().payload.as_ref() {
            Some(Payload::Noop(_)) => {
                debug!("Handling NOOP command at index {}", entry_index);
                batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                results.push(ApplyResult::success(entry_index));
            }
            Some(Payload::Command(bytes)) => match WriteCommand::decode(&bytes[..]) {
                Ok(write_cmd) => match write_cmd.operation {
                    Some(Operation::Insert(Insert {
                        key,
                        value,
                        ttl_secs,
                    })) => {
                        batch_operations.push((entry, "INSERT", key, Some(value), ttl_secs));
                        results.push(ApplyResult::success(entry_index));
                    }
                    Some(Operation::Delete(Delete { key })) => {
                        batch_operations.push((entry, "DELETE", key, None, 0));
                        results.push(ApplyResult::success(entry_index));
                    }
                    Some(Operation::CompareAndSwap(CompareAndSwap {
                        key,
                        expected_value: _,
                        new_value,
                    })) => {
                        // Result pushed in phase 3 once the comparison runs.
                        batch_operations.push((entry, "CAS", key, Some(new_value), 0));
                    }
                    None => {
                        warn!("WriteCommand without operation at index {}", entry.index);
                        batch_operations.push((entry, "NOOP", Bytes::new(), None, 0));
                        // Fix: a result must be recorded for every entry.
                        results.push(ApplyResult::success(entry_index));
                    }
                },
                Err(e) => {
                    error!(
                        "Failed to decode WriteCommand at index {}: {:?}",
                        entry.index, e
                    );
                    return Err(StorageError::SerializationError(e.to_string()).into());
                }
            },
            Some(Payload::Config(_config_change)) => {
                debug!("Ignoring config change at index {}", entry.index);
                batch_operations.push((entry, "CONFIG", Bytes::new(), None, 0));
                // Fix: config entries must report a result too, otherwise the
                // caller receives fewer ApplyResults than entries.
                results.push(ApplyResult::success(entry_index));
            }
            None => panic!("Entry payload variant should not be None!"),
        }
        info!("COMMITTED_LOG_METRIC: {}", entry_index);
    }
    // Phase 2: WAL first, memory second (write-ahead discipline).
    let wal_entries: Vec<_> = batch_operations
        .iter()
        .map(|(entry, operation, key, value, ttl_secs)| {
            (
                entry.clone(),
                operation.to_string(),
                key.clone(),
                value.clone(),
                *ttl_secs,
            )
        })
        .collect();
    self.append_to_wal(wal_entries).await?;
    // Phase 3: apply to the in-memory map under a single write lock.
    {
        let mut data = self.data.write();
        for (entry, operation, key, value, ttl_secs) in batch_operations {
            match operation {
                // NOOP and CONFIG entries mutate nothing.
                "NOOP" | "CONFIG" => {}
                "INSERT" => {
                    if let Some(value) = value {
                        data.insert(key.clone(), (value, entry.term));
                        if ttl_secs > 0 {
                            if !self.lease_enabled {
                                // NOTE(review): at this point the entry is
                                // already in the WAL and the map; erroring out
                                // leaves partially applied state (pre-existing
                                // behavior, deliberately kept).
                                return Err(StorageError::FeatureNotEnabled(
                                    "TTL feature is not enabled on this server. \
                                     Enable it in config: [raft.state_machine.lease] enabled = true"
                                        .into(),
                                )
                                .into());
                            }
                            // `lease_enabled` is only set together with `lease`
                            // (see `set_lease`), so this cannot fail.
                            let lease = self
                                .lease
                                .as_ref()
                                .expect("lease_enabled implies lease is set");
                            lease.register(key, ttl_secs);
                        }
                    }
                }
                "DELETE" => {
                    data.remove(&key);
                    if let Some(ref lease) = self.lease {
                        lease.unregister(&key);
                    }
                }
                "CAS" => {
                    // Re-decode the command to recover `expected_value`,
                    // which the batch tuple does not carry.
                    let mut result_recorded = false;
                    if let Some(Payload::Command(bytes)) =
                        entry.payload.as_ref().unwrap().payload.as_ref()
                    {
                        if let Ok(write_cmd) = WriteCommand::decode(&bytes[..]) {
                            if let Some(Operation::CompareAndSwap(CompareAndSwap {
                                expected_value,
                                ..
                            })) = write_cmd.operation
                            {
                                let current_value = data.get(&key);
                                // Success iff stored value matches the expected
                                // one, or both are absent.
                                let cas_success = match (current_value, &expected_value) {
                                    (Some((current, _)), Some(expected)) => {
                                        current.as_ref() == expected.as_ref()
                                    }
                                    (None, None) => true,
                                    _ => false,
                                };
                                results.push(if cas_success {
                                    ApplyResult::success(entry.index)
                                } else {
                                    ApplyResult::failure(entry.index)
                                });
                                result_recorded = true;
                                debug!(
                                    "CAS at index {}: key={:?}, success={}",
                                    entry.index,
                                    String::from_utf8_lossy(&key),
                                    cas_success
                                );
                                if cas_success {
                                    if let Some(new_value) = value {
                                        data.insert(key, (new_value, entry.term));
                                    }
                                }
                            }
                        }
                    }
                    if !result_recorded {
                        // Fix: previously a CAS whose payload failed to
                        // re-decode produced no ApplyResult at all. This is
                        // unreachable in practice (phase 1 already decoded
                        // this payload), but keeps `results` aligned.
                        results.push(ApplyResult::failure(entry.index));
                    }
                }
                _ => warn!("Unknown operation: {}", operation),
            }
        }
    }
    if let Some(log_id) = highest_index_entry {
        debug!("State machine - updated last_applied: {:?}", log_id);
        self.update_last_applied(log_id);
    }
    self.wal_entries_since_checkpoint.fetch_add(chunk_len as u64, Ordering::Relaxed);
    if self.should_checkpoint() {
        self.checkpoint().await?;
    }
    Ok(results)
}
/// Number of live keys in the in-memory map.
fn len(&self) -> usize {
    self.data.read().len()
}
/// Records the last applied log position in memory only (no disk write).
/// The two atomics are stored independently, so a concurrent
/// `last_applied()` may observe an index/term pair from different updates.
fn update_last_applied(
    &self,
    last_applied: LogId,
) {
    self.last_applied_index.store(last_applied.index, Ordering::SeqCst);
    self.last_applied_term.store(last_applied.term, Ordering::SeqCst);
}
/// Returns the last applied (index, term) pair.
/// The two fields are read from independent atomics, so under concurrent
/// `update_last_applied` calls the pair may mix values from two updates.
fn last_applied(&self) -> LogId {
    LogId {
        index: self.last_applied_index.load(Ordering::SeqCst),
        term: self.last_applied_term.load(Ordering::SeqCst),
    }
}
/// Updates the in-memory last-applied position and synchronously writes it
/// to `metadata.bin`.
fn persist_last_applied(
    &self,
    last_applied: LogId,
) -> Result<(), Error> {
    self.update_last_applied(last_applied);
    self.persist_metadata()
}
/// Replaces the in-memory snapshot metadata with a clone of the given value.
fn update_last_snapshot_metadata(
    &self,
    snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error> {
    let mut guard = self.last_snapshot_metadata.write();
    *guard = Some(snapshot_metadata.clone());
    Ok(())
}
/// Returns a clone of the in-memory snapshot metadata, if any.
fn snapshot_metadata(&self) -> Option<SnapshotMetadata> {
    self.last_snapshot_metadata.read().clone()
}
// NOTE(review): despite the name, this only updates the in-memory copy via
// `update_last_snapshot_metadata`; nothing is written to disk here, and no
// other code path in this file persists snapshot metadata. Confirm whether
// on-disk persistence is expected by the trait contract.
fn persist_last_snapshot_metadata(
    &self,
    snapshot_metadata: &SnapshotMetadata,
) -> Result<(), Error> {
    self.update_last_snapshot_metadata(snapshot_metadata)
}
async fn apply_snapshot_from_file(
&self,
metadata: &SnapshotMetadata,
snapshot_dir: std::path::PathBuf,
) -> Result<(), Error> {
info!("Applying snapshot from file: {:?}", snapshot_dir);
let snapshot_data_path = snapshot_dir.join("snapshot.bin");
let mut file = File::open(snapshot_data_path).await?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
let mut pos = 0;
let mut new_data = HashMap::new();
while pos < buffer.len() {
if pos + 8 > buffer.len() {
break;
}
let key_len_bytes = &buffer[pos..pos + 8];
let key_len = u64::from_be_bytes([
key_len_bytes[0],
key_len_bytes[1],
key_len_bytes[2],
key_len_bytes[3],
key_len_bytes[4],
key_len_bytes[5],
key_len_bytes[6],
key_len_bytes[7],
]) as usize;
pos += 8;
if pos + key_len > buffer.len() {
break;
}
let key = Bytes::from(buffer[pos..pos + key_len].to_vec());
pos += key_len;
if pos + 8 > buffer.len() {
break;
}
let value_len_bytes = &buffer[pos..pos + 8];
let value_len = u64::from_be_bytes([
value_len_bytes[0],
value_len_bytes[1],
value_len_bytes[2],
value_len_bytes[3],
value_len_bytes[4],
value_len_bytes[5],
value_len_bytes[6],
value_len_bytes[7],
]) as usize;
pos += 8;
if pos + value_len > buffer.len() {
break;
}
let value = Bytes::from(buffer[pos..pos + value_len].to_vec());
pos += value_len;
if pos + 8 > buffer.len() {
break;
}
let term_bytes = &buffer[pos..pos + 8];
let term = u64::from_be_bytes([
term_bytes[0],
term_bytes[1],
term_bytes[2],
term_bytes[3],
term_bytes[4],
term_bytes[5],
term_bytes[6],
term_bytes[7],
]);
pos += 8;
new_data.insert(key, (value, term));
}
if pos + 8 <= buffer.len() {
let ttl_len_bytes = &buffer[pos..pos + 8];
let ttl_len = u64::from_be_bytes([
ttl_len_bytes[0],
ttl_len_bytes[1],
ttl_len_bytes[2],
ttl_len_bytes[3],
ttl_len_bytes[4],
ttl_len_bytes[5],
ttl_len_bytes[6],
ttl_len_bytes[7],
]) as usize;
pos += 8;
if pos + ttl_len <= buffer.len() {
let ttl_data = &buffer[pos..pos + ttl_len];
if let Some(ref lease) = self.lease {
lease.reload(ttl_data)?;
}
}
}
{
let mut data = self.data.write();
*data = new_data;
}
*self.last_snapshot_metadata.write() = Some(metadata.clone());
if let Some(last_included) = &metadata.last_included {
self.update_last_applied(*last_included);
}
self.persist_data_async().await?;
self.persist_metadata_async().await?;
self.clear_wal_async().await?;
info!("Snapshot applied successfully");
Ok(())
}
/// Serializes the current KV map plus TTL state into
/// `<new_snapshot_dir>/snapshot.bin` and records the snapshot metadata.
///
/// File layout: repeated `[key_len u64][key][value_len u64][value][term u64]`
/// records followed by `[lease_len u64][lease blob]` (big-endian integers).
/// `last_included` is recorded as supplied by the caller; the map itself is
/// written in full — presumably the caller passes the current last-applied
/// position (TODO confirm).
async fn generate_snapshot_data(
    &self,
    new_snapshot_dir: std::path::PathBuf,
    last_included: LogId,
) -> Result<Bytes, Error> {
    info!("Generating snapshot data up to {:?}", last_included);
    fs::create_dir_all(&new_snapshot_dir).await?;
    let snapshot_path = new_snapshot_dir.join("snapshot.bin");
    let mut file = File::create(&snapshot_path).await?;
    // Clone the map under a short read lock; serialize outside the lock.
    let data_copy: HashMap<Bytes, (Bytes, u64)> = {
        let data = self.data.read();
        data.iter().map(|(k, (v, t))| (k.clone(), (v.clone(), *t))).collect()
    };
    let lease_snapshot = if let Some(ref lease) = self.lease {
        lease.to_snapshot()
    } else {
        Vec::new()
    };
    let estimated: usize =
        data_copy.iter().map(|(k, (v, _))| 8 + k.len() + 8 + v.len() + 8).sum::<usize>()
            + 8
            + lease_snapshot.len();
    let mut buf = Vec::with_capacity(estimated);
    for (key, (value, term)) in &data_copy {
        buf.extend_from_slice(&(key.len() as u64).to_be_bytes());
        buf.extend_from_slice(key);
        buf.extend_from_slice(&(value.len() as u64).to_be_bytes());
        buf.extend_from_slice(value);
        buf.extend_from_slice(&term.to_be_bytes());
    }
    // TTL section is always written, even when empty (lease_len == 0).
    buf.extend_from_slice(&(lease_snapshot.len() as u64).to_be_bytes());
    buf.extend_from_slice(&lease_snapshot);
    file.write_all(&buf).await?;
    file.flush().await?;
    let metadata = SnapshotMetadata {
        last_included: Some(last_included),
        // Placeholder checksum: 32 zero bytes; no real hash is computed yet.
        checksum: Bytes::from(vec![0; 32]), };
    self.update_last_snapshot_metadata(&metadata)?;
    info!("Snapshot generated at {:?}", snapshot_path);
    // Returned value matches the placeholder checksum stored above.
    Ok(Bytes::from_static(&[0u8; 32]))
}
/// Synchronous best-effort persistence used on shutdown (see `Drop`):
/// writes the last-applied position, refreshes the snapshot metadata copy,
/// and flushes the full data + metadata files.
fn save_hard_state(&self) -> Result<(), Error> {
    let last_applied = self.last_applied();
    self.persist_last_applied(last_applied)?;
    if let Some(last_snapshot_metadata) = self.snapshot_metadata() {
        // NOTE(review): persist_last_snapshot_metadata only updates memory;
        // snapshot metadata is not written to disk here — confirm.
        self.persist_last_snapshot_metadata(&last_snapshot_metadata)?;
    }
    self.flush()?;
    Ok(())
}
/// Synchronously persists both the data file and the metadata file.
fn flush(&self) -> Result<(), Error> {
    self.persist_data()?;
    self.persist_metadata()
}
/// Async flush: performs a full checkpoint (persist data + metadata,
/// truncate the WAL, reset the checkpoint triggers).
async fn flush_async(&self) -> Result<(), Error> {
    self.checkpoint().await
}
/// Trait-level reset; dispatches to the inherent `FileStateMachine::reset`
/// (inherent methods take precedence over trait methods during method
/// resolution, so this does not recurse).
async fn reset(&self) -> Result<(), Error> {
    self.reset().await
}
/// Removes keys whose lease has expired, persists the pruned map, and
/// returns the expired keys so the caller can propagate the deletions.
/// Returns an empty vec when no lease is configured or nothing expired.
/// NOTE(review): the removals happen locally without a WAL record —
/// confirm how replicas converge on these deletions (presumably via the
/// returned key list).
async fn lease_background_cleanup(&self) -> Result<Vec<Bytes>, Error> {
    let Some(ref lease) = self.lease else {
        return Ok(vec![]);
    };
    let now = SystemTime::now();
    let expired_keys = lease.get_expired_keys(now);
    if expired_keys.is_empty() {
        return Ok(vec![]);
    }
    debug!(
        "Lease background cleanup: found {} expired keys",
        expired_keys.len()
    );
    {
        let mut data = self.data.write();
        for key in &expired_keys {
            data.remove(key);
        }
    }
    self.persist_data_async().await?;
    info!(
        "Lease background cleanup: deleted {} expired keys",
        expired_keys.len()
    );
    Ok(expired_keys)
}
}