Module server_guide

§Implementing Custom Storage Engines

d-engine supports pluggable storage through the StorageEngine trait. The storage engine is responsible for persisting Raft log entries and metadata to durable storage.

§Architecture Context

  • The StorageEngine trait provides a generic interface for storage backends
  • Physical separation between log storage (LogStore) and metadata storage (MetaStore)
  • Built-in implementations include:
    • File-based: Production-ready file system storage
    • RocksDB: High-performance embedded database (requires features = ["rocksdb"])
    • Sled: Modern embedded database
    • In-memory: Volatile storage for testing
  • Custom implementations enable support for cloud storage, SQL databases, etc.

§1. Implement the Trait

use std::sync::Arc;

use async_trait::async_trait;
use d_engine::storage::{StorageEngine, LogStore, MetaStore};
// Entry, Error, and HardState are d-engine's log-entry, error, and
// persistent-state types; adjust the import paths to the crate's re-exports.
use d_engine::{Entry, Error, HardState};

struct CustomLogStore;
struct CustomMetaStore;

#[async_trait]
impl LogStore for CustomLogStore {
    async fn persist_entries(&self, entries: Vec<Entry>) -> Result<(), Error> {
        // Your implementation; batch operations are recommended
        Ok(())
    }

    async fn entry(&self, index: u64) -> Result<Option<Entry>, Error> {
        // Retrieve a single entry by index
        Ok(None)
    }

    // Implement all required methods...
}

impl MetaStore for CustomMetaStore {
    fn save_hard_state(&self, state: &HardState) -> Result<(), Error> {
        // Persist hard state atomically
        Ok(())
    }

    fn load_hard_state(&self) -> Result<Option<HardState>, Error> {
        // Load the persisted hard state
        Ok(None)
    }
}

// Combine the log and metadata stores into one engine
struct CustomStorageEngine {
    log_store: Arc<CustomLogStore>,
    meta_store: Arc<CustomMetaStore>,
}

impl StorageEngine for CustomStorageEngine {
    type LogStore = CustomLogStore;
    type MetaStore = CustomMetaStore;

    fn log_store(&self) -> Arc<Self::LogStore> {
        self.log_store.clone()
    }

    fn meta_store(&self) -> Arc<Self::MetaStore> {
        self.meta_store.clone()
    }
}
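
A short usage sketch of the assembled engine, assuming the types from §1; the synchronous flush() signature is an assumption (see the API table in §3):

// Usage sketch: persist a batch through the engine. Assumes the types from
// §1; the synchronous flush() signature is an assumption (see §3).
async fn persist_batch(engine: &CustomStorageEngine, entries: Vec<Entry>) -> Result<(), Error> {
    let log_store = engine.log_store();
    log_store.persist_entries(entries).await?;
    log_store.flush()?; // make the batch durable
    Ok(())
}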

§2. Key Implementation Notes

  • Atomicity: Ensure write operations are atomic; prefer batch writes (see the sketch below)
  • Durability: Flush writes to persistent storage; implement flush() so it truly syncs to disk
  • Consistency: Maintain exactly-once semantics for log entries
  • Performance: Target >100k ops/sec for log persistence
  • Resource Management: Clean up resources in a Drop implementation
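
A sketch of the first two notes in practice: an append-only log store fragment that batches pre-encoded entries into one buffered write and fsyncs in flush(). The length-prefixed on-disk format is an assumption, not d-engine's actual layout.

// Illustrative sketch only: an append-only file-backed log store fragment.
// The on-disk format (length-prefixed, pre-encoded entries) is an assumption.
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::Mutex;

struct FileLogStore {
    writer: Mutex<BufWriter<File>>,
}

impl FileLogStore {
    // Batch all entries into a single buffered write (atomicity note).
    fn persist_encoded(&self, batches: Vec<Vec<u8>>) -> std::io::Result<()> {
        let mut w = self.writer.lock().unwrap();
        for encoded in batches {
            w.write_all(&(encoded.len() as u32).to_le_bytes())?;
            w.write_all(&encoded)?;
        }
        Ok(())
    }

    // Flush the buffer and fsync so the batch survives a crash (durability note).
    fn flush_to_disk(&self) -> std::io::Result<()> {
        let mut w = self.writer.lock().unwrap();
        w.flush()?;
        w.get_ref().sync_all()
    }
}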

§3. StorageEngine API Reference

§LogStore Methods

| Method            | Purpose                     | Performance Target     |
| ----------------- | --------------------------- | ---------------------- |
| persist_entries() | Batch persist log entries   | >100k entries/sec      |
| entry()           | Get single entry by index   | <1ms latency           |
| get_entries()     | Get entries in range        | <1ms for 10k entries   |
| purge()           | Remove logs up to index     | <100ms for 10k entries |
| truncate()        | Remove entries from index   | <100ms                 |
| flush()           | Sync writes to disk         | Varies by backend      |
| reset()           | Clear all data              | <1s                    |
| last_index()      | Get highest persisted index | <100μs                 |
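
Note that purge() and truncate() cut from opposite ends of the log: purge drops the prefix up to an index (typically after compaction), while truncate drops the suffix from an index (typically after a leader conflict). A Vec-backed sketch of these semantics; whether each bound is inclusive is an assumption, so check the trait docs:

// Illustrative sketch of the purge/truncate semantics from the table above,
// using a Vec of (index, entry) pairs. Inclusive/exclusive bounds here are
// an assumption; confirm against the trait documentation.
fn purge(log: &mut Vec<(u64, Vec<u8>)>, up_to: u64) {
    // purge(): drop the prefix up to `up_to` (e.g. after snapshot/compaction)
    log.retain(|(idx, _)| *idx > up_to);
}

fn truncate(log: &mut Vec<(u64, Vec<u8>)>, from: u64) {
    // truncate(): drop the suffix from `from` (e.g. on a leader conflict)
    log.retain(|(idx, _)| *idx < from);
}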

§MetaStore Methods

| Method            | Purpose                 | Criticality            |
| ----------------- | ----------------------- | ---------------------- |
| save_hard_state() | Persist term/vote state | High - atomic required |
| load_hard_state() | Load persisted state    | High                   |
| flush()           | Sync metadata writes    | Medium                 |
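
save_hard_state() is marked atomic-required above. On a file backend, one standard way to get atomicity is write-to-temp-then-rename; a sketch (the on-disk encoding is left to the caller):

// Illustrative sketch: atomic hard-state persistence via temp file + rename.
// rename() replaces the destination atomically on POSIX filesystems, so a
// reader never observes a half-written state file.
use std::fs;
use std::io::Write;
use std::path::Path;

fn save_hard_state_atomic(dir: &Path, encoded_state: &[u8]) -> std::io::Result<()> {
    let tmp = dir.join("hard_state.tmp");
    let dst = dir.join("hard_state");

    let mut file = fs::File::create(&tmp)?;
    file.write_all(encoded_state)?;
    file.sync_all()?; // make the temp file durable before the swap

    fs::rename(&tmp, &dst) // atomic replace of the old state
}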

§4. Testing Your Implementation

Use the standardized test suite to ensure compatibility:

use std::sync::Arc;

use async_trait::async_trait;
use d_engine::Error;
use d_engine::storage_engine_test::{StorageEngineBuilder, StorageEngineTestSuite};
use tempfile::TempDir;

struct CustomStorageEngineBuilder {
    temp_dir: TempDir,
}

impl CustomStorageEngineBuilder {
    fn new() -> Self {
        Self {
            temp_dir: TempDir::new().expect("failed to create temp dir"),
        }
    }
}

#[async_trait]
impl StorageEngineBuilder for CustomStorageEngineBuilder {
    type Engine = CustomStorageEngine;

    async fn build(&self) -> Result<Arc<Self::Engine>, Error> {
        let path = self.temp_dir.path().join("storage");
        let engine = CustomStorageEngine::new(path).await?;
        Ok(Arc::new(engine))
    }

    async fn cleanup(&self) -> Result<(), Error> {
        Ok(()) // TempDir auto-cleans on drop
    }
}

#[tokio::test]
async fn test_custom_storage_engine() -> Result<(), Error> {
    let builder = CustomStorageEngineBuilder::new();
    StorageEngineTestSuite::run_all_tests(builder).await
}

#[tokio::test]
async fn test_performance() -> Result<(), Error> {
    let builder = CustomStorageEngineBuilder::new();
    let engine = builder.build().await?;
    let log_store = engine.log_store();

    // Performance test: persist 10,000 entries
    // (create_test_entry(index, term) is a test helper you define)
    let start = std::time::Instant::now();
    let entries: Vec<_> = (1..=10_000)
        .map(|i| create_test_entry(i, i))
        .collect();

    log_store.persist_entries(entries).await?;
    let duration = start.elapsed();

    assert!(duration.as_millis() < 1000, "Should persist 10k entries in <1s");
    Ok(())
}

§5. Register with NodeBuilder

use std::sync::Arc;

use d_engine::NodeBuilder;

// `path` is wherever your engine stores its data
let storage_engine = Arc::new(CustomStorageEngine::new(path).await?);

NodeBuilder::new(config, shutdown_rx)
    .storage_engine(storage_engine)  // Required component
    .build();
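
A fuller wiring sketch for context. Only NodeBuilder::new(), storage_engine(), and build() come from the snippet above; the watch-channel shutdown signal and the load_config() helper are assumptions about the surrounding application, not part of the d-engine API shown here.

use std::sync::Arc;

use d_engine::NodeBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Shutdown signal: a tokio watch channel is one common choice (assumed here).
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());

    let config = load_config()?; // hypothetical helper: however your app builds its config
    let storage_engine = Arc::new(CustomStorageEngine::new("data/storage".into()).await?);

    let node = NodeBuilder::new(config, shutdown_rx)
        .storage_engine(storage_engine)
        .build();

    // ... run `node`, then signal shutdown when done:
    let _ = shutdown_tx.send(());
    Ok(())
}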

§6. Production Examples

Reference implementations available in:

  • src/storage/adaptors/rocksdb/ - RocksDB storage engine
  • src/storage/adaptors/sled/ - Sled storage engine
  • src/storage/adaptors/file/ - File-based storage
  • src/storage/adaptors/mem/ - In-memory storage

Enable RocksDB feature in your Cargo.toml:

d-engine = { version = "0.1.3", features = ["rocksdb"] }

§7. Performance Optimization

  • Batch Operations: Use batch writes for log persistence
  • Caching: Cache frequently accessed data, such as the last index (see the sketch below)
  • Concurrency: Use appropriate locking strategies
  • Compression: Consider compressing log entries for large deployments
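
A sketch of the last-index cache, assuming LogStore signatures like those in §1 (illustrative only):

// Illustrative sketch: cache last_index in an AtomicU64 so reads skip the
// storage backend entirely.
use std::sync::atomic::{AtomicU64, Ordering};

struct CachedLogStore {
    last_index: AtomicU64,
    // ... underlying backend
}

impl CachedLogStore {
    fn last_index_cached(&self) -> u64 {
        // Lock-free read; no trip to the storage backend.
        self.last_index.load(Ordering::Acquire)
    }

    fn record_persisted(&self, highest_index: u64) {
        // Update the cache after each successful batch persist.
        self.last_index.store(highest_index, Ordering::Release);
    }
}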

See src/storage/adaptors/rocksdb/rocksdb_engine.rs for a production-grade implementation featuring:

  • Write batches for atomic operations
  • Log compaction to reclaim disk space
  • Efficient snapshot storage
  • Metrics integration

§Implementing Custom State Machines

d-engine supports pluggable state machines through the StateMachine trait. This is where your application’s core logic executes—processing committed log entries to update application state.

§Architecture Context

  • The StateMachine is the application logic layer that processes committed Raft log entries
  • The StateMachineHandler manages the lifecycle: applying entries, triggering snapshots, handling purges
  • Default implementations include file-based and RocksDB storage (requires features = ["rocksdb"])
  • Custom implementations enable specialized behaviors for key-value stores, document databases, etc.

§1. Implement the Trait

use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::Mutex;

use async_trait::async_trait;
// Result here is d-engine's alias for Result<T, Error>.
use d_engine::{Entry, LogId, Result, SnapshotMetadata, StateMachine};

struct CustomStateMachine {
    // Your storage backend (ApplicationStorage is your own trait; see below)
    backend: Arc<dyn ApplicationStorage>,
    last_applied: AtomicU64,
    snapshot_meta: Mutex<Option<SnapshotMetadata>>,
}

#[async_trait]
impl StateMachine for CustomStateMachine {
    fn start(&self) -> Result<()> {
        // Initialize your state machine
        Ok(())
    }

    fn stop(&self) -> Result<()> {
        // Clean up resources
        Ok(())
    }

    fn is_running(&self) -> bool {
        // Return running status
        true
    }

    fn get(&self, key_buffer: &[u8]) -> Result<Option<Vec<u8>>> {
        // Retrieve a value by key
        self.backend.get(key_buffer)
    }

    fn entry_term(&self, entry_id: u64) -> Option<u64> {
        // Return the term for a specific entry
        Some(1)
    }

    async fn apply_chunk(&self, chunk: Vec<Entry>) -> Result<()> {
        // Deserialize and process committed entries
        for entry in chunk {
            let cmd: AppCommand = bincode::deserialize(&entry.data)?;
            self.backend.execute(cmd)?;
        }
        Ok(())
    }

    fn len(&self) -> usize {
        // Return the number of entries
        self.backend.count()
    }

    // Implement all required methods...
}
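
apply_chunk() above deserializes an application-defined command type. A minimal sketch of that type and the matching ApplicationStorage trait; both live in your application, not in d-engine, and the serde/bincode encoding is an assumption:

use serde::{Deserialize, Serialize};

// Application-defined command, encoded into Entry::data with bincode.
#[derive(Serialize, Deserialize)]
enum AppCommand {
    Put { key: Vec<u8>, value: Vec<u8> },
    Delete { key: Vec<u8> },
}

// Application-defined backend referenced by CustomStateMachine above.
trait ApplicationStorage: Send + Sync {
    fn get(&self, key: &[u8]) -> d_engine::Result<Option<Vec<u8>>>;
    fn execute(&self, cmd: AppCommand) -> d_engine::Result<()>;
    fn count(&self) -> usize;
}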

§2. Key Implementation Notes

  • Atomic Operations: Ensure apply_chunk() either fully applies or fails the entire batch
  • Idempotency: Handle duplicate entries safely; Raft may resend committed entries (see the sketch below)
  • Snapshot Isolation: apply_snapshot_from_file() must atomically replace state
  • Checksum Validation: Mandatory for snapshot integrity; validate before applying
  • Concurrency Control: Use appropriate locking for state mutations
  • Resource Cleanup: Implement Drop to ensure a final flush on shutdown
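
A sketch of the idempotency note above: track last_applied and skip anything at or below it, so re-delivered batches are harmless. The index handling is illustrative and assumes a single applier thread:

use std::sync::atomic::{AtomicU64, Ordering};

// Returns true if the entry at `entry_index` still needs to be applied.
// Duplicate deliveries (entry_index <= last_applied) are skipped.
// Assumes a single applier thread; use compare_exchange if there are several.
fn should_apply(last_applied: &AtomicU64, entry_index: u64) -> bool {
    if entry_index <= last_applied.load(Ordering::Acquire) {
        return false; // already applied; safe to ignore the duplicate
    }
    last_applied.store(entry_index, Ordering::Release);
    true
}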

§3. StateMachine API Reference

| Method                           | Purpose                            | Sync/Async | Criticality |
| -------------------------------- | ---------------------------------- | ---------- | ----------- |
| start()                          | Initialize state machine service   | Sync       | High        |
| stop()                           | Graceful shutdown                  | Sync       | High        |
| is_running()                     | Check service status               | Sync       | Medium      |
| get()                            | Read value by key                  | Sync       | High        |
| entry_term()                     | Get term for log index             | Sync       | Medium      |
| apply_chunk()                    | Apply committed entries            | Async      | Critical    |
| len()                            | Get entry count                    | Sync       | Low         |
| is_empty()                       | Check if empty                     | Sync       | Low         |
| update_last_applied()            | Update applied index in memory     | Sync       | High        |
| last_applied()                   | Get last applied index             | Sync       | High        |
| persist_last_applied()           | Persist applied index              | Sync       | High        |
| update_last_snapshot_metadata()  | Update snapshot metadata in memory | Sync       | Medium      |
| snapshot_metadata()              | Get current snapshot metadata      | Sync       | Medium      |
| persist_last_snapshot_metadata() | Persist snapshot metadata          | Sync       | Medium      |
| apply_snapshot_from_file()       | Replace state with snapshot        | Async      | Critical    |
| generate_snapshot_data()         | Create new snapshot                | Async      | High        |
| save_hard_state()                | Persist term/vote state            | Sync       | High        |
| flush()                          | Sync writes to storage             | Sync       | High        |
| flush_async()                    | Async flush                        | Async      | High        |
| reset()                          | Reset to initial state             | Async      | Medium      |
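
The two Critical snapshot methods pair up: generate_snapshot_data() produces a checksum (see the test in §4) and apply_snapshot_from_file() must validate one before swapping state. A sketch of that validation step; SHA-256 via the sha2 crate is an assumption, and d-engine's actual checksum type may differ:

// Illustrative sketch: validate a snapshot file's checksum before applying it.
use std::io::Read;

use sha2::{Digest, Sha256};

fn validate_snapshot(path: &std::path::Path, expected: &[u8]) -> std::io::Result<bool> {
    let mut file = std::fs::File::open(path)?;
    let mut hasher = Sha256::new();
    let mut buf = [0u8; 8192];
    loop {
        let n = file.read(&mut buf)?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[..n]);
    }
    // Only replace state (apply_snapshot_from_file) when the digest matches.
    Ok(hasher.finalize().as_slice() == expected)
}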

§4. Testing Your Implementation

Use the built-in test patterns from d-engine’s test suite:

use std::sync::Arc;

use d_engine::{Error, LogId, StateMachine};
use tempfile::TempDir;

struct TestStateMachineBuilder {
    temp_dir: TempDir,
}

impl TestStateMachineBuilder {
    fn new() -> Self {
        Self {
            temp_dir: TempDir::new().expect("failed to create temp dir"),
        }
    }

    async fn build(&self) -> Result<Arc<CustomStateMachine>, Error> {
        let path = self.temp_dir.path().join("sm_test");
        let sm = CustomStateMachine::new(path).await?;
        Ok(Arc::new(sm))
    }
}

#[tokio::test]
async fn test_custom_state_machine() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;

    // Test basic functionality
    sm.start()?;
    assert!(sm.is_running());

    // Test apply_chunk with sample entries
    // (create_test_entry(index, term) is a test helper you define)
    let entries = vec![create_test_entry(1, 1)];
    sm.apply_chunk(entries).await?;
    assert_eq!(sm.len(), 1);

    // Test snapshot operations: generate_snapshot_data returns a checksum
    let temp_dir = TempDir::new()?;
    let checksum = sm
        .generate_snapshot_data(temp_dir.path().to_path_buf(), LogId::new(1, 1))
        .await?;
    assert!(!checksum.is_zero());

    sm.stop()?;
    Ok(())
}

#[tokio::test]
async fn test_performance() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;
    sm.start()?;

    // Performance test: apply 10,000 entries
    // (create_test_entry(index, term) is a test helper you define)
    let start = std::time::Instant::now();
    let entries: Vec<_> = (1..=10_000)
        .map(|i| create_test_entry(i, i))
        .collect();

    sm.apply_chunk(entries).await?;
    let duration = start.elapsed();

    assert!(duration.as_millis() < 1000, "Should apply 10k entries in <1s");
    Ok(())
}

§5. Register with NodeBuilder

use std::sync::Arc;

use d_engine::NodeBuilder;

// `path` is wherever your state machine stores its data
let custom_sm = Arc::new(CustomStateMachine::new(path).await?);

NodeBuilder::new(config, shutdown_rx)
    .state_machine(custom_sm)  // Required component
    .build();
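
When both custom components are in play, they register on the same builder; this simply combines the two calls shown in §5 of each section:

// Register a custom storage engine and a custom state machine together.
NodeBuilder::new(config, shutdown_rx)
    .storage_engine(storage_engine)
    .state_machine(custom_sm)
    .build();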

§6. Production Examples

See complete implementations in:

  • examples/rocksdb-cluster/src/main.rs - RocksDB state machine
  • examples/sled-cluster/src/main.rs - Sled state machine
  • src/storage/adaptors/ - Built-in implementations

Enable RocksDB feature in your Cargo.toml:

d-engine = { version = "0.1.3", features = ["rocksdb"] }