Skip to main content

EmbeddedEngine

Struct EmbeddedEngine 

Source
pub struct EmbeddedEngine { /* private fields */ }
Expand description

Embedded d-engine with automatic lifecycle management.

Thread-safe: Clone and share across threads freely. All methods use &self - safe to call from multiple contexts.

Provides high-level KV API for embedded usage:

  • start() / start_with() - Initialize and spawn node
  • wait_ready() - Wait for leader election
  • client() - Get embedded client
  • stop() - Graceful shutdown

§Example

use d_engine::EmbeddedEngine;
use std::time::Duration;

let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;

let client = engine.client();
client.put(b"key", b"value").await?;

engine.stop().await?;

Implementations§

Source§

impl EmbeddedEngine

Source

pub async fn start(data_dir: impl AsRef<Path>) -> Result<EmbeddedEngine, Error>

Start engine with an explicit data directory.

data_dir has highest priority and always overrides cluster.db_root_dir from CONFIG_PATH or RAFT__ environment variables. Other configuration (network, Raft timeouts, cluster topology) is still read from those sources if set.

The directory is created automatically if it does not exist. If it already contains data the engine opens it in place (idempotent).

§Example
// Minimal — just supply a path
let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;

// Works with any AsRef<Path>
let engine = EmbeddedEngine::start(std::path::Path::new("/var/lib/my-app")).await?;
Source

pub async fn start_with(config_path: &str) -> Result<EmbeddedEngine, Error>

Start engine with explicit configuration file.

Reads configuration from specified file path. Data directory is determined by config’s cluster.db_root_dir setting.

§Arguments
  • config_path: Path to configuration file (e.g. “d-engine.toml”)
§Example
let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
Source

pub async fn start_custom<SE, SM>( storage_engine: Arc<SE>, state_machine: Arc<SM>, config_path: Option<&str>, ) -> Result<EmbeddedEngine, Error>
where SE: StorageEngine + Debug + 'static, SM: StateMachine + Debug + 'static,

Start engine with custom storage and state machine.

Advanced API for users providing custom storage implementations.

§Arguments
  • config_path: Optional path to configuration file
  • storage_engine: Custom storage engine implementation
  • state_machine: Custom state machine implementation
§Example
let storage = Arc::new(MyCustomStorage::new()?);
let sm = Arc::new(MyCustomStateMachine::new()?);
let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
Source

pub async fn wait_ready(&self, timeout: Duration) -> Result<LeaderInfo, Error>

Wait until the cluster is ready to serve requests.

Blocks until a leader has been elected and its no-op entry is committed by a majority of nodes (Raft §8). Only at this point is the leader guaranteed to be aware of all previously committed entries and safe to serve reads and writes.

Event-driven notification (no polling), <1ms latency after noop commit.

§Timeout Guidelines

Single-node mode (most common for development):

  • Typical: <100ms (near-instant election)
  • Recommended: Duration::from_secs(3)

Multi-node HA cluster (production):

  • Typical: 1-3s (depends on network latency and general_raft_timeout_duration_in_ms)
  • Recommended: Duration::from_secs(10)

Special cases:

  • Health checks: Duration::from_secs(3) (fail fast if cluster unhealthy)
  • Startup scripts: Duration::from_secs(30) (allow time for cluster stabilization)
  • Development/testing: Duration::from_secs(5) (balance between speed and reliability)
§Returns
  • Ok(LeaderInfo) - Leader elected successfully
  • Err(...) - Timeout or cluster unavailable
§Example
// Single-node development
let engine = EmbeddedEngine::start("./data/my-app").await?;
let leader = engine.wait_ready(Duration::from_secs(3)).await?;

// Multi-node production
let engine = EmbeddedEngine::start_with("cluster.toml").await?;
let leader = engine.wait_ready(Duration::from_secs(10)).await?;
println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
Source

pub fn leader_change_notifier(&self) -> Receiver<Option<LeaderInfo>>

Subscribe to leader change notifications.

Returns a receiver that will be notified whenever:

  • First leader is elected
  • Leader changes (re-election)
  • No leader exists (during election)
§Performance

Event-driven notification (no polling), <1ms latency

§Example
let mut leader_rx = engine.leader_change_notifier();
tokio::spawn(async move {
    while leader_rx.changed().await.is_ok() {
        match leader_rx.borrow().as_ref() {
            Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
            None => println!("No leader"),
        }
    }
});
Source

pub fn watch_membership(&self) -> Receiver<MembershipSnapshot>

Subscribe to committed membership change notifications.

Returns a watch::Receiver that fires whenever a ConfChange entry (node join, removal, or learner promotion) is committed by a majority.

All nodes — leader, follower, and learner — fire the notification because every node walks the same CommitHandler::apply_config_change() path.

The first borrow() returns the current membership state immediately without waiting for a change.

§Distinguishing this from other notifiers
§Usage
let mut rx = engine.watch_membership();
while rx.changed().await.is_ok() {
    let snapshot = rx.borrow_and_update().clone();
    // snapshot.members — current voters
    // snapshot.learners — current non-voting learners
    // snapshot.committed_index — idempotency key
    scheduler.on_membership_changed(snapshot).await;
}
Source

pub fn is_leader(&self) -> bool

Returns true if the current node is the Raft leader.

§Use Cases
  • Load balancer health checks (e.g. HAProxy /primary endpoint returning HTTP 200/503)
  • Prevent write requests to followers before they fail
  • Application-level request routing decisions
§Performance

Zero-cost operation (reads cached leader state from watch channel)

§Example
// Load balancer health check endpoint
#[get("/primary")]
async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    if engine.is_leader() {
        StatusCode::OK  // Routes writes here
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    }
}

// Application request handler
if engine.is_leader() {
    client.put(key, value).await?;
} else {
    return Err("Not leader, write rejected");
}
Source

pub fn leader_info(&self) -> Option<LeaderInfo>

Returns current leader information if available.

§Returns
  • Some(LeaderInfo) if a leader is elected (includes leader_id and term)
  • None if no leader exists (during election or network partition)
§Use Cases
  • Monitoring dashboards showing cluster state
  • Debugging leader election issues
  • Logging cluster topology changes
§Example
if let Some(info) = engine.leader_info() {
    println!("Leader: {} (term {})", info.leader_id, info.term);
} else {
    println!("No leader elected, cluster unavailable");
}
Source

pub fn client(&self) -> Arc<EmbeddedClient>

Get a reference to the local KV client.

The client is available immediately after start(), but requests will only succeed after wait_ready() completes.

§Example
let engine = EmbeddedEngine::start("./data/my-app").await?;
engine.wait_ready(Duration::from_secs(5)).await?;
let client = engine.client();
client.put(b"key", b"value").await?;
Source

pub async fn stop(&self) -> Result<(), Error>

Stop the embedded d-engine gracefully (idempotent).

This method:

  1. Sends shutdown signal to node
  2. Waits for node.run() to complete
  3. Propagates any errors from node execution

Safe to call multiple times - subsequent calls are no-ops.

§Errors

Returns error if node encountered issues during shutdown.

§Example
engine.stop().await?;
engine.stop().await?;  // No-op, returns Ok(())
Source

pub async fn is_stopped(&self) -> bool

Check if the engine has been stopped.

§Example
if engine.is_stopped() {
    println!("Engine is stopped");
}
Source

pub fn node_id(&self) -> u32

Returns the unique identifier for this Raft node.

Trait Implementations§

Source§

impl Clone for EmbeddedEngine

Source§

fn clone(&self) -> EmbeddedEngine

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Drop for EmbeddedEngine

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more