d_engine_server/api/embedded.rs
1//! Embedded mode for d-engine - application-friendly API
2//!
3//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
4//! that simplifies lifecycle management for embedded use cases.
5//!
6//! ## Comparison: Node vs EmbeddedEngine
7//!
8//! ### Using Node (Low-level API)
9//! ```ignore
10//! let node = NodeBuilder::new(config).start().await?;
11//! // Node does not provide client directly - use EmbeddedEngine instead
12//! tokio::spawn(async move { node.run().await });
13//! // Manual lifecycle management required
14//! ```
15//!
16//! ### Using EmbeddedEngine (High-level API)
17//! ```ignore
18//! let engine = EmbeddedEngine::start().await?;
19//! engine.wait_ready(Duration::from_secs(5)).await?;
20//! let client = engine.client();
21//! engine.stop().await?;
22//! // Lifecycle managed automatically
23//! ```
24//!
25//! ## When to Use
26//!
27//! - **EmbeddedEngine**: Application developers who want simplicity
28//! - **Node**: Framework developers who need fine-grained control
29//!
30//! # Application Responsibilities in Multi-Node Deployments
31//!
32//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
33//! Applications are responsible for handling follower write requests.
34//!
35//! ## What EmbeddedEngine Provides
36//!
37//! - ✅ `is_leader()` - Check if current node is leader
38//! - ✅ `leader_info()` - Get leader ID and term
39//! - ✅ `EmbeddedClient` returns `NotLeader` error on follower writes
40//! - ✅ Zero-overhead in-process communication (<0.1ms)
41//!
42//! ## What Applications Must Handle
43//!
44//! - ❌ Request routing (follower → leader forwarding)
45//! - ❌ Load balancing across nodes
46//! - ❌ Health check endpoints for load balancers
47//!
48//! ## Integration Patterns
49//!
50//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
51//!
52//! **Application provides HTTP health check endpoints:**
53//! ```ignore
54//! use axum::{Router, routing::get, http::StatusCode};
55//!
56//! // Health check endpoint for load balancer (e.g. HAProxy)
57//! async fn health_primary(engine: Arc<EmbeddedEngine>) -> StatusCode {
58//! if engine.is_leader() {
59//! StatusCode::OK // 200 - Load balancer routes writes here
60//! } else {
61//! StatusCode::SERVICE_UNAVAILABLE // 503 - Load balancer skips this node
62//! }
63//! }
64//!
65//! async fn health_replica(engine: Arc<EmbeddedEngine>) -> StatusCode {
66//! if !engine.is_leader() {
67//! StatusCode::OK // 200 - Load balancer routes reads here
68//! } else {
69//! StatusCode::SERVICE_UNAVAILABLE // 503
70//! }
71//! }
72//!
73//! let app = Router::new()
74//! .route("/primary", get(health_primary)) // For write traffic
75//! .route("/replica", get(health_replica)); // For read traffic
76//!
77//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
78//! .serve(app.into_make_service())
79//! .await?;
80//! ```
81//!
82//! ### Pattern 2: Pre-check Before Write (Simple)
83//!
84//! **Application checks leadership before handling writes:**
85//! ```ignore
86//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
87//! if !engine.is_leader() {
88//! // Return HTTP 503 to client
89//! return Err("Not leader, please retry on another node");
90//! }
91//!
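//! // This node was leader when checked, but leadership can change before the
//! // write is applied, so the write may still fail with a NotLeader error.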
93//! engine.client().put(key, value).await?;
94//! Ok(())
95//! }
96//! ```
97//!
98//! ### Pattern 3: HTTP 307 Redirect (Alternative)
99//!
100//! **Application returns redirect to leader:**
101//! ```ignore
//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Response {
103//! match engine.client().put(key, value).await {
104//! Ok(_) => Response::ok(),
105//! Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
106//! // Redirect client to leader
107//! Response::redirect_307(addr)
108//! }
109//! Err(e) => Response::error(e),
110//! }
111//! }
112//! ```
113//!
114//! ## Design Philosophy
115//!
116//! **Why doesn't EmbeddedEngine auto-forward writes?**
117//!
118//! 1. **Preserves zero-overhead guarantee** - Auto-forwarding requires gRPC client (adds network/serialization)
119//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
120//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
121//! 4. **Performance transparency** - Developers know exactly when network calls occur
122//!
123//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcClient`.
124
125use crate::Result;
126#[cfg(feature = "rocksdb")]
127use crate::RocksDBStateMachine;
128#[cfg(feature = "rocksdb")]
129use crate::RocksDBStorageEngine;
130use crate::StateMachine;
131use crate::StorageEngine;
132use crate::api::EmbeddedClient;
133use crate::node::NodeBuilder;
134use std::sync::Arc;
135use std::time::Duration;
136use tokio::sync::Mutex;
137use tokio::sync::watch;
138use tokio::task::JoinHandle;
139use tracing::error;
140use tracing::info;
141
142struct Inner {
143 node_handle: Mutex<Option<JoinHandle<Result<()>>>>,
144 shutdown_tx: watch::Sender<()>,
145 client: Arc<EmbeddedClient>,
146 leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
147 is_stopped: Mutex<bool>,
148}
149
150/// Embedded d-engine with automatic lifecycle management.
151///
152/// **Thread-safe**: Clone and share across threads freely.
153/// All methods use `&self` - safe to call from multiple contexts.
154///
155/// Provides high-level KV API for embedded usage:
156/// - `start()` / `start_with()` - Initialize and spawn node
157/// - `wait_ready()` - Wait for leader election
158/// - `client()` - Get embedded client
159/// - `stop()` - Graceful shutdown
160///
161/// # Example
162/// ```ignore
163/// use d_engine::EmbeddedEngine;
164/// use std::time::Duration;
165///
166/// let engine = EmbeddedEngine::start().await?; // Returns Arc<EmbeddedEngine>
167/// engine.wait_ready(Duration::from_secs(5)).await?;
168///
169/// let client = engine.client();
170/// client.put(b"key", b"value").await?;
171///
172/// engine.stop().await?;
173/// ```
174#[derive(Clone)]
175pub struct EmbeddedEngine {
176 inner: Arc<Inner>,
177}
178
179impl EmbeddedEngine {
180 /// Start engine with configuration from environment.
181 ///
182 /// Reads `CONFIG_PATH` environment variable or uses default configuration.
183 /// Data directory is determined by config's `cluster.db_root_dir` setting.
184 ///
185 /// # Example
186 /// ```ignore
187 /// // Set config path via environment variable
188 /// std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
189 ///
190 /// let engine = EmbeddedEngine::start().await?;
191 /// engine.wait_ready(Duration::from_secs(5)).await?;
192 /// ```
193 #[cfg(feature = "rocksdb")]
194 pub async fn start() -> Result<Self> {
195 let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
196 let base_dir = &config.cluster.db_root_dir;
197 tokio::fs::create_dir_all(base_dir)
198 .await
199 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
200
201 let storage_path = base_dir.join("storage");
202 let sm_path = base_dir.join("state_machine");
203
204 let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
205 let mut sm = RocksDBStateMachine::new(sm_path)?;
206
207 // Inject lease if enabled
208 let lease_cfg = &config.raft.state_machine.lease;
209 if lease_cfg.enabled {
210 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
211 sm.set_lease(lease);
212 }
213
214 info!("Starting embedded engine with RocksDB at {:?}", base_dir);
215
216 Self::start_custom(storage, Arc::new(sm), None).await
217 }
218
219 /// Start engine with explicit configuration file.
220 ///
221 /// Reads configuration from specified file path.
222 /// Data directory is determined by config's `cluster.db_root_dir` setting.
223 ///
224 /// # Arguments
225 /// - `config_path`: Path to configuration file (e.g. "d-engine.toml")
226 ///
227 /// # Example
228 /// ```ignore
229 /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
230 /// engine.wait_ready(Duration::from_secs(5)).await?;
231 /// ```
232 #[cfg(feature = "rocksdb")]
233 pub async fn start_with(config_path: &str) -> Result<Self> {
234 let config = d_engine_core::RaftNodeConfig::new()?
235 .with_override_config(config_path)?
236 .validate()?;
237 let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
238
239 tokio::fs::create_dir_all(&base_dir)
240 .await
241 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
242
243 let storage_path = base_dir.join("storage");
244 let sm_path = base_dir.join("state_machine");
245
246 let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
247 let mut sm = RocksDBStateMachine::new(sm_path)?;
248
249 // Inject lease if enabled
250 let lease_cfg = &config.raft.state_machine.lease;
251 if lease_cfg.enabled {
252 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
253 sm.set_lease(lease);
254 }
255
256 info!("Starting embedded engine with RocksDB at {:?}", base_dir);
257
258 Self::start_custom(storage, Arc::new(sm), Some(config_path)).await
259 }
260
261 /// Start engine with custom storage and state machine.
262 ///
263 /// Advanced API for users providing custom storage implementations.
264 ///
265 /// # Arguments
266 /// - `config_path`: Optional path to configuration file
267 /// - `storage_engine`: Custom storage engine implementation
268 /// - `state_machine`: Custom state machine implementation
269 ///
270 /// # Example
271 /// ```ignore
272 /// let storage = Arc::new(MyCustomStorage::new()?);
273 /// let sm = Arc::new(MyCustomStateMachine::new()?);
274 /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
275 /// ```
276 pub async fn start_custom<SE, SM>(
277 storage_engine: Arc<SE>,
278 state_machine: Arc<SM>,
279 config_path: Option<&str>,
280 ) -> Result<Self>
281 where
282 SE: StorageEngine + std::fmt::Debug + 'static,
283 SM: StateMachine + std::fmt::Debug + 'static,
284 {
285 info!("Starting embedded d-engine");
286
287 // Create shutdown channel
288 let (shutdown_tx, shutdown_rx) = watch::channel(());
289
290 // Load config or use default
291 let node_config = if let Some(path) = config_path {
292 d_engine_core::RaftNodeConfig::default()
293 .with_override_config(path)?
294 .validate()?
295 } else {
296 d_engine_core::RaftNodeConfig::new()?.validate()?
297 };
298
299 // Build node and start RPC server
300 let node = NodeBuilder::init(node_config, shutdown_rx)
301 .storage_engine(storage_engine)
302 .state_machine(state_machine)
303 .start()
304 .await?;
305
306 // Get leader change notifier before moving node
307 let leader_elected_rx = node.leader_change_notifier();
308
309 // Create client before spawning
310 #[cfg(not(feature = "watch"))]
311 let client = Arc::new(EmbeddedClient::new_internal(
312 node.event_tx.clone(),
313 node.cmd_tx.clone(),
314 node.node_id,
315 Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
316 ));
317
318 #[cfg(feature = "watch")]
319 let client = {
320 let watch_registry = node.watch_registry.clone();
321 let mut client = EmbeddedClient::new_internal(
322 node.event_tx.clone(),
323 node.cmd_tx.clone(),
324 node.node_id,
325 Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
326 );
327 if let Some(registry) = &watch_registry {
328 client = client.with_watch_registry(registry.clone());
329 }
330 Arc::new(client)
331 };
332
333 // Spawn node.run() in background
334 let node_handle = tokio::spawn(async move {
335 if let Err(e) = node.run().await {
336 error!("Node run error: {:?}", e);
337 Err(e)
338 } else {
339 Ok(())
340 }
341 });
342
343 info!("Embedded d-engine started successfully");
344
345 Ok(Self {
346 inner: Arc::new(Inner {
347 node_handle: Mutex::new(Some(node_handle)),
348 shutdown_tx,
349 client,
350 leader_elected_rx,
351 is_stopped: Mutex::new(false),
352 }),
353 })
354 }
355
356 /// Wait until the cluster is ready to serve requests.
357 ///
358 /// Blocks until a leader has been elected **and** its no-op entry is committed
359 /// by a majority of nodes (Raft §8). Only at this point is the leader guaranteed
360 /// to be aware of all previously committed entries and safe to serve reads and writes.
361 ///
362 /// Event-driven notification (no polling), <1ms latency after noop commit.
363 ///
364 /// # Timeout Guidelines
365 ///
366 /// **Single-node mode** (most common for development):
367 /// - Typical: <100ms (near-instant election)
368 /// - Recommended: `Duration::from_secs(3)`
369 ///
370 /// **Multi-node HA cluster** (production):
371 /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
372 /// - Recommended: `Duration::from_secs(10)`
373 ///
374 /// **Special cases**:
375 /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
376 /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
377 /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
378 ///
379 /// # Returns
380 /// - `Ok(LeaderInfo)` - Leader elected successfully
381 /// - `Err(...)` - Timeout or cluster unavailable
382 ///
383 /// # Example
384 /// ```ignore
385 /// // Single-node development
386 /// let engine = EmbeddedEngine::start().await?;
387 /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
388 ///
389 /// // Multi-node production
390 /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
391 /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
392 /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
393 /// ```
394 pub async fn wait_ready(
395 &self,
396 timeout: std::time::Duration,
397 ) -> Result<crate::LeaderInfo> {
398 let mut rx = self.inner.leader_elected_rx.clone();
399
400 tokio::time::timeout(timeout, async {
401 // Check current value first (leader may already be elected)
402 if let Some(info) = rx.borrow().as_ref() {
403 info!(
404 "Leader already elected: {} (term {})",
405 info.leader_id, info.term
406 );
407 return Ok(*info);
408 }
409
410 loop {
411 // Wait for leader election event (event-driven, no polling)
412 let _ = rx.changed().await;
413
414 // Check if a leader is elected
415 if let Some(info) = rx.borrow().as_ref() {
416 info!("Leader elected: {} (term {})", info.leader_id, info.term);
417 return Ok(*info);
418 }
419 }
420 })
421 .await
422 .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
423 }
424
425 /// Subscribe to leader change notifications.
426 ///
427 /// Returns a receiver that will be notified whenever:
428 /// - First leader is elected
429 /// - Leader changes (re-election)
430 /// - No leader exists (during election)
431 ///
432 /// # Performance
433 /// Event-driven notification (no polling), <1ms latency
434 ///
435 /// # Example
436 /// ```ignore
437 /// let mut leader_rx = engine.leader_change_notifier();
438 /// tokio::spawn(async move {
439 /// while leader_rx.changed().await.is_ok() {
440 /// match leader_rx.borrow().as_ref() {
441 /// Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
442 /// None => println!("No leader"),
443 /// }
444 /// }
445 /// });
446 /// ```
447 pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
448 self.inner.leader_elected_rx.clone()
449 }
450
451 /// Returns true if the current node is the Raft leader.
452 ///
453 /// # Use Cases
454 /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
455 /// - Prevent write requests to followers before they fail
456 /// - Application-level request routing decisions
457 ///
458 /// # Performance
459 /// Zero-cost operation (reads cached leader state from watch channel)
460 ///
461 /// # Example
462 /// ```ignore
463 /// // Load balancer health check endpoint
464 /// #[get("/primary")]
465 /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
466 /// if engine.is_leader() {
467 /// StatusCode::OK // Routes writes here
468 /// } else {
469 /// StatusCode::SERVICE_UNAVAILABLE
470 /// }
471 /// }
472 ///
473 /// // Application request handler
474 /// if engine.is_leader() {
475 /// client.put(key, value).await?;
476 /// } else {
477 /// return Err("Not leader, write rejected");
478 /// }
479 /// ```
480 pub fn is_leader(&self) -> bool {
481 self.inner
482 .leader_elected_rx
483 .borrow()
484 .as_ref()
485 .map(|info| info.leader_id == self.inner.client.node_id())
486 .unwrap_or(false)
487 }
488
489 /// Returns current leader information if available.
490 ///
491 /// # Returns
492 /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
493 /// - `None` if no leader exists (during election or network partition)
494 ///
495 /// # Use Cases
496 /// - Monitoring dashboards showing cluster state
497 /// - Debugging leader election issues
498 /// - Logging cluster topology changes
499 ///
500 /// # Example
501 /// ```ignore
502 /// if let Some(info) = engine.leader_info() {
503 /// println!("Leader: {} (term {})", info.leader_id, info.term);
504 /// } else {
505 /// println!("No leader elected, cluster unavailable");
506 /// }
507 /// ```
508 pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
509 *self.inner.leader_elected_rx.borrow()
510 }
511
512 /// Get a reference to the local KV client.
513 ///
514 /// The client is available immediately after `start()`,
515 /// but requests will only succeed after `wait_ready()` completes.
516 ///
517 /// # Example
518 /// ```ignore
519 /// let engine = EmbeddedEngine::start().await?;
520 /// engine.wait_ready(Duration::from_secs(5)).await?;
521 /// let client = engine.client();
522 /// client.put(b"key", b"value").await?;
523 /// ```
524 pub fn client(&self) -> Arc<EmbeddedClient> {
525 Arc::clone(&self.inner.client)
526 }
527
528 /// Gracefully stop the embedded engine.
529 /// Stop the embedded d-engine gracefully (idempotent).
530 ///
531 /// This method:
532 /// 1. Sends shutdown signal to node
533 /// 2. Waits for node.run() to complete
534 /// 3. Propagates any errors from node execution
535 ///
536 /// Safe to call multiple times - subsequent calls are no-ops.
537 ///
538 /// # Errors
539 /// Returns error if node encountered issues during shutdown.
540 ///
541 /// # Example
542 /// ```ignore
543 /// engine.stop().await?;
544 /// engine.stop().await?; // No-op, returns Ok(())
545 /// ```
546 pub async fn stop(&self) -> Result<()> {
547 let mut is_stopped = self.inner.is_stopped.lock().await;
548 if *is_stopped {
549 return Ok(());
550 }
551
552 info!("Stopping embedded d-engine");
553
554 // Send shutdown signal
555 let _ = self.inner.shutdown_tx.send(());
556
557 // Wait for node task to complete
558 let mut handle_guard = self.inner.node_handle.lock().await;
559 if let Some(handle) = handle_guard.take() {
560 match handle.await {
561 Ok(result) => {
562 info!("Embedded d-engine stopped successfully");
563 *is_stopped = true;
564 result
565 }
566 Err(e) => {
567 error!("Node task panicked: {:?}", e);
568 *is_stopped = true;
569 Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
570 }
571 }
572 } else {
573 *is_stopped = true;
574 Ok(())
575 }
576 }
577
578 /// Check if the engine has been stopped.
579 ///
580 /// # Example
581 /// ```ignore
582 /// if engine.is_stopped() {
583 /// println!("Engine is stopped");
584 /// }
585 /// ```
586 pub async fn is_stopped(&self) -> bool {
587 *self.inner.is_stopped.lock().await
588 }
589
590 /// Returns the node ID for testing purposes.
591 ///
592 /// Useful in integration tests that need to identify which node
593 /// they're interacting with, especially in multi-node scenarios.
594 pub fn node_id(&self) -> u32 {
595 self.inner.client.node_id()
596 }
597}
598
599impl Drop for EmbeddedEngine {
600 fn drop(&mut self) {
601 // Warn if stop() was not called
602 if let Ok(handle) = self.inner.node_handle.try_lock() {
603 if let Some(h) = &*handle {
604 if !h.is_finished() {
605 error!(
606 "EmbeddedEngine dropped without calling stop() - background task may leak"
607 );
608 }
609 }
610 }
611 }
612}