pub struct EmbeddedEngine { /* private fields */ }
Embedded d-engine with automatic lifecycle management.
Provides high-level KV API for embedded usage:
- start() / start_with() - Initialize and spawn the node
- wait_ready() - Wait for leader election
- client() - Get the local KV client
- stop() - Graceful shutdown
§Example
use d_engine::EmbeddedEngine;
use std::time::Duration;
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
engine.stop().await?;
Implementations§
impl EmbeddedEngine
pub async fn start() -> Result<EmbeddedEngine, Error>
Start engine with configuration from environment.
Reads the CONFIG_PATH environment variable, or falls back to the default configuration.
The data directory is determined by the config's cluster.db_root_dir setting.
§Example
// Set config path via environment variable
std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;
pub async fn start_with(config_path: &str) -> Result<EmbeddedEngine, Error>
Start engine with explicit configuration file.
Reads configuration from the specified file path.
The data directory is determined by the config's cluster.db_root_dir setting.
§Arguments
config_path: Path to configuration file (e.g. “d-engine.toml”)
§Example
let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
pub async fn start_custom<SE, SM>(
    storage_engine: Arc<SE>,
    state_machine: Arc<SM>,
    config_path: Option<&str>,
) -> Result<EmbeddedEngine, Error>
Start engine with custom storage and state machine.
Advanced API for users providing custom storage implementations.
§Arguments
- storage_engine: Custom storage engine implementation
- state_machine: Custom state machine implementation
- config_path: Optional path to configuration file
§Example
let storage = Arc::new(MyCustomStorage::new()?);
let sm = Arc::new(MyCustomStateMachine::new()?);
let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
pub async fn wait_ready(&self, timeout: Duration) -> Result<LeaderInfo, Error>
Wait for leader election to complete.
Blocks until a leader has been elected in the cluster. Notification is event-driven (no polling) with sub-millisecond latency.
§Timeout Guidelines
Single-node mode (most common for development):
- Typical: <100ms (near-instant election)
- Recommended: Duration::from_secs(3)
Multi-node HA cluster (production):
- Typical: 1-3s (depends on network latency and general_raft_timeout_duration_in_ms)
- Recommended: Duration::from_secs(10)
Special cases:
- Health checks: Duration::from_secs(3) (fail fast if cluster unhealthy)
- Startup scripts: Duration::from_secs(30) (allow time for cluster stabilization)
- Development/testing: Duration::from_secs(5) (balance between speed and reliability)
§Returns
- Ok(LeaderInfo) - Leader elected successfully
- Err(...) - Timeout or cluster unavailable
§Example
// Single-node development
let engine = EmbeddedEngine::start().await?;
let leader = engine.wait_ready(Duration::from_secs(3)).await?;
// Multi-node production
let engine = EmbeddedEngine::start_with("cluster.toml").await?;
let leader = engine.wait_ready(Duration::from_secs(10)).await?;
println!("Leader elected: {} (term {})", leader.leader_id, leader.term);Sourcepub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
Subscribe to leader change notifications.
Returns a receiver that will be notified whenever:
- First leader is elected
- Leader changes (re-election)
- No leader exists (during election)
§Performance
Event-driven notification (no polling), <1ms latency
§Example
let mut leader_rx = engine.leader_change_notifier();
tokio::spawn(async move {
    while leader_rx.changed().await.is_ok() {
        match leader_rx.borrow().as_ref() {
            Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
            None => println!("No leader"),
        }
    }
});
pub fn is_leader(&self) -> bool
Returns true if the current node is the Raft leader.
§Use Cases
- Load balancer health checks (e.g. a HAProxy /primary endpoint returning HTTP 200/503)
- Prevent write requests to followers before they fail
- Application-level request routing decisions
§Performance
Zero-cost operation (reads cached leader state from watch channel)
§Example
// Load balancer health check endpoint
#[get("/primary")]
async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    if engine.is_leader() {
        StatusCode::OK // Routes writes here
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}

// Application request handler
if engine.is_leader() {
    client.put(key, value).await?;
} else {
    return Err("Not leader, write rejected".into());
}
pub fn leader_info(&self) -> Option<LeaderInfo>
Returns current leader information if available.
§Returns
- Some(LeaderInfo) if a leader is elected (includes leader_id and term)
- None if no leader exists (during election or network partition)
§Use Cases
- Monitoring dashboards showing cluster state
- Debugging leader election issues
- Logging cluster topology changes
§Example
if let Some(info) = engine.leader_info() {
    println!("Leader: {} (term {})", info.leader_id, info.term);
} else {
    println!("No leader elected, cluster unavailable");
}
pub fn client(&self) -> &LocalKvClient
Get a reference to the local KV client.
The client is available immediately after start(),
but requests will only succeed after wait_ready() completes.
§Example
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
pub fn watch(&self, key: impl AsRef<[u8]>) -> Result<WatcherHandle, Error>
Register a watcher for a specific key.
Returns a handle that receives watch events via an mpsc channel. The watcher is automatically unregistered when the handle is dropped.
§Arguments
key - The exact key to watch
§Returns
Result<WatcherHandle> - Handle for receiving events
§Example
let engine = EmbeddedEngine::start().await?;
let mut handle = engine.watch(b"mykey")?;
while let Some(event) = handle.receiver_mut().recv().await {
println!("Key changed: {:?}", event);
}Trait Implementations§
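As a rough end-to-end sketch, the watcher can run in a background task while writes come from the local client (the key name is illustrative, and the event's exact shape is only shown via its Debug output):
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;

// Watch a key from a background task; the watcher unregisters when `handle` is dropped.
let mut handle = engine.watch(b"config/feature-flag")?;
tokio::spawn(async move {
    while let Some(event) = handle.receiver_mut().recv().await {
        println!("watched key changed: {:?}", event);
    }
});

// Writes through the local client trigger watch events for that key.
engine.client().put(b"config/feature-flag", b"on").await?;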
Trait Implementations§
Auto Trait Implementations§
impl Freeze for EmbeddedEngine
impl !RefUnwindSafe for EmbeddedEngine
impl Send for EmbeddedEngine
impl Sync for EmbeddedEngine
impl Unpin for EmbeddedEngine
impl !UnwindSafe for EmbeddedEngine
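Because EmbeddedEngine is Send and Sync, it can be shared across tasks; a minimal sketch, assuming the engine is wrapped in an Arc (the key and value below are illustrative):
use std::sync::Arc;
use std::time::Duration;
use d_engine::EmbeddedEngine;

let engine = Arc::new(EmbeddedEngine::start().await?);
engine.wait_ready(Duration::from_secs(5)).await?;

let engine_for_task = Arc::clone(&engine);
tokio::spawn(async move {
    // Each task can check leadership and use the local KV client.
    if engine_for_task.is_leader() {
        let _ = engine_for_task.client().put(b"shared-key", b"shared-value").await;
    }
});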
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request