// d_engine_server/api/embedded.rs
//! Embedded mode for d-engine - application-friendly API
//!
//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
//! that simplifies lifecycle management for embedded use cases.
//!
//! ## Comparison: Node vs EmbeddedEngine
//!
//! ### Using Node (Low-level API)
//! ```ignore
//! let node = NodeBuilder::new(config).start().await?;
//! // Node does not provide client directly - use EmbeddedEngine instead
//! tokio::spawn(async move { node.run().await });
//! // Manual lifecycle management required
//! ```
//!
//! ### Using EmbeddedEngine (High-level API)
//! ```ignore
//! let engine = EmbeddedEngine::start().await?;
//! engine.wait_ready(Duration::from_secs(5)).await?;
//! let client = engine.client();
//! engine.stop().await?;
//! // Lifecycle managed automatically
//! ```
//!
//! ## When to Use
//!
//! - **EmbeddedEngine**: Application developers who want simplicity
//! - **Node**: Framework developers who need fine-grained control
//!
//! # Application Responsibilities in Multi-Node Deployments
//!
//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
//! Applications are responsible for handling follower write requests.
//!
//! ## What EmbeddedEngine Provides
//!
//! - ✅ `is_leader()` - Check if current node is leader
//! - ✅ `leader_info()` - Get leader ID and term
//! - ✅ `EmbeddedClient` returns `NotLeader` error on follower writes
//! - ✅ Zero-overhead in-process communication (<0.1ms)
//!
//! ## What Applications Must Handle
//!
//! - ❌ Request routing (follower → leader forwarding)
//! - ❌ Load balancing across nodes
//! - ❌ Health check endpoints for load balancers
//!
//! ## Integration Patterns
//!
//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
//!
//! **Application provides HTTP health check endpoints:**
//! ```ignore
//! use axum::{Router, routing::get, http::StatusCode};
//!
//! // Health check endpoint for load balancer (e.g. HAProxy)
//! async fn health_primary(engine: Arc<EmbeddedEngine>) -> StatusCode {
//!     if engine.is_leader() {
//!         StatusCode::OK  // 200 - Load balancer routes writes here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE  // 503 - Load balancer skips this node
//!     }
//! }
//!
//! async fn health_replica(engine: Arc<EmbeddedEngine>) -> StatusCode {
//!     if !engine.is_leader() {
//!         StatusCode::OK  // 200 - Load balancer routes reads here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE  // 503
//!     }
//! }
//!
//! let app = Router::new()
//!     .route("/primary", get(health_primary))  // For write traffic
//!     .route("/replica", get(health_replica)); // For read traffic
//!
//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
//!     .serve(app.into_make_service())
//!     .await?;
//! ```
//!
//! ### Pattern 2: Pre-check Before Write (Simple)
//!
//! **Application checks leadership before handling writes:**
//! ```ignore
//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
//!     if !engine.is_leader() {
//!         // Return HTTP 503 to client
//!         return Err("Not leader, please retry on another node");
//!     }
//!
//!     // Safe to write (this node is leader)
//!     engine.client().put(key, value).await?;
//!     Ok(())
//! }
//! ```
//!
//! ### Pattern 3: HTTP 307 Redirect (Alternative)
//!
//! **Application returns redirect to leader:**
//! ```ignore
//! async fn handle_write_request(engine: &EmbeddedEngine) -> Response {
//!     match engine.client().put(key, value).await {
//!         Ok(_) => Response::ok(),
//!         Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
//!             // Redirect client to leader
//!             Response::redirect_307(addr)
//!         }
//!         Err(e) => Response::error(e),
//!     }
//! }
//! ```
//!
//! ## Design Philosophy
//!
//! **Why doesn't EmbeddedEngine auto-forward writes?**
//!
//! 1. **Preserves zero-overhead guarantee** - Auto-forwarding requires gRPC client (adds network/serialization)
//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
//! 4. **Performance transparency** - Developers know exactly when network calls occur
//!
//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcClient`.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::error;
use tracing::info;

