Skip to main content

d_engine_server/api/
standalone.rs

1//! Standalone mode for d-engine - independent server deployment
2
3use std::sync::Arc;
4
5use tokio::sync::watch;
6
7use crate::Result;
8#[cfg(feature = "rocksdb")]
9use crate::RocksDBStateMachine;
10#[cfg(feature = "rocksdb")]
11use crate::RocksDBStorageEngine;
12#[cfg(feature = "rocksdb")]
13use crate::RocksDBUnifiedEngine;
14use crate::StateMachine;
15use crate::StorageEngine;
16use crate::node::NodeBuilder;
17
/// Entry point for running d-engine in standalone mode.
///
/// This is a stateless marker type: it has no fields, and the operations in its
/// `impl` block are associated functions that are called as
/// `StandaloneEngine::...` without constructing a value.
pub struct StandaloneEngine;
20
21impl StandaloneEngine {
22    /// Run server with an explicit data directory.
23    ///
24    /// `data_dir` has highest priority and always overrides `cluster.db_root_dir` from
25    /// `CONFIG_PATH` or `RAFT__` environment variables. Other configuration (network,
26    /// Raft timeouts, cluster topology) is still read from those sources if set.
27    ///
28    /// The directory is created automatically if it does not exist.
29    /// Blocks until shutdown signal is received.
30    ///
31    /// # Arguments
32    /// * `data_dir` - Path to the data directory
33    /// * `shutdown_rx` - Shutdown signal receiver
34    ///
35    /// # Example
36    /// ```ignore
37    /// let (shutdown_tx, shutdown_rx) = watch::channel(());
38    /// StandaloneEngine::run("./data/my-node", shutdown_rx).await?;
39    /// ```
40    #[cfg(feature = "rocksdb")]
41    pub async fn run(
42        data_dir: impl AsRef<std::path::Path>,
43        shutdown_rx: watch::Receiver<()>,
44    ) -> Result<()> {
45        let mut config = d_engine_core::RaftNodeConfig::new()?;
46        config.cluster.db_root_dir = data_dir.as_ref().to_path_buf();
47        let config = config.validate()?;
48        let base_dir = config.cluster.db_root_dir.clone();
49
50        tokio::fs::create_dir_all(&base_dir)
51            .await
52            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
53
54        let (storage, mut sm) = if config.storage.unified_db {
55            let db_path = base_dir.join("db");
56            tracing::info!(
57                "Starting standalone server with unified RocksDB at {:?}",
58                db_path
59            );
60            RocksDBUnifiedEngine::open(&db_path)?
61        } else {
62            tracing::info!(
63                "Starting standalone server with separate RocksDB instances at {:?}",
64                base_dir
65            );
66            let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
67            let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
68            (storage, sm)
69        };
70
71        let lease_cfg = &config.raft.state_machine.lease;
72        if lease_cfg.enabled {
73            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
74            sm.set_lease(lease);
75        }
76
77        Self::start_node(config, Arc::new(storage), Arc::new(sm), shutdown_rx).await
78    }
79
80    /// Run server with explicit configuration file.
81    ///
82    /// Reads configuration from specified file path.
83    /// Data directory is determined by config's `cluster.db_root_dir` setting.
84    /// Blocks until shutdown signal is received.
85    ///
86    /// # Arguments
87    /// * `config_path` - Path to configuration file
88    /// * `shutdown_rx` - Shutdown signal receiver
89    ///
90    /// # Example
91    /// ```ignore
92    /// let (shutdown_tx, shutdown_rx) = watch::channel(());
93    /// StandaloneEngine::run_with("config/node1.toml", shutdown_rx).await?;
94    /// ```
95    #[cfg(feature = "rocksdb")]
96    pub async fn run_with(
97        config_path: &str,
98        shutdown_rx: watch::Receiver<()>,
99    ) -> Result<()> {
100        let config = d_engine_core::RaftNodeConfig::new()?
101            .with_override_config(config_path)?
102            .validate()?;
103        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
104
105        tokio::fs::create_dir_all(&base_dir)
106            .await
107            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
108
109        let (storage, mut sm) = if config.storage.unified_db {
110            let db_path = base_dir.join("db");
111            tracing::info!(
112                "Starting standalone server with unified RocksDB at {:?}",
113                db_path
114            );
115            RocksDBUnifiedEngine::open(&db_path)?
116        } else {
117            tracing::info!(
118                "Starting standalone server with separate RocksDB instances at {:?}",
119                base_dir
120            );
121            let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
122            let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
123            (storage, sm)
124        };
125
126        // Inject lease if enabled
127        let lease_cfg = &config.raft.state_machine.lease;
128        if lease_cfg.enabled {
129            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
130            sm.set_lease(lease);
131        }
132
133        Self::start_node(config, Arc::new(storage), Arc::new(sm), shutdown_rx).await
134    }
135
136    /// Run server with custom storage engine and state machine.
137    ///
138    /// Advanced API for users providing custom storage implementations.
139    /// Blocks until shutdown signal is received.
140    ///
141    /// # Arguments
142    /// * `config` - Node configuration
143    /// * `storage_engine` - Custom storage engine implementation
144    /// * `state_machine` - Custom state machine implementation
145    /// * `shutdown_rx` - Shutdown signal receiver
146    ///
147    /// # Example
148    /// ```ignore
149    /// let storage = Arc::new(MyCustomStorage::new()?);
150    /// let sm = Arc::new(MyCustomStateMachine::new()?);
151    ///
152    /// let (shutdown_tx, shutdown_rx) = watch::channel(());
153    /// StandaloneEngine::run_custom(storage, sm, shutdown_rx, Some("config.toml")).await?;
154    /// ```
155    pub async fn run_custom<SE, SM>(
156        storage_engine: Arc<SE>,
157        state_machine: Arc<SM>,
158        shutdown_rx: watch::Receiver<()>,
159        config_path: Option<&str>,
160    ) -> Result<()>
161    where
162        SE: StorageEngine + std::fmt::Debug + 'static,
163        SM: StateMachine + std::fmt::Debug + 'static,
164    {
165        let config = if let Some(path) = config_path {
166            d_engine_core::RaftNodeConfig::default()
167                .with_override_config(path)?
168                .validate()?
169        } else {
170            d_engine_core::RaftNodeConfig::new()?.validate()?
171        };
172        Self::start_node(config, storage_engine, state_machine, shutdown_rx).await
173    }
174
175    async fn start_node<SE, SM>(
176        config: d_engine_core::RaftNodeConfig,
177        storage_engine: Arc<SE>,
178        state_machine: Arc<SM>,
179        shutdown_rx: watch::Receiver<()>,
180    ) -> Result<()>
181    where
182        SE: StorageEngine + std::fmt::Debug + 'static,
183        SM: StateMachine + std::fmt::Debug + 'static,
184    {
185        let node = NodeBuilder::init(config, shutdown_rx)
186            .storage_engine(storage_engine)
187            .state_machine(state_machine)
188            .start()
189            .await?;
190        node.run().await
191    }
192}