pub struct EmbeddedEngine { /* private fields */ }
Embedded d-engine with automatic lifecycle management.
Thread-safe: Clone and share across threads freely.
All methods use &self - safe to call from multiple contexts.
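The clone-and-share pattern described above can be pictured with a small standard-library sketch. `Handle`, `record_call`, and `call_from_threads` are hypothetical stand-ins invented for illustration; they are not part of d-engine and say nothing about how `EmbeddedEngine` is built internally.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for an engine handle; NOT the real `EmbeddedEngine`.
#[derive(Clone)]
struct Handle {
    // Shared state sits behind an `Arc`, so a clone copies only a pointer.
    calls: Arc<AtomicUsize>,
}

impl Handle {
    fn new() -> Self {
        Handle { calls: Arc::new(AtomicUsize::new(0)) }
    }

    /// Takes `&self`, so any clone on any thread may call it.
    fn record_call(&self) -> usize {
        self.calls.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn total_calls(&self) -> usize {
        self.calls.load(Ordering::SeqCst)
    }
}

/// Spawns `n` threads, each owning its own clone of the handle,
/// and returns the total number of calls seen by the shared state.
fn call_from_threads(handle: &Handle, n: usize) -> usize {
    let workers: Vec<_> = (0..n)
        .map(|_| {
            let h = handle.clone();
            thread::spawn(move || {
                h.record_call();
            })
        })
        .collect();
    for w in workers {
        w.join().expect("worker thread panicked");
    }
    handle.total_calls()
}
```

Every clone observes the same counter, which is the property that makes passing clones to worker threads or request handlers safe.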
Provides high-level KV API for embedded usage:
- start() / start_with(): Initialize and spawn node
- wait_ready(): Wait for leader election
- client(): Get embedded client
- stop(): Graceful shutdown
§Example
use d_engine::EmbeddedEngine;
use std::time::Duration;
let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
engine.stop().await?;
Implementations§
impl EmbeddedEngine
pub async fn start(data_dir: impl AsRef<Path>) -> Result<EmbeddedEngine, Error>
Start engine with an explicit data directory.
data_dir has highest priority and always overrides cluster.db_root_dir from
CONFIG_PATH or RAFT__ environment variables. Other configuration (network,
Raft timeouts, cluster topology) is still read from those sources if set.
The directory is created automatically if it does not exist. If it already contains data the engine opens it in place (idempotent).
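The precedence rule can be sketched as follows. `Settings`, its `listen_addr` and `election_timeout_ms` fields, and `apply_data_dir` are hypothetical names used only for illustration; only `db_root_dir` comes from the text above, and the real configuration types may look quite different.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical, simplified view of loaded configuration.
/// Only `db_root_dir` is named in the documentation; the other
/// fields are placeholders for "network, Raft timeouts, topology".
#[derive(Debug, Clone, PartialEq)]
struct Settings {
    db_root_dir: PathBuf,
    listen_addr: String,
    election_timeout_ms: u64,
}

/// The explicit `data_dir` always replaces `db_root_dir`;
/// every other loaded setting is left untouched.
fn apply_data_dir(mut loaded: Settings, data_dir: &Path) -> Settings {
    loaded.db_root_dir = data_dir.to_path_buf();
    loaded
}
```

In this sketch, a `Settings` value loaded with `db_root_dir = "/etc/d-engine/db"` and passed through `apply_data_dir(loaded, Path::new("./data/my-app"))` ends up pointing at `./data/my-app` while keeping its address and timeout values.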
§Example
// Minimal — just supply a path
let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
// Works with any AsRef<Path>
let engine = EmbeddedEngine::start(std::path::Path::new("/var/lib/my-app")).await?;
pub async fn start_with(config_path: &str) -> Result<EmbeddedEngine, Error>
Start the engine with an explicit configuration file.
Reads configuration from the specified file path.
The data directory is determined by the config's cluster.db_root_dir setting.
§Arguments
config_path: Path to configuration file (e.g. “d-engine.toml”)
§Example
let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
pub async fn start_custom<SE, SM>(
    storage_engine: Arc<SE>,
    state_machine: Arc<SM>,
    config_path: Option<&str>,
) -> Result<EmbeddedEngine, Error>
Start engine with custom storage and state machine.
Advanced API for users providing custom storage implementations.
§Arguments
config_path: Optional path to configuration file
storage_engine: Custom storage engine implementation
state_machine: Custom state machine implementation
§Example
let storage = Arc::new(MyCustomStorage::new()?);
let sm = Arc::new(MyCustomStateMachine::new()?);
let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
pub async fn wait_ready(&self, timeout: Duration) -> Result<LeaderInfo, Error>
Wait until the cluster is ready to serve requests.
Blocks until a leader has been elected and its no-op entry is committed by a majority of nodes (Raft §8). Only at this point is the leader guaranteed to be aware of all previously committed entries and safe to serve reads and writes.
Notification is event-driven (no polling), with <1ms latency after the no-op entry is committed.
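The "committed by a majority" condition is plain quorum arithmetic. The helpers below are an illustrative sketch with hypothetical names (`majority`, `is_committed`); they are not d-engine functions.

```rust
/// Smallest number of voters that forms a majority of `cluster_size`.
fn majority(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// True once `acks` voters (the leader included) have stored the entry.
fn is_committed(acks: usize, cluster_size: usize) -> bool {
    acks >= majority(cluster_size)
}
```

For a single-node cluster the leader alone is a majority, which is consistent with the near-instant readiness described for single-node mode; a three-node cluster needs two acknowledgements and a five-node cluster needs three.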
§Timeout Guidelines
Single-node mode (most common for development):
- Typical: <100ms (near-instant election)
- Recommended: Duration::from_secs(3)
Multi-node HA cluster (production):
- Typical: 1-3s (depends on network latency and general_raft_timeout_duration_in_ms)
- Recommended: Duration::from_secs(10)
Special cases:
- Health checks: Duration::from_secs(3) (fail fast if cluster unhealthy)
- Startup scripts: Duration::from_secs(30) (allow time for cluster stabilization)
- Development/testing: Duration::from_secs(5) (balance between speed and reliability)
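One way to keep these guidelines in a single place is a small lookup helper. The `Scenario` enum and `recommended_timeout` function are hypothetical application-side code, not part of d-engine; the durations are copied from the guidelines above.

```rust
use std::time::Duration;

/// Hypothetical application-side classification of call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Scenario {
    SingleNode,
    MultiNodeCluster,
    HealthCheck,
    StartupScript,
    DevOrTest,
}

/// Maps each scenario to the timeout recommended in the guidelines.
fn recommended_timeout(scenario: Scenario) -> Duration {
    match scenario {
        Scenario::SingleNode => Duration::from_secs(3),
        Scenario::MultiNodeCluster => Duration::from_secs(10),
        Scenario::HealthCheck => Duration::from_secs(3),
        Scenario::StartupScript => Duration::from_secs(30),
        Scenario::DevOrTest => Duration::from_secs(5),
    }
}
```

The returned value would be passed straight to `wait_ready`, for example `engine.wait_ready(recommended_timeout(Scenario::MultiNodeCluster))`.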
§Returns
- Ok(LeaderInfo): Leader elected successfully
- Err(...): Timeout or cluster unavailable
§Example
// Single-node development
let engine = EmbeddedEngine::start("./data/my-app").await?;
let leader = engine.wait_ready(Duration::from_secs(3)).await?;
// Multi-node production
let engine = EmbeddedEngine::start_with("cluster.toml").await?;
let leader = engine.wait_ready(Duration::from_secs(10)).await?;
println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>
Subscribe to leader change notifications.
Returns a receiver that will be notified whenever:
- First leader is elected
- Leader changes (re-election)
- No leader exists (during election)
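A consumer can tell these three cases apart by comparing the previous and current values it has seen. The sketch below is illustrative: the local `LeaderInfo` struct is a hypothetical mirror of the two fields used in the examples (`leader_id`, `term`) with assumed integer types, and `LeaderEvent` and `classify` are invented names.

```rust
/// Hypothetical mirror of the fields used in the examples; the real
/// `LeaderInfo` field types are an assumption here.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LeaderInfo {
    leader_id: u32,
    term: u64,
}

#[derive(Debug, PartialEq, Eq)]
enum LeaderEvent {
    /// A leader appeared where none was known before.
    Elected(LeaderInfo),
    /// A different leader (or term) replaced the previous one.
    Changed { from: LeaderInfo, to: LeaderInfo },
    /// The previously known leader is gone (election in progress).
    NoLeader,
    /// Nothing changed between the two observations.
    Unchanged,
}

/// Classifies the transition between two observed values.
fn classify(prev: Option<&LeaderInfo>, curr: Option<&LeaderInfo>) -> LeaderEvent {
    match (prev, curr) {
        (None, Some(c)) => LeaderEvent::Elected(c.clone()),
        (Some(p), Some(c)) if p != c => LeaderEvent::Changed {
            from: p.clone(),
            to: c.clone(),
        },
        (Some(_), None) => LeaderEvent::NoLeader,
        _ => LeaderEvent::Unchanged,
    }
}
```

Inside a notification loop, the caller would keep the last value it observed and pass it as `prev` on the next wake-up.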
§Performance
Event-driven notification (no polling), <1ms latency
§Example
let mut leader_rx = engine.leader_change_notifier();
tokio::spawn(async move {
    while leader_rx.changed().await.is_ok() {
        match leader_rx.borrow().as_ref() {
            Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
            None => println!("No leader"),
        }
    }
});
pub fn watch_membership(&self) -> Receiver<MembershipSnapshot>
Subscribe to committed membership change notifications.
Returns a watch::Receiver that fires whenever a ConfChange entry
(node join, removal, or learner promotion) is committed by a majority.
All nodes — leader, follower, and learner — fire the notification because
every node walks the same CommitHandler::apply_config_change() path.
The first borrow() returns the current membership state immediately
without waiting for a change.
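A consumer that reads the initial state and then processes change notifications may want to guard against handling the same snapshot twice. The usage example further down labels committed_index an idempotency key, and one way to use it is sketched here. The local `MembershipSnapshot` struct is a hypothetical mirror with assumed field types, and `MembershipTracker` is an invented application-side helper.

```rust
/// Hypothetical mirror of the snapshot fields named in the usage
/// example; the concrete field types are assumptions.
#[derive(Debug, Clone)]
struct MembershipSnapshot {
    members: Vec<u32>,
    learners: Vec<u32>,
    committed_index: u64,
}

/// Remembers the highest `committed_index` already handled.
#[derive(Debug, Default)]
struct MembershipTracker {
    last_index: Option<u64>,
}

impl MembershipTracker {
    /// Returns true if the snapshot is new and should be acted on,
    /// false if it is a repeat or older than one already handled.
    fn should_apply(&mut self, snapshot: &MembershipSnapshot) -> bool {
        if let Some(last) = self.last_index {
            if snapshot.committed_index <= last {
                return false;
            }
        }
        self.last_index = Some(snapshot.committed_index);
        true
    }
}
```

With this guard, delivering the same snapshot a second time becomes a no-op for the downstream scheduler.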
§Distinguishing this from other notifiers
- Self::leader_change_notifier: fires on leader election changes, not membership changes
- Self::wait_ready: resolves when a leader is elected and the cluster is ready, not on membership changes
§Usage
let mut rx = engine.watch_membership();
while rx.changed().await.is_ok() {
    let snapshot = rx.borrow_and_update().clone();
    // snapshot.members — current voters
    // snapshot.learners — current non-voting learners
    // snapshot.committed_index — idempotency key
    scheduler.on_membership_changed(snapshot).await;
}
pub fn is_leader(&self) -> bool
Returns true if the current node is the Raft leader.
§Use Cases
- Load balancer health checks (e.g. HAProxy /primary endpoint returning HTTP 200/503)
- Avoid sending write requests to followers, where they would fail
- Application-level request routing decisions
§Performance
Zero-cost operation (reads cached leader state from watch channel)
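Independent of any web framework, the health-check and routing decisions reduce to a mapping from the boolean returned by is_leader(). The function names below are hypothetical; only the 200/503 status codes and the rejection message come from this page.

```rust
/// HTTP status for a `/primary` style probe:
/// 200 when this node is the leader, 503 otherwise.
fn primary_probe_status(is_leader: bool) -> u16 {
    if is_leader {
        200
    } else {
        503
    }
}

/// Application-level routing decision for a write request.
fn check_write_allowed(is_leader: bool) -> Result<(), &'static str> {
    if is_leader {
        Ok(())
    } else {
        Err("Not leader, write rejected")
    }
}
```

A handler would call `primary_probe_status(engine.is_leader())` and translate the number into its framework's status type.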
§Example
// Load balancer health check endpoint
#[get("/primary")]
async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    if engine.is_leader() {
        StatusCode::OK // Routes writes here
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}
// Application request handler
if engine.is_leader() {
    client.put(key, value).await?;
} else {
    return Err("Not leader, write rejected");
}
pub fn leader_info(&self) -> Option<LeaderInfo>
Returns current leader information if available.
§Returns
- Some(LeaderInfo) if a leader is elected (includes leader_id and term)
- None if no leader exists (during election or network partition)
§Use Cases
- Monitoring dashboards showing cluster state
- Debugging leader election issues
- Logging cluster topology changes
§Example
if let Some(info) = engine.leader_info() {
    println!("Leader: {} (term {})", info.leader_id, info.term);
} else {
    println!("No leader elected, cluster unavailable");
}
pub fn client(&self) -> Arc<EmbeddedClient>
Get a reference to the local KV client.
The client is available immediately after start(),
but requests will only succeed after wait_ready() completes.
§Example
let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
pub async fn stop(&self) -> Result<(), Error>
Stop the embedded d-engine gracefully (idempotent).
This method:
- Sends shutdown signal to node
- Waits for node.run() to complete
- Propagates any errors from node execution
Safe to call multiple times - subsequent calls are no-ops.
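The "subsequent calls are no-ops" behaviour is commonly achieved with an atomic flag that only the first caller flips. The sketch below shows that general technique with a hypothetical `Shutdown` type; it is not taken from d-engine's source and omits the actual signalling and waiting steps.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical shutdown guard illustrating an idempotent `stop`.
#[derive(Default)]
struct Shutdown {
    stopped: AtomicBool,
    // Counts how many times the real shutdown work ran (for illustration).
    shutdowns_run: AtomicUsize,
}

impl Shutdown {
    /// The first call performs the shutdown; later calls return Ok(()) at once.
    fn stop(&self) -> Result<(), String> {
        // `swap` returns the previous value, so exactly one caller sees `false`.
        if self.stopped.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Placeholder for: send shutdown signal, wait for the node to finish.
        self.shutdowns_run.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    fn is_stopped(&self) -> bool {
        self.stopped.load(Ordering::SeqCst)
    }
}
```

Because the flag is flipped atomically, concurrent callers on different threads also trigger the shutdown work only once.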
§Errors
Returns an error if the node encountered issues during shutdown.
§Example
engine.stop().await?;
engine.stop().await?; // No-op, returns Ok(())
pub async fn is_stopped(&self) -> bool
Trait Implementations§
impl Clone for EmbeddedEngine
fn clone(&self) -> EmbeddedEngine
fn clone_from(&mut self, source: &Self)
impl Drop for EmbeddedEngine
Auto Trait Implementations§
impl Freeze for EmbeddedEngine
impl !RefUnwindSafe for EmbeddedEngine
impl Send for EmbeddedEngine
impl Sync for EmbeddedEngine
impl Unpin for EmbeddedEngine
impl UnsafeUnpin for EmbeddedEngine
impl !UnwindSafe for EmbeddedEngine
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request