d_engine_server/api/embedded.rs

//! Embedded mode for d-engine - application-friendly API
//!
//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
//! that simplifies lifecycle management for embedded use cases.
//!
//! ## Comparison: Node vs EmbeddedEngine
//!
//! ### Using Node (Low-level API)
//! ```ignore
//! let node = NodeBuilder::new(config).start().await?;
//! let client = node.local_client();
//! tokio::spawn(async move { node.run().await });
//! // Manual lifecycle management required
//! ```
//!
//! ### Using EmbeddedEngine (High-level API)
//! ```ignore
//! let engine = EmbeddedEngine::start().await?;
//! engine.wait_ready(Duration::from_secs(5)).await?;
//! let client = engine.client();
//! engine.stop().await?;
//! // Lifecycle managed automatically
//! ```
//!
//! ## When to Use
//!
//! - **EmbeddedEngine**: Application developers who want simplicity
//! - **Node**: Framework developers who need fine-grained control
//!
//! # Application Responsibilities in Multi-Node Deployments
//!
//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
//! Applications are responsible for handling follower write requests.
//!
//! ## What EmbeddedEngine Provides
//!
//! - ✅ `is_leader()` - Check if current node is leader
//! - ✅ `leader_info()` - Get leader ID and term
//! - ✅ `LocalKvClient` returns `NotLeader` error on follower writes
//! - ✅ Zero-overhead in-process communication (<0.1ms)
//!
//! ## What Applications Must Handle
//!
//! - ❌ Request routing (follower → leader forwarding)
//! - ❌ Load balancing across nodes
//! - ❌ Health check endpoints for load balancers
//!
//! ## Integration Patterns
//!
//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
//!
//! **Application provides HTTP health check endpoints:**
//! ```ignore
//! use std::sync::Arc;
//! use axum::{Router, routing::get, http::StatusCode, extract::State};
//!
//! // Health check endpoint for load balancer (e.g. HAProxy)
//! async fn health_primary(State(engine): State<Arc<EmbeddedEngine>>) -> StatusCode {
//!     if engine.is_leader() {
//!         StatusCode::OK  // 200 - Load balancer routes writes here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE  // 503 - Load balancer skips this node
//!     }
//! }
//!
//! async fn health_replica(State(engine): State<Arc<EmbeddedEngine>>) -> StatusCode {
//!     if !engine.is_leader() {
//!         StatusCode::OK  // 200 - Load balancer routes reads here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE  // 503
//!     }
//! }
//!
//! let app = Router::new()
//!     .route("/primary", get(health_primary))  // For write traffic
//!     .route("/replica", get(health_replica))  // For read traffic
//!     .with_state(engine);                     // engine: Arc<EmbeddedEngine>
//!
//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
//!     .serve(app.into_make_service())
//!     .await?;
//! ```
//!
//! ### Pattern 2: Pre-check Before Write (Simple)
//!
//! **Application checks leadership before handling writes:**
//! ```ignore
//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
//!     if !engine.is_leader() {
//!         // Return HTTP 503 to client
//!         return Err("Not leader, please retry on another node");
//!     }
//!
//!     // Safe to write (this node is leader)
//!     engine.client().put(key, value).await?;
//!     Ok(())
//! }
//! ```
//!
//! ### Pattern 3: HTTP 307 Redirect (Alternative)
//!
//! **Application returns redirect to leader:**
//! ```ignore
//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Response {
//!     match engine.client().put(key, value).await {
//!         Ok(_) => Response::ok(),
//!         Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
//!             // Redirect client to leader
//!             Response::redirect_307(addr)
//!         }
//!         Err(e) => Response::error(e),
//!     }
//! }
//! ```
//!
//! ## Design Philosophy
//!
//! **Why doesn't EmbeddedEngine auto-forward writes?**
//!
//! 1. **Preserves the zero-overhead guarantee** - Auto-forwarding requires a gRPC client, which adds network and serialization overhead
//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
//! 4. **Performance transparency** - Developers know exactly when network calls occur
//!
//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcKvClient` (sketched below).
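//!
//! A minimal sketch of that alternative (the constructor and address here are
//! hypothetical; check the client crate for the exact `GrpcKvClient` API):
//! ```ignore
//! // Standalone mode: the gRPC client talks to the cluster over the network,
//! // so follower-to-leader forwarding happens at the cost of RPC overhead.
//! let client = GrpcKvClient::connect("http://127.0.0.1:9081").await?;
//! client.put(b"key", b"value").await?;
//! ```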

use std::sync::Arc;

#[cfg(feature = "watch")]
use bytes::Bytes;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatcherHandle;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::error;
use tracing::info;

use crate::Result;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStateMachine;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStorageEngine;
use crate::StateMachine;
use crate::StorageEngine;
use crate::node::LocalKvClient;
use crate::node::NodeBuilder;

/// Embedded d-engine with automatic lifecycle management.
///
/// Provides a high-level KV API for embedded usage:
/// - `start()` / `start_with()` - Initialize and spawn node
/// - `wait_ready()` - Wait for leader election
/// - `client()` - Get local KV client
/// - `stop()` - Graceful shutdown
///
/// # Example
/// ```ignore
/// use d_engine::EmbeddedEngine;
/// use std::time::Duration;
///
/// let engine = EmbeddedEngine::start().await?;
/// engine.wait_ready(Duration::from_secs(5)).await?;
///
/// let client = engine.client();
/// client.put(b"key", b"value").await?;
///
/// engine.stop().await?;
/// ```
pub struct EmbeddedEngine {
    node_handle: Option<JoinHandle<Result<()>>>,
    shutdown_tx: watch::Sender<()>,
    kv_client: LocalKvClient,
    leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
    #[cfg(feature = "watch")]
    watch_registry: Option<Arc<WatchRegistry>>,
}

impl EmbeddedEngine {
    /// Start the engine with configuration from the environment.
    ///
    /// Reads the `CONFIG_PATH` environment variable or uses the default configuration.
    /// The data directory is determined by the config's `cluster.db_root_dir` setting.
    ///
    /// # Example
    /// ```ignore
    /// // Set config path via environment variable
    /// std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
    ///
    /// let engine = EmbeddedEngine::start().await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// ```
    #[cfg(feature = "rocksdb")]
    pub async fn start() -> Result<Self> {
        let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
        let base_dir = &config.cluster.db_root_dir;
        tokio::fs::create_dir_all(base_dir)
            .await
            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;

        let storage_path = base_dir.join("storage");
        let sm_path = base_dir.join("state_machine");

        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
        let mut sm = RocksDBStateMachine::new(sm_path)?;

        // Inject lease if enabled
        let lease_cfg = &config.raft.state_machine.lease;
        if lease_cfg.enabled {
            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
            sm.set_lease(lease);
        }

        let sm = Arc::new(sm);

        info!("Starting embedded engine with RocksDB at {:?}", base_dir);

        Self::start_custom(storage, sm, None).await
    }

    /// Start the engine with an explicit configuration file.
    ///
    /// Reads configuration from the specified file path.
    /// The data directory is determined by the config's `cluster.db_root_dir` setting.
    ///
    /// # Arguments
    /// - `config_path`: Path to the configuration file (e.g. "d-engine.toml")
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// ```
    #[cfg(feature = "rocksdb")]
    pub async fn start_with(config_path: &str) -> Result<Self> {
        let config = d_engine_core::RaftNodeConfig::new()?
            .with_override_config(config_path)?
            .validate()?;
        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);

        tokio::fs::create_dir_all(&base_dir)
            .await
            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;

        let storage_path = base_dir.join("storage");
        let sm_path = base_dir.join("state_machine");

        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
        let mut sm = RocksDBStateMachine::new(sm_path)?;

        // Inject lease if enabled
        let lease_cfg = &config.raft.state_machine.lease;
        if lease_cfg.enabled {
            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
            sm.set_lease(lease);
        }

        let sm = Arc::new(sm);

        info!("Starting embedded engine with RocksDB at {:?}", base_dir);

        Self::start_custom(storage, sm, Some(config_path)).await
    }

    /// Start the engine with a custom storage engine and state machine.
    ///
    /// Advanced API for users providing custom storage implementations.
    ///
    /// # Arguments
    /// - `storage_engine`: Custom storage engine implementation
    /// - `state_machine`: Custom state machine implementation
    /// - `config_path`: Optional path to a configuration file
    ///
    /// # Example
    /// ```ignore
    /// let storage = Arc::new(MyCustomStorage::new()?);
    /// let sm = Arc::new(MyCustomStateMachine::new()?);
    /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
    /// ```
    pub async fn start_custom<SE, SM>(
        storage_engine: Arc<SE>,
        state_machine: Arc<SM>,
        config_path: Option<&str>,
    ) -> Result<Self>
    where
        SE: StorageEngine + std::fmt::Debug + 'static,
        SM: StateMachine + std::fmt::Debug + 'static,
    {
        info!("Starting embedded d-engine");

        // Create shutdown channel
        let (shutdown_tx, shutdown_rx) = watch::channel(());

        // Load config or use default
        let node_config = if let Some(path) = config_path {
            d_engine_core::RaftNodeConfig::default()
                .with_override_config(path)?
                .validate()?
        } else {
            d_engine_core::RaftNodeConfig::new()?.validate()?
        };

        // Build node and start RPC server
        let node = NodeBuilder::init(node_config, shutdown_rx)
            .storage_engine(storage_engine)
            .state_machine(state_machine)
            .start()
            .await?;

        // Get leader change notifier before moving node
        let leader_elected_rx = node.leader_change_notifier();

        // Create local KV client before spawning
        let kv_client = node.local_client();

        // Capture watch registry (if enabled)
        #[cfg(feature = "watch")]
        let watch_registry = node.watch_registry.clone();

        // Spawn node.run() in background
        let node_handle = tokio::spawn(async move {
            if let Err(e) = node.run().await {
                error!("Node run error: {:?}", e);
                Err(e)
            } else {
                Ok(())
            }
        });

        info!("Embedded d-engine started successfully");

        Ok(Self {
            node_handle: Some(node_handle),
            shutdown_tx,
            kv_client,
            leader_elected_rx,
            #[cfg(feature = "watch")]
            watch_registry,
        })
    }

    /// Wait for leader election to complete.
    ///
    /// Blocks until a leader has been elected in the cluster.
    /// Notification is event-driven (no polling), with <1ms latency.
    ///
    /// # Timeout Guidelines
    ///
    /// **Single-node mode** (most common for development):
    /// - Typical: <100ms (near-instant election)
    /// - Recommended: `Duration::from_secs(3)`
    ///
    /// **Multi-node HA cluster** (production):
    /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
    /// - Recommended: `Duration::from_secs(10)`
    ///
    /// **Special cases**:
    /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
    /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
    /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
    ///
    /// # Returns
    /// - `Ok(LeaderInfo)` - Leader elected successfully
    /// - `Err(...)` - Timeout or cluster unavailable
    ///
    /// # Example
    /// ```ignore
    /// // Single-node development
    /// let engine = EmbeddedEngine::start().await?;
    /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
    ///
    /// // Multi-node production
    /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
    /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
    /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
    /// ```
    pub async fn wait_ready(
        &self,
        timeout: std::time::Duration,
    ) -> Result<crate::LeaderInfo> {
        let mut rx = self.leader_elected_rx.clone();

        tokio::time::timeout(timeout, async {
            // Check current value first (leader may already be elected)
            if let Some(info) = rx.borrow().as_ref() {
                info!(
                    "Leader already elected: {} (term {})",
                    info.leader_id, info.term
                );
                return Ok(*info);
            }

            loop {
                // Wait for the next leader election event (event-driven, no polling).
                // Bail out if the notifier channel is closed (e.g. the node shut down).
                if rx.changed().await.is_err() {
                    return Err(crate::Error::Fatal(
                        "Leader election notifier closed".to_string(),
                    ));
                }

                // Check if a leader is elected
                if let Some(info) = rx.borrow().as_ref() {
                    info!("Leader elected: {} (term {})", info.leader_id, info.term);
                    return Ok(*info);
                }
            }
        })
        .await
        .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
    }

    /// Subscribe to leader change notifications.
    ///
    /// Returns a receiver that will be notified whenever:
    /// - First leader is elected
    /// - Leader changes (re-election)
    /// - No leader exists (during election)
    ///
    /// # Performance
    /// Event-driven notification (no polling), <1ms latency
    ///
    /// # Example
    /// ```ignore
    /// let mut leader_rx = engine.leader_change_notifier();
    /// tokio::spawn(async move {
    ///     while leader_rx.changed().await.is_ok() {
    ///         match leader_rx.borrow().as_ref() {
    ///             Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
    ///             None => println!("No leader"),
    ///         }
    ///     }
    /// });
    /// ```
    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
        self.leader_elected_rx.clone()
    }

    /// Returns true if the current node is the Raft leader.
    ///
    /// # Use Cases
    /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
    /// - Rejecting write requests on followers before they fail
    /// - Application-level request routing decisions
    ///
    /// # Performance
    /// Inexpensive: reads the cached leader state from a watch channel (no network or disk I/O)
    ///
    /// # Example
    /// ```ignore
    /// // Load balancer health check endpoint
    /// #[get("/primary")]
    /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    ///     if engine.is_leader() {
    ///         StatusCode::OK  // Routes writes here
    ///     } else {
    ///         StatusCode::SERVICE_UNAVAILABLE
    ///     }
    /// }
    ///
    /// // Application request handler
    /// if engine.is_leader() {
    ///     client.put(key, value).await?;
    /// } else {
    ///     return Err("Not leader, write rejected");
    /// }
    /// ```
    pub fn is_leader(&self) -> bool {
        self.leader_elected_rx
            .borrow()
            .as_ref()
            .map(|info| info.leader_id == self.kv_client.node_id())
            .unwrap_or(false)
    }

    /// Returns current leader information if available.
    ///
    /// # Returns
    /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
    /// - `None` if no leader exists (during election or network partition)
    ///
    /// # Use Cases
    /// - Monitoring dashboards showing cluster state
    /// - Debugging leader election issues
    /// - Logging cluster topology changes
    ///
    /// # Example
    /// ```ignore
    /// if let Some(info) = engine.leader_info() {
    ///     println!("Leader: {} (term {})", info.leader_id, info.term);
    /// } else {
    ///     println!("No leader elected, cluster unavailable");
    /// }
    /// ```
    pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
        *self.leader_elected_rx.borrow()
    }

    /// Get a reference to the local KV client.
    ///
    /// The client is available immediately after `start()`,
    /// but requests will only succeed after `wait_ready()` completes.
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// let client = engine.client();
    /// client.put(b"key", b"value").await?;
    /// ```
    pub fn client(&self) -> &LocalKvClient {
        &self.kv_client
    }

    /// Register a watcher for a specific key.
    ///
    /// Returns a handle that receives watch events via an mpsc channel.
    /// The watcher is automatically unregistered when the handle is dropped.
    ///
    /// # Arguments
    /// * `key` - The exact key to watch
    ///
    /// # Returns
    /// * `Result<WatcherHandle>` - Handle for receiving events
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// let mut handle = engine.watch(b"mykey")?;
    /// while let Some(event) = handle.receiver_mut().recv().await {
    ///     println!("Key changed: {:?}", event);
    /// }
    /// ```
    #[cfg(feature = "watch")]
    pub fn watch(
        &self,
        key: impl AsRef<[u8]>,
    ) -> Result<WatcherHandle> {
        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            crate::Error::Fatal(
                "Watch feature disabled (WatchRegistry not initialized)".to_string(),
            )
        })?;

        let key_bytes = Bytes::copy_from_slice(key.as_ref());
        Ok(registry.register(key_bytes))
    }

    /// Gracefully stop the embedded engine.
    ///
    /// This method:
    /// 1. Sends the shutdown signal to the node
    /// 2. Waits for `node.run()` to complete
    /// 3. Propagates any errors from node execution
    ///
    /// # Errors
    /// Returns an error if the node encountered issues during shutdown or if its task panicked.
    ///
    /// # Example
    /// ```ignore
    /// engine.stop().await?;
    /// ```
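    ///
    /// If a shutdown error should not abort your application, log it instead of
    /// propagating it:
    /// ```ignore
    /// if let Err(e) = engine.stop().await {
    ///     tracing::error!("d-engine shutdown error: {e}");
    /// }
    /// ```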
    pub async fn stop(mut self) -> Result<()> {
        info!("Stopping embedded d-engine");

        // Send shutdown signal
        let _ = self.shutdown_tx.send(());

        // Wait for node task to complete
        if let Some(handle) = self.node_handle.take() {
            match handle.await {
                Ok(result) => {
                    info!("Embedded d-engine stopped successfully");
                    result
                }
                Err(e) => {
                    error!("Node task panicked: {:?}", e);
                    Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
                }
            }
        } else {
            Ok(())
        }
    }

    /// Returns the node ID for testing purposes.
    ///
    /// Useful in integration tests that need to identify which node
    /// they're interacting with, especially in multi-node scenarios.
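    ///
    /// # Example
    /// For instance, a test can log which node it is talking to (using only this
    /// struct's own API):
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// if engine.is_leader() {
    ///     println!("node {} is the leader", engine.node_id());
    /// }
    /// ```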
    pub fn node_id(&self) -> u32 {
        self.kv_client.node_id()
    }
}

impl Drop for EmbeddedEngine {
    fn drop(&mut self) {
        // Warn if stop() was not called
        if let Some(handle) = &self.node_handle {
            if !handle.is_finished() {
                error!("EmbeddedEngine dropped without calling stop() - background task may leak");
            }
        }
    }
}