d_engine_server/api/
standalone.rs1use std::sync::Arc;
4
5use tokio::sync::watch;
6
7use crate::Result;
8#[cfg(feature = "rocksdb")]
9use crate::RocksDBStateMachine;
10#[cfg(feature = "rocksdb")]
11use crate::RocksDBStorageEngine;
12#[cfg(feature = "rocksdb")]
13use crate::RocksDBUnifiedEngine;
14use crate::StateMachine;
15use crate::StorageEngine;
16use crate::node::NodeBuilder;
17
/// Namespace type for the associated functions that configure and run a
/// standalone server node.
///
/// The type carries no state; every entry point is an associated function
/// on its `impl` block.
#[derive(Debug)]
pub struct StandaloneEngine;
20
21impl StandaloneEngine {
22 #[cfg(feature = "rocksdb")]
41 pub async fn run(
42 data_dir: impl AsRef<std::path::Path>,
43 shutdown_rx: watch::Receiver<()>,
44 ) -> Result<()> {
45 let mut config = d_engine_core::RaftNodeConfig::new()?;
46 config.cluster.db_root_dir = data_dir.as_ref().to_path_buf();
47 let config = config.validate()?;
48 let base_dir = config.cluster.db_root_dir.clone();
49
50 tokio::fs::create_dir_all(&base_dir)
51 .await
52 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
53
54 let (storage, mut sm) = if config.storage.unified_db {
55 let db_path = base_dir.join("db");
56 tracing::info!(
57 "Starting standalone server with unified RocksDB at {:?}",
58 db_path
59 );
60 RocksDBUnifiedEngine::open(&db_path)?
61 } else {
62 tracing::info!(
63 "Starting standalone server with separate RocksDB instances at {:?}",
64 base_dir
65 );
66 let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
67 let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
68 (storage, sm)
69 };
70
71 let lease_cfg = &config.raft.state_machine.lease;
72 if lease_cfg.enabled {
73 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
74 sm.set_lease(lease);
75 }
76
77 Self::start_node(config, Arc::new(storage), Arc::new(sm), shutdown_rx).await
78 }
79
80 #[cfg(feature = "rocksdb")]
96 pub async fn run_with(
97 config_path: &str,
98 shutdown_rx: watch::Receiver<()>,
99 ) -> Result<()> {
100 let config = d_engine_core::RaftNodeConfig::new()?
101 .with_override_config(config_path)?
102 .validate()?;
103 let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
104
105 tokio::fs::create_dir_all(&base_dir)
106 .await
107 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
108
109 let (storage, mut sm) = if config.storage.unified_db {
110 let db_path = base_dir.join("db");
111 tracing::info!(
112 "Starting standalone server with unified RocksDB at {:?}",
113 db_path
114 );
115 RocksDBUnifiedEngine::open(&db_path)?
116 } else {
117 tracing::info!(
118 "Starting standalone server with separate RocksDB instances at {:?}",
119 base_dir
120 );
121 let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
122 let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
123 (storage, sm)
124 };
125
126 let lease_cfg = &config.raft.state_machine.lease;
128 if lease_cfg.enabled {
129 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
130 sm.set_lease(lease);
131 }
132
133 Self::start_node(config, Arc::new(storage), Arc::new(sm), shutdown_rx).await
134 }
135
136 pub async fn run_custom<SE, SM>(
156 storage_engine: Arc<SE>,
157 state_machine: Arc<SM>,
158 shutdown_rx: watch::Receiver<()>,
159 config_path: Option<&str>,
160 ) -> Result<()>
161 where
162 SE: StorageEngine + std::fmt::Debug + 'static,
163 SM: StateMachine + std::fmt::Debug + 'static,
164 {
165 let config = if let Some(path) = config_path {
166 d_engine_core::RaftNodeConfig::default()
167 .with_override_config(path)?
168 .validate()?
169 } else {
170 d_engine_core::RaftNodeConfig::new()?.validate()?
171 };
172 Self::start_node(config, storage_engine, state_machine, shutdown_rx).await
173 }
174
175 async fn start_node<SE, SM>(
176 config: d_engine_core::RaftNodeConfig,
177 storage_engine: Arc<SE>,
178 state_machine: Arc<SM>,
179 shutdown_rx: watch::Receiver<()>,
180 ) -> Result<()>
181 where
182 SE: StorageEngine + std::fmt::Debug + 'static,
183 SM: StateMachine + std::fmt::Debug + 'static,
184 {
185 let node = NodeBuilder::init(config, shutdown_rx)
186 .storage_engine(storage_engine)
187 .state_machine(state_machine)
188 .start()
189 .await?;
190 node.run().await
191 }
192}