§Implementing Custom Storage Engines
d-engine supports pluggable storage through the `StorageEngine` trait. The storage engine is responsible for persisting Raft log entries and metadata to durable storage.
§Architecture Context
- The `StorageEngine` trait provides a generic interface for storage backends
- Physical separation between log storage (`LogStore`) and metadata storage (`MetaStore`)
- Built-in implementations include:
  - File-based: Production-ready file system storage
  - RocksDB: High-performance embedded database (requires `features = ["rocksdb"]`)
  - Sled: Modern embedded database
  - In-memory: Volatile storage for testing
- Custom implementations enable support for cloud storage, SQL databases, etc.
§1. Implement the Trait
```rust
use d_engine::storage::{StorageEngine, LogStore, MetaStore};
use async_trait::async_trait;
use std::sync::Arc;

// `Entry`, `HardState`, and `Error` come from d-engine; the exact import
// paths depend on the crate version, so adjust them to your setup.

struct CustomLogStore;
struct CustomMetaStore;

#[async_trait]
impl LogStore for CustomLogStore {
    async fn persist_entries(&self, entries: Vec<Entry>) -> Result<(), Error> {
        // Your implementation - batch operations recommended
        Ok(())
    }

    async fn entry(&self, index: u64) -> Result<Option<Entry>, Error> {
        // Retrieve a single entry
        Ok(None)
    }

    // Implement all required methods...
}

impl MetaStore for CustomMetaStore {
    fn save_hard_state(&self, state: &HardState) -> Result<(), Error> {
        // Persist hard state atomically
        Ok(())
    }

    fn load_hard_state(&self) -> Result<Option<HardState>, Error> {
        // Load persisted hard state
        Ok(None)
    }
}

// Combine log and metadata stores
struct CustomStorageEngine {
    log_store: Arc<CustomLogStore>,
    meta_store: Arc<CustomMetaStore>,
}

impl StorageEngine for CustomStorageEngine {
    type LogStore = CustomLogStore;
    type MetaStore = CustomMetaStore;

    fn log_store(&self) -> Arc<Self::LogStore> {
        self.log_store.clone()
    }

    fn meta_store(&self) -> Arc<Self::MetaStore> {
        self.meta_store.clone()
    }
}
```
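Note that both accessors hand out `Arc` clones rather than owned stores: the same `LogStore` and `MetaStore` instances end up shared between components and threads, so in practice your implementations need to be internally synchronized.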
§2. Key Implementation Notes
- Atomicity: Ensure write operations are atomic; use batch operations where possible (see the sketch after this list)
- Durability: Flush writes to persistent storage; implement `flush()` properly
- Consistency: Maintain exactly-once semantics for log entries
- Performance: Target >100k ops/sec for log persistence
- Resource Management: Clean up resources in the `Drop` implementation
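The atomicity and durability notes are easiest to see in a write-ahead-log shape. The sketch below is not d-engine API; `WalSegment` and the length-prefixed framing are assumptions, showing one way to pay the fsync cost once per batch instead of once per entry:

```rust
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::Path;
use std::sync::Mutex;

/// Stand-in for however you encode an `Entry`.
type SerializedEntry = Vec<u8>;

struct WalSegment {
    writer: Mutex<BufWriter<File>>,
}

impl WalSegment {
    fn open(path: &Path) -> std::io::Result<Self> {
        let file = OpenOptions::new().create(true).append(true).open(path)?;
        Ok(Self { writer: Mutex::new(BufWriter::new(file)) })
    }

    /// Persist the whole batch, then flush and fsync exactly once.
    fn persist_batch(&self, entries: &[SerializedEntry]) -> std::io::Result<()> {
        let mut writer = self.writer.lock().unwrap();
        for entry in entries {
            // Length-prefix each record so it can be re-framed on recovery.
            writer.write_all(&(entry.len() as u32).to_le_bytes())?;
            writer.write_all(entry)?;
        }
        writer.flush()?;              // drain the BufWriter
        writer.get_ref().sync_all()?; // fsync: the durability point
        Ok(())
    }
}
```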
§3. StorageEngine API Reference
§LogStore Methods
Method | Purpose | Performance Target |
---|---|---|
persist_entries() | Batch persist log entries | >100k entries/sec |
entry() | Get single entry by index | <1ms latency |
get_entries() | Get entries in range | <1ms for 10k entries |
purge() | Remove logs up to index | <100ms for 10k entries |
truncate() | Remove entries from index | <100ms |
flush() | Sync writes to disk | Varies by backend |
reset() | Clear all data | <1s |
last_index() | Get highest persisted index | <100μs |
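`purge()` and `truncate()` trim opposite ends of the log: `purge()` drops the prefix once a snapshot covers it, while `truncate()` drops a conflicting suffix. A minimal in-memory sketch of that contract (the inclusive/exclusive index handling here is an assumption; match the trait documentation):

```rust
/// Illustrative only: an in-memory log showing the intended semantics of
/// `purge` (drop a prefix after compaction) vs `truncate` (drop a suffix
/// after a log conflict).
struct MemLog {
    first_index: u64,      // Raft index of entries[0]
    entries: Vec<Vec<u8>>, // serialized entries
}

impl MemLog {
    /// Remove every entry up to and including `index`
    /// (typically after those entries are covered by a snapshot).
    fn purge(&mut self, index: u64) {
        let drop_count = ((index + 1).saturating_sub(self.first_index) as usize)
            .min(self.entries.len());
        self.entries.drain(..drop_count);
        self.first_index = self.first_index.max(index + 1);
    }

    /// Remove every entry from `index` onward
    /// (repairing a divergent suffix after a conflict with the leader).
    fn truncate(&mut self, index: u64) {
        let keep_len = index.saturating_sub(self.first_index) as usize;
        self.entries.truncate(keep_len);
    }

    fn last_index(&self) -> Option<u64> {
        (!self.entries.is_empty())
            .then(|| self.first_index + self.entries.len() as u64 - 1)
    }
}
```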
§MetaStore Methods
Method | Purpose | Criticality |
---|---|---|
save_hard_state() | Persist term/vote state | High - atomic required |
load_hard_state() | Load persisted state | High |
flush() | Sync metadata writes | Medium |
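Because `save_hard_state()` must be atomic, a common file-backed approach is write-to-temp, fsync, then rename. A sketch of that pattern (the helper below is illustrative, not part of d-engine):

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

/// Crash-safe metadata persistence: the rename is atomic on POSIX
/// filesystems, so readers see either the old or the new hard state,
/// never a torn write.
fn save_bytes_atomically(dir: &Path, name: &str, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = dir.join(format!("{name}.tmp"));
    let dst = dir.join(name);

    let mut file = File::create(&tmp)?;
    file.write_all(bytes)?;
    file.sync_all()?;        // data durable before the rename publishes it

    fs::rename(&tmp, &dst)?; // atomic replace

    // On Unix, fsync the directory so the rename itself survives a crash.
    File::open(dir)?.sync_all()?;
    Ok(())
}
```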
§4. Testing Your Implementation
Use the standardized test suite to ensure compatibility:
```rust
use d_engine::storage_engine_test::{StorageEngineBuilder, StorageEngineTestSuite};
use async_trait::async_trait;
use std::sync::Arc;
use tempfile::TempDir;

struct CustomStorageEngineBuilder {
    temp_dir: TempDir,
}

impl CustomStorageEngineBuilder {
    fn new() -> Self {
        Self { temp_dir: TempDir::new().expect("temp dir") }
    }
}

#[async_trait]
impl StorageEngineBuilder for CustomStorageEngineBuilder {
    type Engine = CustomStorageEngine;

    async fn build(&self) -> Result<Arc<Self::Engine>, Error> {
        let path = self.temp_dir.path().join("storage");
        let engine = CustomStorageEngine::new(path).await?;
        Ok(Arc::new(engine))
    }

    async fn cleanup(&self) -> Result<(), Error> {
        Ok(()) // TempDir auto-cleans on drop
    }
}

#[tokio::test]
async fn test_custom_storage_engine() -> Result<(), Error> {
    let builder = CustomStorageEngineBuilder::new();
    StorageEngineTestSuite::run_all_tests(builder).await
}

#[tokio::test]
async fn test_performance() -> Result<(), Error> {
    let builder = CustomStorageEngineBuilder::new();
    let engine = builder.build().await?;
    let log_store = engine.log_store();

    // Performance test: persist 10,000 entries.
    // `create_test_entry` is a helper you provide for your own Entry type.
    let start = std::time::Instant::now();
    let entries: Vec<_> = (1..=10_000)
        .map(|i| create_test_entry(i, i))
        .collect();
    log_store.persist_entries(entries).await?;
    let duration = start.elapsed();

    assert!(duration.as_millis() < 1000, "Should persist 10k entries in <1s");
    Ok(())
}
```
§5. Register with NodeBuilder
```rust
use d_engine::NodeBuilder;
use std::sync::Arc;

// Assumes a `CustomStorageEngine::new()` constructor; adapt to however your
// engine is built (the earlier sketch took a path argument).
let storage_engine = Arc::new(CustomStorageEngine::new().await?);

NodeBuilder::new(config, shutdown_rx)
    .storage_engine(storage_engine) // Required component
    .build();
```
§6. Production Examples
Reference implementations are available in:

- `src/storage/adaptors/rocksdb/` - RocksDB storage engine
- `src/storage/adaptors/sled/` - Sled storage engine
- `src/storage/adaptors/file/` - File-based storage
- `src/storage/adaptors/mem/` - In-memory storage

Enable the RocksDB feature in your `Cargo.toml`:

```toml
d-engine = { version = "0.1.3", features = ["rocksdb"] }
```
§7. Performance Optimization
- Batch Operations: Use batch writes for log persistence
- Caching: Cache frequently accessed data, such as the last index (see the sketch after this list)
- Concurrency: Use appropriate locking strategies
- Compression: Consider compressing log entries for large deployments
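As an example of the caching point, the last persisted index can live in an atomic so `last_index()` never touches the backend. A sketch, not d-engine code:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Cache the highest persisted index so `last_index()` is a load,
/// not a storage read. Using 0 as "empty" is purely illustrative.
struct IndexCache {
    last_index: AtomicU64,
}

impl IndexCache {
    /// Call after a batch is durably persisted.
    fn advance_to(&self, index: u64) {
        self.last_index.fetch_max(index, Ordering::Release);
    }

    /// Meets the <100μs target trivially: a single atomic load.
    fn last_index(&self) -> u64 {
        self.last_index.load(Ordering::Acquire)
    }
}
```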
See `src/storage/adaptors/rocksdb/rocksdb_engine.rs` for a production-grade implementation featuring:
- Write batches for atomic operations (sketched below)
- Log compaction to reclaim disk space
- Efficient snapshot storage
- Metrics integration
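For orientation, the write-batch pattern in the `rocksdb` crate looks roughly like this (a generic sketch of the crate's API, not an excerpt from d-engine's adaptor):

```rust
use rocksdb::{WriteBatch, WriteOptions, DB};

/// Stage several puts and commit them in one atomic, synced write.
fn persist_batch(db: &DB, records: &[(Vec<u8>, Vec<u8>)]) -> Result<(), rocksdb::Error> {
    let mut batch = WriteBatch::default();
    for (key, value) in records {
        batch.put(key, value);
    }

    // `set_sync(true)` makes the write durable before returning, which is
    // what a Raft log store needs at commit time.
    let mut opts = WriteOptions::default();
    opts.set_sync(true);
    db.write_opt(batch, &opts)
}
```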
§Implementing Custom State Machines
d-engine supports pluggable state machines through the `StateMachine` trait. This is where your application's core logic executes: processing committed log entries to update application state.
§Architecture Context
- The `StateMachine` is the application logic layer that processes committed Raft log entries
- The `StateMachineHandler` manages the lifecycle: applying entries, triggering snapshots, handling purges
- Default implementations include file-based and RocksDB storage (requires `features = ["rocksdb"]`)
- Custom implementations enable specialized behaviors for key-value stores, document databases, etc.
§1. Implement the Trait
```rust
use d_engine::{Entry, LogId, Result, SnapshotMetadata, StateMachine};
use async_trait::async_trait;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

// `ApplicationStorage` and `AppCommand` are placeholders for your own
// application types; they are not part of d-engine.
struct CustomStateMachine {
    // Your storage backend
    backend: Arc<dyn ApplicationStorage>,
    last_applied: AtomicU64,
    snapshot_meta: Mutex<Option<SnapshotMetadata>>,
}

#[async_trait]
impl StateMachine for CustomStateMachine {
    fn start(&self) -> Result<()> {
        // Initialize your state machine
        Ok(())
    }

    fn stop(&self) -> Result<()> {
        // Clean up resources
        Ok(())
    }

    fn is_running(&self) -> bool {
        // Return running status
        true
    }

    fn get(&self, key_buffer: &[u8]) -> Result<Option<Vec<u8>>> {
        // Retrieve a value by key
        self.backend.get(key_buffer)
    }

    fn entry_term(&self, entry_id: u64) -> Option<u64> {
        // Return the term for a specific entry
        Some(1)
    }

    async fn apply_chunk(&self, chunk: Vec<Entry>) -> Result<()> {
        // Deserialize and process committed entries
        for entry in chunk {
            let cmd: AppCommand = bincode::deserialize(&entry.data)?;
            self.backend.execute(cmd)?;
        }
        Ok(())
    }

    fn len(&self) -> usize {
        // Return the number of applied entries
        self.backend.count()
    }

    // Implement all required methods...
}
```
§2. Key Implementation Notes
- Atomic Operations: Ensure `apply_chunk()` either fully applies or fails the entire batch
- Idempotency: Handle duplicate entries safely; Raft may resend committed entries (see the sketch after this list)
- Snapshot Isolation: `apply_snapshot_from_file()` must atomically replace state
- Checksum Validation: Mandatory for snapshot integrity; validate before application
- Concurrency Control: Use appropriate locking for state mutations
- Resource Cleanup: Implement `Drop` to ensure a proper flush on shutdown
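A minimal sketch of the idempotency guard, assuming each entry carries its Raft log index (the `(index, payload)` tuple shape is illustrative, not d-engine's `Entry` type): anything at or below `last_applied` is skipped rather than re-executed.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Skip entries that were already applied, so redelivered batches are safe.
fn apply_idempotent(
    last_applied: &AtomicU64,
    entries: Vec<(u64, Vec<u8>)>, // (index, payload)
    mut apply: impl FnMut(&[u8]),
) {
    for (index, payload) in entries {
        if index <= last_applied.load(Ordering::Acquire) {
            continue; // duplicate delivery: already applied
        }
        apply(&payload);
        last_applied.store(index, Ordering::Release);
    }
}
```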
§3. StateMachine API Reference
Method | Purpose | Sync/Async | Criticality |
---|---|---|---|
start() | Initialize state machine service | Sync | High |
stop() | Graceful shutdown | Sync | High |
is_running() | Check service status | Sync | Medium |
get() | Read value by key | Sync | High |
entry_term() | Get term for log index | Sync | Medium |
apply_chunk() | Apply committed entries | Async | Critical |
len() | Get entry count | Sync | Low |
is_empty() | Check if empty | Sync | Low |
update_last_applied() | Update applied index in memory | Sync | High |
last_applied() | Get last applied index | Sync | High |
persist_last_applied() | Persist applied index | Sync | High |
update_last_snapshot_metadata() | Update snapshot metadata in memory | Sync | Medium |
snapshot_metadata() | Get current snapshot metadata | Sync | Medium |
persist_last_snapshot_metadata() | Persist snapshot metadata | Sync | Medium |
apply_snapshot_from_file() | Replace state with snapshot | Async | Critical |
generate_snapshot_data() | Create new snapshot | Async | High |
save_hard_state() | Persist term/vote state | Sync | High |
flush() | Sync writes to storage | Sync | High |
flush_async() | Async flush | Async | High |
reset() | Reset to initial state | Async | Medium |
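Because checksum validation is mandatory before `apply_snapshot_from_file()` touches live state, a typical flow is: hash the snapshot file, compare against the digest recorded in the snapshot metadata, and only then swap state. A sketch using SHA-256 (the `sha2` crate is an illustrative choice, not a d-engine dependency):

```rust
use sha2::{Digest, Sha256};
use std::fs::File;
use std::io;
use std::path::Path;

/// Stream the snapshot file through SHA-256 and refuse to apply it unless
/// the digest matches the checksum recorded in the snapshot metadata.
fn verify_snapshot(path: &Path, expected: &[u8; 32]) -> io::Result<bool> {
    let mut file = File::open(path)?;
    let mut hasher = Sha256::new();
    io::copy(&mut file, &mut hasher)?; // Sha256 implements `io::Write`
    Ok(hasher.finalize().as_slice() == expected.as_slice())
}
```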
§4. Testing Your Implementation
Use the built-in test patterns from d-engine’s test suite:
```rust
use d_engine::{Error, LogId, StateMachine};
use std::sync::Arc;
use tempfile::TempDir;

struct TestStateMachineBuilder {
    temp_dir: TempDir,
}

impl TestStateMachineBuilder {
    fn new() -> Self {
        Self { temp_dir: TempDir::new().expect("temp dir") }
    }

    async fn build(&self) -> Result<Arc<CustomStateMachine>, Error> {
        let path = self.temp_dir.path().join("sm_test");
        let sm = CustomStateMachine::new(path).await?;
        Ok(Arc::new(sm))
    }
}

#[tokio::test]
async fn test_custom_state_machine() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;

    // Test basic functionality
    sm.start()?;
    assert!(sm.is_running());

    // Test apply_chunk with sample entries.
    // `create_test_entry` is a helper you provide for your own Entry type.
    let entries = vec![create_test_entry(1, 1)];
    sm.apply_chunk(entries).await?;
    assert_eq!(sm.len(), 1);

    // Test snapshot operations
    let temp_dir = TempDir::new()?;
    let checksum = sm
        .generate_snapshot_data(temp_dir.path().to_path_buf(), LogId::new(1, 1))
        .await?;
    assert!(!checksum.is_zero());

    sm.stop()?;
    Ok(())
}

#[tokio::test]
async fn test_performance() -> Result<(), Error> {
    let builder = TestStateMachineBuilder::new();
    let sm = builder.build().await?;
    sm.start()?;

    // Performance test: apply 10,000 entries
    let start = std::time::Instant::now();
    let entries: Vec<_> = (1..=10_000)
        .map(|i| create_test_entry(i, i))
        .collect();
    sm.apply_chunk(entries).await?;
    let duration = start.elapsed();

    assert!(duration.as_millis() < 1000, "Should apply 10k entries in <1s");
    Ok(())
}
```
§5. Register with NodeBuilder
```rust
use d_engine::NodeBuilder;
use std::sync::Arc;

let custom_sm = Arc::new(CustomStateMachine::new().await?);

NodeBuilder::new(config, shutdown_rx)
    .state_machine(custom_sm) // Required component
    .build();
```
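Both custom components register through the same builder, so a node that replaces both layers chains the two calls (a sketch assuming the constructors from the earlier examples):

```rust
use d_engine::NodeBuilder;
use std::sync::Arc;

let storage_engine = Arc::new(CustomStorageEngine::new().await?);
let custom_sm = Arc::new(CustomStateMachine::new().await?);

NodeBuilder::new(config, shutdown_rx)
    .storage_engine(storage_engine)
    .state_machine(custom_sm)
    .build();
```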
§6. Production Examples
See complete implementations in:

- `examples/rocksdb-cluster/src/main.rs` - RocksDB state machine
- `examples/sled-cluster/src/main.rs` - Sled state machine
- `src/storage/adaptors/` - Built-in implementations

Enable the RocksDB feature in your `Cargo.toml`:

```toml
d-engine = { version = "0.1.3", features = ["rocksdb"] }
```