use crate::Result;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStateMachine;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStorageEngine;
use crate::StateMachine;
use crate::StorageEngine;
use crate::api::EmbeddedClient;
use crate::node::NodeBuilder;
141
/// Shared state behind `EmbeddedEngine`; all clones point at one `Arc<Inner>`.
struct Inner {
    /// Handle to the background `node.run()` task; taken (left `None`) by `stop()`.
    node_handle: Mutex<Option<JoinHandle<Result<()>>>>,
    /// Signals the spawned node task to shut down (see `stop()`).
    shutdown_tx: watch::Sender<()>,
    /// In-process KV client handed out by `EmbeddedEngine::client()`.
    client: Arc<EmbeddedClient>,
    /// Latest known leader; `None` while no leader is elected.
    leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
    /// Set once `stop()` completes, making `stop()` idempotent.
    is_stopped: Mutex<bool>,
}
149
/// Embedded d-engine with automatic lifecycle management.
///
/// **Thread-safe**: Clone and share across threads freely.
/// All methods use `&self` - safe to call from multiple contexts.
///
/// Provides high-level KV API for embedded usage:
/// - `start()` / `start_with()` - Initialize and spawn node
/// - `wait_ready()` - Wait for leader election
/// - `client()` - Get embedded client
/// - `stop()` - Graceful shutdown
///
/// # Example
/// ```ignore
/// use d_engine::EmbeddedEngine;
/// use std::time::Duration;
///
/// let engine = EmbeddedEngine::start().await?;  // Returns Arc<EmbeddedEngine>
/// engine.wait_ready(Duration::from_secs(5)).await?;
///
/// let client = engine.client();
/// client.put(b"key", b"value").await?;
///
/// engine.stop().await?;
/// ```
#[derive(Clone)]
pub struct EmbeddedEngine {
    // Cloning is a cheap Arc refcount bump; every clone drives the same node.
    inner: Arc<Inner>,
}
178
179impl EmbeddedEngine {
180    /// Start engine with configuration from environment.
181    ///
182    /// Reads `CONFIG_PATH` environment variable or uses default configuration.
183    /// Data directory is determined by config's `cluster.db_root_dir` setting.
184    ///
185    /// # Example
186    /// ```ignore
187    /// // Set config path via environment variable
188    /// std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
189    ///
190    /// let engine = EmbeddedEngine::start().await?;
191    /// engine.wait_ready(Duration::from_secs(5)).await?;
192    /// ```
193    #[cfg(feature = "rocksdb")]
194    pub async fn start() -> Result<Self> {
195        let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
196        let base_dir = &config.cluster.db_root_dir;
197        tokio::fs::create_dir_all(base_dir)
198            .await
199            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
200
201        let storage_path = base_dir.join("storage");
202        let sm_path = base_dir.join("state_machine");
203
204        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
205        let mut sm = RocksDBStateMachine::new(sm_path)?;
206
207        // Inject lease if enabled
208        let lease_cfg = &config.raft.state_machine.lease;
209        if lease_cfg.enabled {
210            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
211            sm.set_lease(lease);
212        }
213
214        info!("Starting embedded engine with RocksDB at {:?}", base_dir);
215
216        Self::start_custom(storage, Arc::new(sm), None).await
217    }
218
219    /// Start engine with explicit configuration file.
220    ///
221    /// Reads configuration from specified file path.
222    /// Data directory is determined by config's `cluster.db_root_dir` setting.
223    ///
224    /// # Arguments
225    /// - `config_path`: Path to configuration file (e.g. "d-engine.toml")
226    ///
227    /// # Example
228    /// ```ignore
229    /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
230    /// engine.wait_ready(Duration::from_secs(5)).await?;
231    /// ```
232    #[cfg(feature = "rocksdb")]
233    pub async fn start_with(config_path: &str) -> Result<Self> {
234        let config = d_engine_core::RaftNodeConfig::new()?
235            .with_override_config(config_path)?
236            .validate()?;
237        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
238
239        tokio::fs::create_dir_all(&base_dir)
240            .await
241            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
242
243        let storage_path = base_dir.join("storage");
244        let sm_path = base_dir.join("state_machine");
245
246        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
247        let mut sm = RocksDBStateMachine::new(sm_path)?;
248
249        // Inject lease if enabled
250        let lease_cfg = &config.raft.state_machine.lease;
251        if lease_cfg.enabled {
252            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
253            sm.set_lease(lease);
254        }
255
256        info!("Starting embedded engine with RocksDB at {:?}", base_dir);
257
258        Self::start_custom(storage, Arc::new(sm), Some(config_path)).await
259    }
260
    /// Start engine with custom storage and state machine.
    ///
    /// Advanced API for users providing custom storage implementations.
    ///
    /// # Arguments
    /// - `storage_engine`: Custom storage engine implementation
    /// - `state_machine`: Custom state machine implementation
    /// - `config_path`: Optional path to configuration file
    ///
    /// # Example
    /// ```ignore
    /// let storage = Arc::new(MyCustomStorage::new()?);
    /// let sm = Arc::new(MyCustomStateMachine::new()?);
    /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
    /// ```
    pub async fn start_custom<SE, SM>(
        storage_engine: Arc<SE>,
        state_machine: Arc<SM>,
        config_path: Option<&str>,
    ) -> Result<Self>
    where
        SE: StorageEngine + std::fmt::Debug + 'static,
        SM: StateMachine + std::fmt::Debug + 'static,
    {
        info!("Starting embedded d-engine");

        // Create shutdown channel; the sender lives in Inner, the receiver
        // is handed to the node so stop() can signal it.
        let (shutdown_tx, shutdown_rx) = watch::channel(());

        // Load config or use default. NOTE(review): when called via
        // start()/start_with() the config has already been loaded once
        // there; it is re-loaded here from the same sources.
        let node_config = if let Some(path) = config_path {
            d_engine_core::RaftNodeConfig::default()
                .with_override_config(path)?
                .validate()?
        } else {
            d_engine_core::RaftNodeConfig::new()?.validate()?
        };

        // Build node and start RPC server
        let node = NodeBuilder::init(node_config, shutdown_rx)
            .storage_engine(storage_engine)
            .state_machine(state_machine)
            .start()
            .await?;

        // Get leader change notifier before moving node into the spawned task
        let leader_elected_rx = node.leader_change_notifier();

        // Create client before spawning (it only needs channel handles, the
        // node id, and the configured raft timeout — not the node itself).
        #[cfg(not(feature = "watch"))]
        let client = Arc::new(EmbeddedClient::new_internal(
            node.event_tx.clone(),
            node.cmd_tx.clone(),
            node.node_id,
            Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
        ));

        // With the "watch" feature, additionally attach the watch registry
        // (if the node has one) so the client can serve key watches.
        #[cfg(feature = "watch")]
        let client = {
            let watch_registry = node.watch_registry.clone();
            let mut client = EmbeddedClient::new_internal(
                node.event_tx.clone(),
                node.cmd_tx.clone(),
                node.node_id,
                Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
            );
            if let Some(registry) = &watch_registry {
                client = client.with_watch_registry(registry.clone());
            }
            Arc::new(client)
        };

        // Spawn node.run() in background; the JoinHandle is kept so stop()
        // can await completion and surface any run error.
        let node_handle = tokio::spawn(async move {
            if let Err(e) = node.run().await {
                error!("Node run error: {:?}", e);
                Err(e)
            } else {
                Ok(())
            }
        });

        info!("Embedded d-engine started successfully");

        Ok(Self {
            inner: Arc::new(Inner {
                node_handle: Mutex::new(Some(node_handle)),
                shutdown_tx,
                client,
                leader_elected_rx,
                is_stopped: Mutex::new(false),
            }),
        })
    }
355
356    /// Wait until the cluster is ready to serve requests.
357    ///
358    /// Blocks until a leader has been elected **and** its no-op entry is committed
359    /// by a majority of nodes (Raft §8). Only at this point is the leader guaranteed
360    /// to be aware of all previously committed entries and safe to serve reads and writes.
361    ///
362    /// Event-driven notification (no polling), <1ms latency after noop commit.
363    ///
364    /// # Timeout Guidelines
365    ///
366    /// **Single-node mode** (most common for development):
367    /// - Typical: <100ms (near-instant election)
368    /// - Recommended: `Duration::from_secs(3)`
369    ///
370    /// **Multi-node HA cluster** (production):
371    /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
372    /// - Recommended: `Duration::from_secs(10)`
373    ///
374    /// **Special cases**:
375    /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
376    /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
377    /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
378    ///
379    /// # Returns
380    /// - `Ok(LeaderInfo)` - Leader elected successfully
381    /// - `Err(...)` - Timeout or cluster unavailable
382    ///
383    /// # Example
384    /// ```ignore
385    /// // Single-node development
386    /// let engine = EmbeddedEngine::start().await?;
387    /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
388    ///
389    /// // Multi-node production
390    /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
391    /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
392    /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
393    /// ```
394    pub async fn wait_ready(
395        &self,
396        timeout: std::time::Duration,
397    ) -> Result<crate::LeaderInfo> {
398        let mut rx = self.inner.leader_elected_rx.clone();
399
400        tokio::time::timeout(timeout, async {
401            // Check current value first (leader may already be elected)
402            if let Some(info) = rx.borrow().as_ref() {
403                info!(
404                    "Leader already elected: {} (term {})",
405                    info.leader_id, info.term
406                );
407                return Ok(*info);
408            }
409
410            loop {
411                // Wait for leader election event (event-driven, no polling)
412                let _ = rx.changed().await;
413
414                // Check if a leader is elected
415                if let Some(info) = rx.borrow().as_ref() {
416                    info!("Leader elected: {} (term {})", info.leader_id, info.term);
417                    return Ok(*info);
418                }
419            }
420        })
421        .await
422        .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
423    }
424
425    /// Subscribe to leader change notifications.
426    ///
427    /// Returns a receiver that will be notified whenever:
428    /// - First leader is elected
429    /// - Leader changes (re-election)
430    /// - No leader exists (during election)
431    ///
432    /// # Performance
433    /// Event-driven notification (no polling), <1ms latency
434    ///
435    /// # Example
436    /// ```ignore
437    /// let mut leader_rx = engine.leader_change_notifier();
438    /// tokio::spawn(async move {
439    ///     while leader_rx.changed().await.is_ok() {
440    ///         match leader_rx.borrow().as_ref() {
441    ///             Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
442    ///             None => println!("No leader"),
443    ///         }
444    ///     }
445    /// });
446    /// ```
447    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
448        self.inner.leader_elected_rx.clone()
449    }
450
451    /// Returns true if the current node is the Raft leader.
452    ///
453    /// # Use Cases
454    /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
455    /// - Prevent write requests to followers before they fail
456    /// - Application-level request routing decisions
457    ///
458    /// # Performance
459    /// Zero-cost operation (reads cached leader state from watch channel)
460    ///
461    /// # Example
462    /// ```ignore
463    /// // Load balancer health check endpoint
464    /// #[get("/primary")]
465    /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
466    ///     if engine.is_leader() {
467    ///         StatusCode::OK  // Routes writes here
468    ///     } else {
469    ///         StatusCode::SERVICE_UNAVAILABLE
470    ///     }
471    /// }
472    ///
473    /// // Application request handler
474    /// if engine.is_leader() {
475    ///     client.put(key, value).await?;
476    /// } else {
477    ///     return Err("Not leader, write rejected");
478    /// }
479    /// ```
480    pub fn is_leader(&self) -> bool {
481        self.inner
482            .leader_elected_rx
483            .borrow()
484            .as_ref()
485            .map(|info| info.leader_id == self.inner.client.node_id())
486            .unwrap_or(false)
487    }
488
489    /// Returns current leader information if available.
490    ///
491    /// # Returns
492    /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
493    /// - `None` if no leader exists (during election or network partition)
494    ///
495    /// # Use Cases
496    /// - Monitoring dashboards showing cluster state
497    /// - Debugging leader election issues
498    /// - Logging cluster topology changes
499    ///
500    /// # Example
501    /// ```ignore
502    /// if let Some(info) = engine.leader_info() {
503    ///     println!("Leader: {} (term {})", info.leader_id, info.term);
504    /// } else {
505    ///     println!("No leader elected, cluster unavailable");
506    /// }
507    /// ```
508    pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
509        *self.inner.leader_elected_rx.borrow()
510    }
511
512    /// Get a reference to the local KV client.
513    ///
514    /// The client is available immediately after `start()`,
515    /// but requests will only succeed after `wait_ready()` completes.
516    ///
517    /// # Example
518    /// ```ignore
519    /// let engine = EmbeddedEngine::start().await?;
520    /// engine.wait_ready(Duration::from_secs(5)).await?;
521    /// let client = engine.client();
522    /// client.put(b"key", b"value").await?;
523    /// ```
524    pub fn client(&self) -> Arc<EmbeddedClient> {
525        Arc::clone(&self.inner.client)
526    }
527
528    /// Gracefully stop the embedded engine.
529    /// Stop the embedded d-engine gracefully (idempotent).
530    ///
531    /// This method:
532    /// 1. Sends shutdown signal to node
533    /// 2. Waits for node.run() to complete
534    /// 3. Propagates any errors from node execution
535    ///
536    /// Safe to call multiple times - subsequent calls are no-ops.
537    ///
538    /// # Errors
539    /// Returns error if node encountered issues during shutdown.
540    ///
541    /// # Example
542    /// ```ignore
543    /// engine.stop().await?;
544    /// engine.stop().await?;  // No-op, returns Ok(())
545    /// ```
546    pub async fn stop(&self) -> Result<()> {
547        let mut is_stopped = self.inner.is_stopped.lock().await;
548        if *is_stopped {
549            return Ok(());
550        }
551
552        info!("Stopping embedded d-engine");
553
554        // Send shutdown signal
555        let _ = self.inner.shutdown_tx.send(());
556
557        // Wait for node task to complete
558        let mut handle_guard = self.inner.node_handle.lock().await;
559        if let Some(handle) = handle_guard.take() {
560            match handle.await {
561                Ok(result) => {
562                    info!("Embedded d-engine stopped successfully");
563                    *is_stopped = true;
564                    result
565                }
566                Err(e) => {
567                    error!("Node task panicked: {:?}", e);
568                    *is_stopped = true;
569                    Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
570                }
571            }
572        } else {
573            *is_stopped = true;
574            Ok(())
575        }
576    }
577
578    /// Check if the engine has been stopped.
579    ///
580    /// # Example
581    /// ```ignore
582    /// if engine.is_stopped() {
583    ///     println!("Engine is stopped");
584    /// }
585    /// ```
586    pub async fn is_stopped(&self) -> bool {
587        *self.inner.is_stopped.lock().await
588    }
589
590    /// Returns the node ID for testing purposes.
591    ///
592    /// Useful in integration tests that need to identify which node
593    /// they're interacting with, especially in multi-node scenarios.
594    pub fn node_id(&self) -> u32 {
595        self.inner.client.node_id()
596    }
597}
598
599impl Drop for EmbeddedEngine {
600    fn drop(&mut self) {
601        // Warn if stop() was not called
602        if let Ok(handle) = self.inner.node_handle.try_lock() {
603            if let Some(h) = &*handle {
604                if !h.is_finished() {
605                    error!(
606                        "EmbeddedEngine dropped without calling stop() - background task may leak"
607                    );
608                }
609            }
610        }
611    }
612}