pub struct EmbeddedEngine { /* private fields */ }
Embedded d-engine with automatic lifecycle management.
Thread-safe: Clone and share across threads freely.
All methods use &self - safe to call from multiple contexts.
Provides high-level KV API for embedded usage:
- start() / start_with() - Initialize and spawn node
- wait_ready() - Wait for leader election
- client() - Get embedded client
- stop() - Graceful shutdown
§Example
use d_engine::EmbeddedEngine;
use std::time::Duration;
let engine = EmbeddedEngine::start().await?; // Returns Result<EmbeddedEngine>; `?` unwraps it
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
engine.stop().await?;
Implementations§
impl EmbeddedEngine
pub async fn start() -> Result<Self>
Start engine with configuration from environment.
Reads CONFIG_PATH environment variable or uses default configuration.
Data directory is determined by config’s cluster.db_root_dir setting.
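The lookup described above can be sketched without the engine itself. This is a minimal illustration, not d-engine's implementation: `ConfigSource` and `resolve_config_source` are hypothetical names, and treating an empty or whitespace-only value as unset is an assumption.

```rust
use std::env;
use std::path::PathBuf;

/// Where configuration would be loaded from (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum ConfigSource {
    /// An explicit file, e.g. taken from the CONFIG_PATH variable.
    File(PathBuf),
    /// No path supplied: fall back to built-in defaults.
    Default,
}

/// Decide the configuration source from the raw CONFIG_PATH value.
/// Takes the value as an argument so the logic can be tested without
/// mutating the process environment.
fn resolve_config_source(config_path: Option<String>) -> ConfigSource {
    match config_path {
        // Assumption: a blank value is treated the same as an unset variable.
        Some(p) if !p.trim().is_empty() => ConfigSource::File(PathBuf::from(p)),
        _ => ConfigSource::Default,
    }
}

fn main() {
    let source = resolve_config_source(env::var("CONFIG_PATH").ok());
    println!("configuration source: {:?}", source);
}
```

Passing the value in, rather than reading the environment inside the function, keeps the decision logic deterministic under test.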
§Example
// Set config path via environment variable
std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;
pub async fn start_with(config_path: &str) -> Result<Self>
Start engine with explicit configuration file.
Reads configuration from specified file path.
Data directory is determined by config’s cluster.db_root_dir setting.
§Arguments
config_path: Path to configuration file (e.g. “d-engine.toml”)
§Example
let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
pub async fn start_custom<SE, SM>(
    storage_engine: Arc<SE>,
    state_machine: Arc<SM>,
    config_path: Option<&str>,
) -> Result<Self>
Start engine with custom storage and state machine.
Advanced API for users providing custom storage implementations.
§Arguments
- config_path: Optional path to configuration file
- storage_engine: Custom storage engine implementation
- state_machine: Custom state machine implementation
§Example
let storage = Arc::new(MyCustomStorage::new()?);
let sm = Arc::new(MyCustomStateMachine::new()?);
let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
pub async fn wait_ready(&self, timeout: Duration) -> Result<LeaderInfo>
Wait until the cluster is ready to serve requests.
Blocks until a leader has been elected and its no-op entry is committed by a majority of nodes (Raft §8). Only at this point is the leader guaranteed to be aware of all previously committed entries and safe to serve reads and writes.
Notification is event-driven (no polling), with <1ms latency after the no-op entry commits.
§Timeout Guidelines
Single-node mode (most common for development):
- Typical: <100ms (near-instant election)
- Recommended: Duration::from_secs(3)
Multi-node HA cluster (production):
- Typical: 1-3s (depends on network latency and general_raft_timeout_duration_in_ms)
- Recommended: Duration::from_secs(10)
Special cases:
- Health checks: Duration::from_secs(3) (fail fast if cluster unhealthy)
- Startup scripts: Duration::from_secs(30) (allow time for cluster stabilization)
- Development/testing: Duration::from_secs(5) (balance between speed and reliability)
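The guidelines above can be captured in a small lookup helper so that call sites do not hard-code numbers. This is only a sketch: the `Deployment` enum and `recommended_wait_ready_timeout` function are hypothetical names and not part of d-engine; only the durations come from the guidelines.

```rust
use std::time::Duration;

/// Deployment scenarios from the timeout guidelines (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Deployment {
    SingleNode,
    MultiNodeHa,
    HealthCheck,
    StartupScript,
    DevTest,
}

/// Map a scenario to the recommended `wait_ready` timeout.
fn recommended_wait_ready_timeout(deployment: Deployment) -> Duration {
    let secs = match deployment {
        Deployment::SingleNode => 3,     // election is near-instant
        Deployment::MultiNodeHa => 10,   // elections typically take 1-3s
        Deployment::HealthCheck => 3,    // fail fast if the cluster is unhealthy
        Deployment::StartupScript => 30, // allow time for cluster stabilization
        Deployment::DevTest => 5,        // balance speed and reliability
    };
    Duration::from_secs(secs)
}

fn main() {
    let timeout = recommended_wait_ready_timeout(Deployment::MultiNodeHa);
    println!("wait_ready timeout: {:?}", timeout);
}
```

The returned value would then be passed to `wait_ready`, for example `engine.wait_ready(timeout).await?`.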
§Returns
- Ok(LeaderInfo) - Leader elected successfully
- Err(...) - Timeout or cluster unavailable
§Example
// Single-node development
let engine = EmbeddedEngine::start().await?;
let leader = engine.wait_ready(Duration::from_secs(3)).await?;
// Multi-node production
let engine = EmbeddedEngine::start_with("cluster.toml").await?;
let leader = engine.wait_ready(Duration::from_secs(10)).await?;
println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
Subscribe to leader change notifications.
Returns a receiver that will be notified whenever:
- First leader is elected
- Leader changes (re-election)
- No leader exists (during election)
§Performance
Event-driven notification (no polling), <1ms latency
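To illustrate what "event-driven, no polling" means for a latest-value channel, here is a standard-library model built on a mutex and condition variable. It is not how d-engine implements the notifier: `LeaderWatch` and its methods are hypothetical, and the field types of this stand-in `LeaderInfo` (`u32`, `u64`) are assumptions.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Stand-in for the crate's LeaderInfo; field types are assumed.
#[derive(Debug, Clone, PartialEq)]
struct LeaderInfo {
    leader_id: u32,
    term: u64,
}

/// Latest-value cell: subscribers sleep until the version counter moves.
struct LeaderWatch {
    state: Mutex<(u64, Option<LeaderInfo>)>, // (version, latest leader)
    changed: Condvar,
}

impl LeaderWatch {
    fn new() -> Self {
        LeaderWatch { state: Mutex::new((0, None)), changed: Condvar::new() }
    }

    /// Store a new value and wake every waiting subscriber.
    fn publish(&self, leader: Option<LeaderInfo>) {
        let mut guard = self.state.lock().unwrap();
        guard.0 += 1;
        guard.1 = leader;
        self.changed.notify_all();
    }

    /// Block (without spinning) until the version differs from `seen`,
    /// then return the new version and a copy of the latest value.
    fn wait_for_change(&self, seen: u64) -> (u64, Option<LeaderInfo>) {
        let mut guard = self.state.lock().unwrap();
        while guard.0 == seen {
            guard = self.changed.wait(guard).unwrap();
        }
        (guard.0, guard.1.clone())
    }
}

fn main() {
    let watch = Arc::new(LeaderWatch::new());
    let publisher = Arc::clone(&watch);
    let handle = thread::spawn(move || {
        publisher.publish(Some(LeaderInfo { leader_id: 1, term: 1 }));
    });
    let (version, leader) = watch.wait_for_change(0);
    handle.join().unwrap();
    println!("version {}: {:?}", version, leader);
}
```

As with a watch-style receiver, a slow subscriber only ever observes the most recent value, and a `None` value represents the "no leader" state during an election.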
§Example
let mut leader_rx = engine.leader_change_notifier();
tokio::spawn(async move {
    while leader_rx.changed().await.is_ok() {
        match leader_rx.borrow().as_ref() {
            Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
            None => println!("No leader"),
        }
    }
});
pub fn is_leader(&self) -> bool
Returns true if the current node is the Raft leader.
§Use Cases
- Load balancer health checks (e.g. HAProxy /primary endpoint returning HTTP 200/503)
- Prevent write requests to followers before they fail
- Application-level request routing decisions
§Performance
Near-zero cost: reads the cached leader state from the watch channel.
§Example
// Load balancer health check endpoint
#[get("/primary")]
async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    if engine.is_leader() {
        StatusCode::OK // Routes writes here
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}
// Application request handler
if engine.is_leader() {
    client.put(key, value).await?;
} else {
    return Err("Not leader, write rejected");
}
pub fn leader_info(&self) -> Option<LeaderInfo>
Returns current leader information if available.
§Returns
- Some(LeaderInfo) if a leader is elected (includes leader_id and term)
- None if no leader exists (during election or network partition)
§Use Cases
- Monitoring dashboards showing cluster state
- Debugging leader election issues
- Logging cluster topology changes
§Example
if let Some(info) = engine.leader_info() {
    println!("Leader: {} (term {})", info.leader_id, info.term);
} else {
    println!("No leader elected, cluster unavailable");
}
pub fn client(&self) -> Arc<EmbeddedClient>
Get a shared handle (Arc<EmbeddedClient>) to the local KV client.
The client is available immediately after start(),
but requests will only succeed after wait_ready() completes.
§Example
let engine = EmbeddedEngine::start().await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
pub async fn stop(&self) -> Result<()>
Gracefully stop the embedded d-engine (idempotent).
This method:
- Sends shutdown signal to node
- Waits for node.run() to complete
- Propagates any errors from node execution
Safe to call multiple times - subsequent calls are no-ops.
§Errors
Returns error if node encountered issues during shutdown.
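The shutdown contract above (signal, wait, propagate errors, later calls are no-ops) can be modeled with standard-library threads. This is a sketch of the pattern under stated assumptions, not d-engine's code: `Worker` is a hypothetical type, a thread stands in for `node.run()`, and errors are simplified to `String`.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for an engine that owns a background task.
struct Worker {
    // Shutdown sender and join handle; taken (set to None) by the first stop().
    inner: Mutex<Option<(Sender<()>, JoinHandle<Result<(), String>>)>>,
}

impl Worker {
    fn start() -> Self {
        let (tx, rx) = mpsc::channel::<()>();
        let handle = thread::spawn(move || {
            // Stand-in for node.run(): block until the shutdown signal arrives.
            rx.recv().map_err(|e| e.to_string())
        });
        Worker { inner: Mutex::new(Some((tx, handle))) }
    }

    /// Idempotent stop: only the first caller signals and joins.
    fn stop(&self) -> Result<(), String> {
        // Take the state out under the lock, then release the lock before joining.
        let taken = self.inner.lock().unwrap().take();
        match taken {
            None => Ok(()), // already stopped: no-op
            Some((tx, handle)) => {
                // 1. Send the shutdown signal.
                tx.send(()).map_err(|e| e.to_string())?;
                // 2. Wait for the task, 3. propagate its result.
                handle.join().map_err(|_| "worker panicked".to_string())?
            }
        }
    }
}

fn main() {
    let worker = Worker::start();
    println!("first stop: {:?}", worker.stop());
    println!("second stop: {:?}", worker.stop());
}
```

Taking the handle out of an `Option` is what makes repeated calls safe: there is nothing left to signal or join the second time.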
§Example
engine.stop().await?;
engine.stop().await?; // No-op, returns Ok(())
pub async fn is_stopped(&self) -> bool
Trait Implementations§
impl Clone for EmbeddedEngine
fn clone(&self) -> EmbeddedEngine
fn clone_from(&mut self, source: &Self)
Auto Trait Implementations§
impl Freeze for EmbeddedEngine
impl !RefUnwindSafe for EmbeddedEngine
impl Send for EmbeddedEngine
impl Sync for EmbeddedEngine
impl Unpin for EmbeddedEngine
impl UnsafeUnpin for EmbeddedEngine
impl !UnwindSafe for EmbeddedEngine
Blanket Implementations§
impl<T> BorrowMut<T> for T
where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request