use std::sync::Arc;
use tokio::sync::watch;
use crate::Result;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStateMachine;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStorageEngine;
use crate::StateMachine;
use crate::StorageEngine;
use crate::node::NodeBuilder;
/// Entry point for running a node as a standalone server.
///
/// This is a stateless unit type: it carries no fields and only serves as a
/// namespace for the associated `run`, `run_with` and `run_custom` functions.
pub struct StandaloneEngine;
impl StandaloneEngine {
    /// Runs a standalone node backed by RocksDB using the default configuration sources.
    ///
    /// The configuration is obtained from `RaftNodeConfig::new()` and validated. Its
    /// `cluster.db_root_dir` decides where the RocksDB storage engine and state machine
    /// are opened. The node itself is then started through [`Self::run_custom`] with no
    /// explicit config path, which loads the configuration a second time on its own.
    ///
    /// `shutdown_rx` is handed unchanged to the node builder
    /// (presumably the shutdown signal for the node — confirm against `NodeBuilder`).
    ///
    /// # Errors
    ///
    /// Returns an error when the configuration cannot be loaded or validated, when the
    /// data directory cannot be created, when either RocksDB backend fails to open, or
    /// when starting/running the node fails.
    #[cfg(feature = "rocksdb")]
    pub async fn run(shutdown_rx: watch::Receiver<()>) -> Result<()> {
        let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
        let (storage, sm) = Self::open_rocksdb_backends(config).await?;
        Self::run_custom(storage, sm, shutdown_rx, None).await
    }

    /// Runs a standalone node backed by RocksDB, layering `config_path` over the defaults.
    ///
    /// The data directory is resolved from `RaftNodeConfig::new()` with `config_path`
    /// applied as an override and then validated. The node is started through
    /// [`Self::run_custom`] with `Some(config_path)`.
    ///
    /// NOTE(review): the directory used for the RocksDB backends comes from
    /// `RaftNodeConfig::new()` + override, while `run_custom` builds the node
    /// configuration from `RaftNodeConfig::default()` + override. If those two
    /// constructors can yield different base values, the storage location and the
    /// node's own configuration may disagree — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Returns an error when the configuration cannot be loaded, overridden or
    /// validated, when the data directory cannot be created, when either RocksDB
    /// backend fails to open, or when starting/running the node fails.
    #[cfg(feature = "rocksdb")]
    pub async fn run_with(
        config_path: &str,
        shutdown_rx: watch::Receiver<()>,
    ) -> Result<()> {
        let config = d_engine_core::RaftNodeConfig::new()?
            .with_override_config(config_path)?
            .validate()?;
        let (storage, sm) = Self::open_rocksdb_backends(config).await?;
        Self::run_custom(storage, sm, shutdown_rx, Some(config_path)).await
    }

    /// Runs a standalone node on top of caller-supplied storage and state machine.
    ///
    /// When `config_path` is `Some`, the node configuration is
    /// `RaftNodeConfig::default()` with that file applied as an override; when it is
    /// `None`, the configuration comes from `RaftNodeConfig::new()`. In both cases the
    /// result is validated before the node is built.
    ///
    /// The returned future resolves with whatever `node.run()` returns.
    ///
    /// # Errors
    ///
    /// Returns an error when the configuration cannot be loaded or validated, or when
    /// building, starting or running the node fails.
    pub async fn run_custom<SE, SM>(
        storage_engine: Arc<SE>,
        state_machine: Arc<SM>,
        shutdown_rx: watch::Receiver<()>,
        config_path: Option<&str>,
    ) -> Result<()>
    where
        SE: StorageEngine + std::fmt::Debug + 'static,
        SM: StateMachine + std::fmt::Debug + 'static,
    {
        let node_config = match config_path {
            Some(path) => d_engine_core::RaftNodeConfig::default()
                .with_override_config(path)?
                .validate()?,
            None => d_engine_core::RaftNodeConfig::new()?.validate()?,
        };

        let node = NodeBuilder::init(node_config, shutdown_rx)
            .storage_engine(storage_engine)
            .state_machine(state_machine)
            .start()
            .await?;

        node.run().await
    }

    /// Creates the data directory and opens the RocksDB storage engine and state machine.
    ///
    /// Shared bootstrap for `run` and `run_with`: the storage engine lives in the
    /// `storage` sub-directory and the state machine in the `state_machine`
    /// sub-directory of `config.cluster.db_root_dir`. A `DefaultLease` is attached to
    /// the state machine only when `config.raft.state_machine.lease.enabled` is set.
    ///
    /// The config is taken by value so no borrow of it is held across the `.await`.
    #[cfg(feature = "rocksdb")]
    async fn open_rocksdb_backends(
        config: d_engine_core::RaftNodeConfig,
    ) -> Result<(Arc<RocksDBStorageEngine>, Arc<RocksDBStateMachine>)> {
        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
        tokio::fs::create_dir_all(&base_dir)
            .await
            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;

        let storage_path = base_dir.join("storage");
        let sm_path = base_dir.join("state_machine");
        tracing::info!("Starting standalone server with RocksDB at {:?}", base_dir);

        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);

        // The lease must be installed while the state machine is still uniquely
        // owned, i.e. before it is wrapped in an `Arc`.
        let mut state_machine = RocksDBStateMachine::new(sm_path)?;
        let lease_cfg = &config.raft.state_machine.lease;
        if lease_cfg.enabled {
            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
            state_machine.set_lease(lease);
        }

        Ok((storage, Arc::new(state_machine)))
    }
}