Module customize_state_machine

§Implementing Custom State Machines

d-engine supports pluggable state machines through the StateMachine trait. This is where your application’s core logic runs: the state machine processes committed log entries to update application state.

§Architecture Context

  • The StateMachine is the application logic layer that processes committed Raft log entries
  • The StateMachineHandler manages the lifecycle: applying entries, triggering snapshots, handling purges
  • Default implementations include file-based storage and RocksDB storage (the RocksDB backend requires features = ["rocksdb"])
  • Custom implementations enable specialized behaviors for key-value stores, document databases, etc.

§1. Implement the Trait

use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use d_engine::{Entry, Error, SnapshotMetadata, StateMachine};

// `ApplicationStorage` and `AppCommand` stand for your own application types.
struct CustomStateMachine {
    // Your storage backend
    backend: Arc<dyn ApplicationStorage>,
    last_applied: AtomicU64,
    snapshot_meta: Mutex<Option<SnapshotMetadata>>,
}

#[async_trait]
impl StateMachine for CustomStateMachine {
    fn start(&self) -> Result<(), Error> {
        // Initialize your state machine
        Ok(())
    }

    fn stop(&self) -> Result<(), Error> {
        // Clean up resources
        Ok(())
    }

    fn is_running(&self) -> bool {
        // Return running status
        true
    }

    fn get(&self, key_buffer: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        // Retrieve value by key
        self.backend.get(key_buffer)
    }

    fn entry_term(&self, _entry_id: u64) -> Option<u64> {
        // Placeholder: look up and return the term for the given entry
        Some(1)
    }

    async fn apply_chunk(&self, chunk: Vec<Entry>) -> Result<(), Error> {
        // Deserialize and process entries in log order.
        // The `?` operators assume `Error` can be built from these error types.
        for entry in chunk {
            let cmd: AppCommand = bincode::deserialize(&entry.data)?;
            self.backend.execute(cmd)?;
        }
        Ok(())
    }

    fn len(&self) -> usize {
        // Return number of entries
        self.backend.count()
    }

    // Implement all required methods...
}
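
The trait implementation above relies on an ApplicationStorage backend and an AppCommand type that the snippet does not define. As a minimal sketch of what they could look like, the block below defines both along with an in-memory backend that is convenient for unit tests. Every name and signature here is hypothetical (chosen only to match the calls made above), and String stands in for the error type to keep the sketch free of external crates.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical command type; a real one would be deserialized from entry data.
#[derive(Debug, Clone, PartialEq)]
pub enum AppCommand {
    Put { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
}

/// Hypothetical storage abstraction matching the calls made by `CustomStateMachine`.
/// `String` is a stand-in error type for this sketch.
pub trait ApplicationStorage: Send + Sync {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, String>;
    fn execute(&self, cmd: AppCommand) -> Result<(), String>;
    fn count(&self) -> usize;
}

/// In-memory backend guarded by a mutex.
#[derive(Default)]
pub struct MemoryStorage {
    map: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}

impl ApplicationStorage for MemoryStorage {
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, String> {
        let map = self.map.lock().map_err(|e| e.to_string())?;
        Ok(map.get(key).cloned())
    }

    fn execute(&self, cmd: AppCommand) -> Result<(), String> {
        let mut map = self.map.lock().map_err(|e| e.to_string())?;
        match cmd {
            AppCommand::Put { key, value } => {
                map.insert(key, value);
            }
            AppCommand::Delete { key } => {
                map.remove(&key);
            }
        }
        Ok(())
    }

    fn count(&self) -> usize {
        // A poisoned lock is reported as an empty store in this sketch.
        self.map.lock().map(|m| m.len()).unwrap_or(0)
    }
}
```

A production backend would return the crate's own error type and write to durable storage; the in-memory version only illustrates the shape of the interface.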

§2. Key Implementation Notes

  • Atomic Operations: apply_chunk() must either apply the entire batch or fail without applying any of it
  • Idempotency: Handle duplicate entries safely, because Raft may resend committed entries
  • Snapshot Isolation: apply_snapshot_from_file() must atomically replace state
  • Checksum Validation: Mandatory for snapshot integrity; validate the checksum before applying a snapshot
  • Concurrency Control: Use appropriate locking for state mutations
  • Resource Cleanup: Implement Drop to ensure proper flush on shutdown
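
The first two notes (all-or-nothing batches and safe handling of duplicates) can be sketched as follows. This is an illustration under simplifying assumptions, not d-engine's implementation: SketchEntry and SketchState are hypothetical stand-ins for the real entry and state types, and the whole state is staged on a cloned map, which is only reasonable for small data sets.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a committed log entry (not d-engine's `Entry`).
#[derive(Debug, Clone)]
pub struct SketchEntry {
    pub index: u64,
    pub key: String,
    pub value: String,
}

/// Hypothetical state: a key-value map plus the index of the last applied entry.
#[derive(Debug, Default)]
pub struct SketchState {
    data: BTreeMap<String, String>,
    last_applied: u64,
}

impl SketchState {
    /// Applies a batch all-or-nothing and skips entries that were already applied.
    pub fn apply_chunk(&mut self, chunk: &[SketchEntry]) -> Result<(), String> {
        // Stage changes on a copy so that a failure leaves `self` untouched.
        let mut staged = self.data.clone();
        let mut applied = self.last_applied;

        for entry in chunk {
            if entry.index <= applied {
                continue; // duplicate delivery: already applied, skip it
            }
            if entry.index != applied + 1 {
                return Err(format!("gap: expected {}, got {}", applied + 1, entry.index));
            }
            if entry.key.is_empty() {
                return Err(format!("entry {} has an empty key", entry.index));
            }
            staged.insert(entry.key.clone(), entry.value.clone());
            applied = entry.index;
        }

        // Commit point: the staged data and the new index become visible together.
        self.data = staged;
        self.last_applied = applied;
        Ok(())
    }

    pub fn get(&self, key: &str) -> Option<&String> {
        self.data.get(key)
    }

    pub fn last_applied(&self) -> u64 {
        self.last_applied
    }
}
```

With a real storage engine, the staging step would more likely be a write batch or transaction that also records the last applied index, so that the data and the index are committed in a single atomic write.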

§3. StateMachine API Reference

| Method | Purpose | Sync/Async | Criticality |
|---|---|---|---|
| start() | Initialize state machine service | Sync | High |
| stop() | Graceful shutdown | Sync | High |
| is_running() | Check service status | Sync | Medium |
| get() | Read value by key | Sync | High |
| entry_term() | Get term for log index | Sync | Medium |
| apply_chunk() | Apply committed entries | Async | Critical |
| len() | Get entry count | Sync | Low |
| is_empty() | Check if empty | Sync | Low |
| update_last_applied() | Update applied index in memory | Sync | High |
| last_applied() | Get last applied index | Sync | High |
| persist_last_applied() | Persist applied index | Sync | High |
| update_last_snapshot_metadata() | Update snapshot metadata in memory | Sync | Medium |
| snapshot_metadata() | Get current snapshot metadata | Sync | Medium |
| persist_last_snapshot_metadata() | Persist snapshot metadata | Sync | Medium |
| apply_snapshot_from_file() | Replace state with snapshot | Async | Critical |
| generate_snapshot_data() | Create new snapshot | Async | High |
| save_hard_state() | Persist term/vote state | Sync | High |
| flush() | Sync writes to storage | Sync | High |
| flush_async() | Async flush | Async | High |
| reset() | Reset to initial state | Async | Medium |
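
The table separates updating the applied index in memory from persisting it. The sketch below shows one way such a split could be implemented: a cheap atomic update on the hot path and a separate, more expensive write to disk. The AppliedIndex type, its method names, and the file layout (eight little-endian bytes) are assumptions made for illustration and are not taken from d-engine.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical holder for the last applied index.
pub struct AppliedIndex {
    in_memory: AtomicU64,
    path: PathBuf,
}

impl AppliedIndex {
    /// Loads the persisted index, or starts from 0 if no file exists yet.
    pub fn open(path: PathBuf) -> io::Result<Self> {
        let value = match fs::read(&path) {
            Ok(bytes) => {
                if bytes.len() != 8 {
                    return Err(io::Error::new(io::ErrorKind::InvalidData, "bad index file"));
                }
                let mut arr = [0u8; 8];
                arr.copy_from_slice(&bytes);
                u64::from_le_bytes(arr)
            }
            Err(e) if e.kind() == io::ErrorKind::NotFound => 0,
            Err(e) => return Err(e),
        };
        Ok(Self { in_memory: AtomicU64::new(value), path })
    }

    /// In-memory update only; the index never moves backwards in this sketch.
    pub fn update(&self, index: u64) {
        self.in_memory.fetch_max(index, Ordering::SeqCst);
    }

    /// Returns the current in-memory value.
    pub fn get(&self) -> u64 {
        self.in_memory.load(Ordering::SeqCst)
    }

    /// Writes the current value to a temporary file, then renames it into place.
    /// A real implementation would likely also fsync the file and its directory.
    pub fn persist(&self) -> io::Result<()> {
        let tmp = self.path.with_extension("tmp");
        fs::write(&tmp, self.get().to_le_bytes())?;
        fs::rename(&tmp, &self.path)
    }
}
```

Keeping the two steps separate lets the apply path stay fast while the caller decides how often to pay for durability, for example after each chunk or before a snapshot.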

§4. Testing Your Implementation

Use the built-in test patterns from d-engine’s test suite:

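use std::sync::Arc;

use d_engine::{Error, LogId, StateMachine};
use tempfile::TempDir;

struct TestStateMachineBuilder {
    // Keeps the temporary directory alive for the duration of the test
    temp_dir: TempDir,
}

impl TestStateMachineBuilder {
    fn new() -> Self {
        Self {
            temp_dir: TempDir::new().expect("failed to create temp dir"),
        }
    }

    // Assumes `CustomStateMachine::new(path)` is an async constructor you define.
    async fn build(&self) -> Result<Arc<CustomStateMachine>, Error> {
        let path = self.temp_dir.path().join("sm_test");
        let sm = CustomStateMachine::new(path).await?;
        Ok(Arc::new(sm))
    }
}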

#[tokio::test]
async fn test_custom_state_machine() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;

    // Test basic functionality
    sm.start()?;
    assert!(sm.is_running());

    // Test apply_chunk with sample entries
    let entries = vec![create_test_entry(1, 1)];
    sm.apply_chunk(entries).await?;
    assert_eq!(sm.len(), 1);

    // Test snapshot operations
    let temp_dir = TempDir::new()?;
    let checksum = sm.generate_snapshot_data(temp_dir.path().to_path_buf(), LogId::new(1, 1)).await?;
    assert!(!checksum.is_zero());

    sm.stop()?;
    Ok(())
}

#[tokio::test]
async fn test_performance() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;
    sm.start()?;

    // Build the entries first so that only apply_chunk() is timed
    let entries: Vec<_> = (1..=10000)
        .map(|i| create_test_entry(i, i))
        .collect();

    // Performance test: apply 10,000 entries
    let start = std::time::Instant::now();
    sm.apply_chunk(entries).await?;
    let duration = start.elapsed();

    assert!(duration.as_millis() < 1000, "Should apply 10k entries in <1s");

    sm.stop()?;
    Ok(())
}

§5. Register with NodeBuilder

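use std::sync::Arc;

use d_engine::NodeBuilder;

// Pass whatever arguments your constructor takes (the test builder in §4 passes a storage path).
let custom_sm = Arc::new(CustomStateMachine::new().await?);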

NodeBuilder::new(config, shutdown_rx)
    .state_machine(custom_sm)  // Required component
    .build();

§6. Production Examples

See complete implementations in:

  • examples/rocksdb-cluster/src/main.rs - RocksDB state machine
  • examples/sled-cluster/src/main.rs - Sled state machine
  • src/storage/adaptors/ - Built-in implementations

Enable the RocksDB feature in your Cargo.toml:

d-engine = { version = "0.1.4", features = ["rocksdb"] }