use crate::Result;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStateMachine;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStorageEngine;
use crate::StateMachine;
use crate::StorageEngine;
use crate::api::EmbeddedClient;
use crate::node::NodeBuilder;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::error;
use tracing::info;
/// Shared state behind `EmbeddedEngine`'s cheap `Clone` (held in an `Arc`).
struct Inner {
    // Join handle of the background node task; taken (left `None`) by `stop()`.
    node_handle: Mutex<Option<JoinHandle<Result<()>>>>,
    // Signals the node task to shut down.
    shutdown_tx: watch::Sender<()>,
    // Client used to issue commands/events to the local node.
    client: Arc<EmbeddedClient>,
    // Leader-change notifications; `None` until a leader has been elected.
    leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
    // Set to `true` once shutdown completed; guards against double-stop.
    is_stopped: Mutex<bool>,
}
/// Handle to an embedded d-engine node.
///
/// Cloning is cheap (an `Arc` refcount bump); all clones refer to the same
/// underlying node and shared state.
#[derive(Clone)]
pub struct EmbeddedEngine {
    inner: Arc<Inner>,
}
impl EmbeddedEngine {
#[cfg(feature = "rocksdb")]
pub async fn start() -> Result<Self> {
let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
let base_dir = &config.cluster.db_root_dir;
tokio::fs::create_dir_all(base_dir)
.await
.map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
let storage_path = base_dir.join("storage");
let sm_path = base_dir.join("state_machine");
let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
let mut sm = RocksDBStateMachine::new(sm_path)?;
let lease_cfg = &config.raft.state_machine.lease;
if lease_cfg.enabled {
let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
sm.set_lease(lease);
}
info!("Starting embedded engine with RocksDB at {:?}", base_dir);
Self::start_custom(storage, Arc::new(sm), None).await
}
#[cfg(feature = "rocksdb")]
pub async fn start_with(config_path: &str) -> Result<Self> {
let config = d_engine_core::RaftNodeConfig::new()?
.with_override_config(config_path)?
.validate()?;
let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
tokio::fs::create_dir_all(&base_dir)
.await
.map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
let storage_path = base_dir.join("storage");
let sm_path = base_dir.join("state_machine");
let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
let mut sm = RocksDBStateMachine::new(sm_path)?;
let lease_cfg = &config.raft.state_machine.lease;
if lease_cfg.enabled {
let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
sm.set_lease(lease);
}
info!("Starting embedded engine with RocksDB at {:?}", base_dir);
Self::start_custom(storage, Arc::new(sm), Some(config_path)).await
}
pub async fn start_custom<SE, SM>(
storage_engine: Arc<SE>,
state_machine: Arc<SM>,
config_path: Option<&str>,
) -> Result<Self>
where
SE: StorageEngine + std::fmt::Debug + 'static,
SM: StateMachine + std::fmt::Debug + 'static,
{
info!("Starting embedded d-engine");
let (shutdown_tx, shutdown_rx) = watch::channel(());
let node_config = if let Some(path) = config_path {
d_engine_core::RaftNodeConfig::default()
.with_override_config(path)?
.validate()?
} else {
d_engine_core::RaftNodeConfig::new()?.validate()?
};
let node = NodeBuilder::init(node_config, shutdown_rx)
.storage_engine(storage_engine)
.state_machine(state_machine)
.start()
.await?;
let leader_elected_rx = node.leader_change_notifier();
#[cfg(not(feature = "watch"))]
let client = Arc::new(EmbeddedClient::new_internal(
node.event_tx.clone(),
node.cmd_tx.clone(),
node.node_id,
Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
));
#[cfg(feature = "watch")]
let client = {
let watch_registry = node.watch_registry.clone();
let mut client = EmbeddedClient::new_internal(
node.event_tx.clone(),
node.cmd_tx.clone(),
node.node_id,
Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
);
if let Some(registry) = &watch_registry {
client = client.with_watch_registry(registry.clone());
}
Arc::new(client)
};
let node_handle = tokio::spawn(async move {
if let Err(e) = node.run().await {
error!("Node run error: {:?}", e);
Err(e)
} else {
Ok(())
}
});
info!("Embedded d-engine started successfully");
Ok(Self {
inner: Arc::new(Inner {
node_handle: Mutex::new(Some(node_handle)),
shutdown_tx,
client,
leader_elected_rx,
is_stopped: Mutex::new(false),
}),
})
}
pub async fn wait_ready(
&self,
timeout: std::time::Duration,
) -> Result<crate::LeaderInfo> {
let mut rx = self.inner.leader_elected_rx.clone();
tokio::time::timeout(timeout, async {
if let Some(info) = rx.borrow().as_ref() {
info!(
"Leader already elected: {} (term {})",
info.leader_id, info.term
);
return Ok(*info);
}
loop {
let _ = rx.changed().await;
if let Some(info) = rx.borrow().as_ref() {
info!("Leader elected: {} (term {})", info.leader_id, info.term);
return Ok(*info);
}
}
})
.await
.map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
}
pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
self.inner.leader_elected_rx.clone()
}
pub fn is_leader(&self) -> bool {
self.inner
.leader_elected_rx
.borrow()
.as_ref()
.map(|info| info.leader_id == self.inner.client.node_id())
.unwrap_or(false)
}
pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
*self.inner.leader_elected_rx.borrow()
}
pub fn client(&self) -> Arc<EmbeddedClient> {
Arc::clone(&self.inner.client)
}
pub async fn stop(&self) -> Result<()> {
let mut is_stopped = self.inner.is_stopped.lock().await;
if *is_stopped {
return Ok(());
}
info!("Stopping embedded d-engine");
let _ = self.inner.shutdown_tx.send(());
let mut handle_guard = self.inner.node_handle.lock().await;
if let Some(handle) = handle_guard.take() {
match handle.await {
Ok(result) => {
info!("Embedded d-engine stopped successfully");
*is_stopped = true;
result
}
Err(e) => {
error!("Node task panicked: {:?}", e);
*is_stopped = true;
Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
}
}
} else {
*is_stopped = true;
Ok(())
}
}
pub async fn is_stopped(&self) -> bool {
*self.inner.is_stopped.lock().await
}
pub fn node_id(&self) -> u32 {
self.inner.client.node_id()
}
}
impl Drop for EmbeddedEngine {
    /// Best-effort leak warning: logs if an engine handle is dropped while the
    /// background node task is still running (i.e. `stop()` was never called).
    fn drop(&mut self) {
        // `try_lock` only — Drop must never block. If the lock is contended,
        // someone else (likely `stop()`) is already dealing with the handle.
        if let Ok(guard) = self.inner.node_handle.try_lock() {
            let still_running = guard.as_ref().map_or(false, |h| !h.is_finished());
            if still_running {
                error!(
                    "EmbeddedEngine dropped without calling stop() - background task may leak"
                );
            }
        }
    }
}