d_engine_server/api/embedded.rs
//! Embedded mode for d-engine - application-friendly API
//!
//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
//! that simplifies lifecycle management for embedded use cases.
//!
//! ## Comparison: Node vs EmbeddedEngine
//!
//! ### Using Node (Low-level API)
//! ```ignore
//! let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
//! let node = NodeBuilder::init(config, shutdown_rx).start().await?;
//! let client = node.local_client();
//! tokio::spawn(async move { node.run().await });
//! // Manual lifecycle management required: shutdown signal, task handle, readiness
//! ```
//!
//! ### Using EmbeddedEngine (High-level API)
//! ```ignore
//! let engine = EmbeddedEngine::start().await?;
//! engine.wait_ready(Duration::from_secs(5)).await?;
//! let client = engine.client();
//! engine.stop().await?;
//! // Lifecycle managed automatically
//! ```
//!
//! ## When to Use
//!
//! - **EmbeddedEngine**: Application developers who want simplicity
//! - **Node**: Framework developers who need fine-grained control
//!
//! # Application Responsibilities in Multi-Node Deployments
//!
//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
//! Applications are responsible for handling follower write requests.
//!
//! ## What EmbeddedEngine Provides
//!
//! - ✅ `is_leader()` - Check if current node is leader
//! - ✅ `leader_info()` - Get leader ID and term
//! - ✅ `LocalKvClient` returns `NotLeader` error on follower writes
//! - ✅ Zero-overhead in-process communication (<0.1ms)
//!
//! ## What Applications Must Handle
//!
//! - ❌ Request routing (follower → leader forwarding)
//! - ❌ Load balancing across nodes
//! - ❌ Health check endpoints for load balancers
//!
//! ## Integration Patterns
//!
//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
//!
//! **Application provides HTTP health check endpoints:**
//! ```ignore
//! use std::sync::Arc;
//!
//! use axum::{Router, extract::State, http::StatusCode, routing::get};
//!
//! // Health check endpoint for load balancer (e.g. HAProxy)
//! async fn health_primary(State(engine): State<Arc<EmbeddedEngine>>) -> StatusCode {
//!     if engine.is_leader() {
//!         StatusCode::OK // 200 - Load balancer routes writes here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE // 503 - Load balancer skips this node
//!     }
//! }
//!
//! async fn health_replica(State(engine): State<Arc<EmbeddedEngine>>) -> StatusCode {
//!     if !engine.is_leader() {
//!         StatusCode::OK // 200 - Load balancer routes reads here
//!     } else {
//!         StatusCode::SERVICE_UNAVAILABLE // 503
//!     }
//! }
//!
//! let app = Router::new()
//!     .route("/primary", get(health_primary)) // For write traffic
//!     .route("/replica", get(health_replica)) // For read traffic
//!     .with_state(engine);
//!
//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
//!     .serve(app.into_make_service())
//!     .await?;
//! ```
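//!
//! With HAProxy, for example, the write backend can health-check `GET /primary`
//! and the read backend `GET /replica`; traffic then follows leadership changes
//! with no application-level routing code.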
//!
//! ### Pattern 2: Pre-check Before Write (Simple)
//!
//! **Application checks leadership before handling writes:**
//! ```ignore
//! async fn handle_write_request(
//!     engine: &EmbeddedEngine,
//!     key: Vec<u8>,
//!     value: Vec<u8>,
//! ) -> Result<(), String> {
//!     if !engine.is_leader() {
//!         // Surface this as HTTP 503 to the client
//!         return Err("Not leader, please retry on another node".to_string());
//!     }
//!
//!     // Safe to write (this node is leader)
//!     engine.client().put(key, value).await.map_err(|e| e.to_string())?;
//!     Ok(())
//! }
//! ```
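//!
//! Note that the pre-check is advisory: leadership can change between the
//! `is_leader()` call and the write. The `NotLeader` error returned by
//! `LocalKvClient` remains the authoritative guard, so handle it even after a
//! successful pre-check.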
//!
//! ### Pattern 3: HTTP 307 Redirect (Alternative)
//!
//! **Application returns a redirect to the leader:**
//! ```ignore
//! use axum::http::StatusCode;
//! use axum::response::{IntoResponse, Redirect, Response};
//!
//! async fn handle_write_request(
//!     engine: &EmbeddedEngine,
//!     key: Vec<u8>,
//!     value: Vec<u8>,
//! ) -> Response {
//!     match engine.client().put(key, value).await {
//!         Ok(_) => StatusCode::OK.into_response(),
//!         Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
//!             // 307 preserves the method and body, so the client retries the
//!             // same write against the leader
//!             Redirect::temporary(&addr).into_response()
//!         }
//!         Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
//!     }
//! }
//! ```
//!
//! ## Design Philosophy
//!
//! **Why doesn't EmbeddedEngine auto-forward writes?**
//!
//! 1. **Preserves the zero-overhead guarantee** - Auto-forwarding requires a gRPC client (adds network/serialization)
//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
//! 4. **Performance transparency** - Developers know exactly when network calls occur
//!
//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcKvClient`, as sketched below.
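//!
//! A minimal sketch of that alternative (illustrative only: the constructor
//! name and address below are assumptions, not the exact client API):
//! ```ignore
//! // Hypothetical connection call - consult the GrpcKvClient docs for the
//! // real signature. Requests go over gRPC and are forwarded to the leader,
//! // at the cost of network round-trips and serialization.
//! let client = GrpcKvClient::connect("http://127.0.0.1:9081").await?;
//! client.put(b"key", b"value").await?;
//! ```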

use std::sync::Arc;

#[cfg(feature = "watch")]
use bytes::Bytes;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatcherHandle;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::error;
use tracing::info;

use crate::Result;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStateMachine;
#[cfg(feature = "rocksdb")]
use crate::RocksDBStorageEngine;
use crate::StateMachine;
use crate::StorageEngine;
use crate::node::LocalKvClient;
use crate::node::NodeBuilder;
/// Embedded d-engine with automatic lifecycle management.
///
/// Provides high-level KV API for embedded usage:
/// - `start()` / `start_with()` - Initialize and spawn node
/// - `wait_ready()` - Wait for leader election
/// - `client()` - Get local KV client
/// - `stop()` - Graceful shutdown
///
/// # Example
/// ```ignore
/// use d_engine::EmbeddedEngine;
/// use std::time::Duration;
///
/// let engine = EmbeddedEngine::start().await?;
/// engine.wait_ready(Duration::from_secs(5)).await?;
///
/// let client = engine.client();
/// client.put(b"key", b"value").await?;
///
/// engine.stop().await?;
/// ```
pub struct EmbeddedEngine {
    /// Background task driving `node.run()`; taken and awaited by `stop()`.
    node_handle: Option<JoinHandle<Result<()>>>,
    /// Broadcasts the shutdown signal to the node.
    shutdown_tx: watch::Sender<()>,
    /// Zero-overhead in-process KV client.
    kv_client: LocalKvClient,
    /// Latest leader election state (`None` while no leader is known).
    leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
    /// Registry for key watchers (populated when the watch feature is enabled).
    #[cfg(feature = "watch")]
    watch_registry: Option<Arc<WatchRegistry>>,
}

impl EmbeddedEngine {
    /// Start engine with configuration from environment.
    ///
    /// Reads `CONFIG_PATH` environment variable or uses default configuration.
    /// Data directory is determined by config's `cluster.db_root_dir` setting.
    ///
    /// # Example
    /// ```ignore
    /// // Set config path via environment variable
    /// std::env::set_var("CONFIG_PATH", "/etc/d-engine/production.toml");
    ///
    /// let engine = EmbeddedEngine::start().await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// ```
    #[cfg(feature = "rocksdb")]
    pub async fn start() -> Result<Self> {
        let config = d_engine_core::RaftNodeConfig::new()?.validate()?;
        let base_dir = &config.cluster.db_root_dir;
        tokio::fs::create_dir_all(base_dir)
            .await
            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;

        let storage_path = base_dir.join("storage");
        let sm_path = base_dir.join("state_machine");

        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
        let mut sm = RocksDBStateMachine::new(sm_path)?;

        // Inject lease if enabled
        let lease_cfg = &config.raft.state_machine.lease;
        if lease_cfg.enabled {
            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
            sm.set_lease(lease);
        }

        let sm = Arc::new(sm);

        info!("Starting embedded engine with RocksDB at {:?}", base_dir);

        Self::start_custom(storage, sm, None).await
    }

    /// Start engine with explicit configuration file.
    ///
    /// Reads configuration from specified file path.
    /// Data directory is determined by config's `cluster.db_root_dir` setting.
    ///
    /// # Arguments
    /// - `config_path`: Path to configuration file (e.g. "d-engine.toml")
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// ```
    #[cfg(feature = "rocksdb")]
    pub async fn start_with(config_path: &str) -> Result<Self> {
        let config = d_engine_core::RaftNodeConfig::new()?
            .with_override_config(config_path)?
            .validate()?;
        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);

        tokio::fs::create_dir_all(&base_dir)
            .await
            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;

        let storage_path = base_dir.join("storage");
        let sm_path = base_dir.join("state_machine");

        let storage = Arc::new(RocksDBStorageEngine::new(storage_path)?);
        let mut sm = RocksDBStateMachine::new(sm_path)?;

        // Inject lease if enabled
        let lease_cfg = &config.raft.state_machine.lease;
        if lease_cfg.enabled {
            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
            sm.set_lease(lease);
        }

        let sm = Arc::new(sm);

        info!("Starting embedded engine with RocksDB at {:?}", base_dir);

        Self::start_custom(storage, sm, Some(config_path)).await
    }

    /// Start engine with custom storage and state machine.
    ///
    /// Advanced API for users providing custom storage implementations.
    ///
    /// # Arguments
    /// - `storage_engine`: Custom storage engine implementation
    /// - `state_machine`: Custom state machine implementation
    /// - `config_path`: Optional path to configuration file
    ///
    /// # Example
    /// ```ignore
    /// let storage = Arc::new(MyCustomStorage::new()?);
    /// let sm = Arc::new(MyCustomStateMachine::new()?);
    /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
    /// ```
    pub async fn start_custom<SE, SM>(
        storage_engine: Arc<SE>,
        state_machine: Arc<SM>,
        config_path: Option<&str>,
    ) -> Result<Self>
    where
        SE: StorageEngine + std::fmt::Debug + 'static,
        SM: StateMachine + std::fmt::Debug + 'static,
    {
        info!("Starting embedded d-engine");

        // Create shutdown channel
        let (shutdown_tx, shutdown_rx) = watch::channel(());

        // Load config or use default
        let node_config = if let Some(path) = config_path {
            d_engine_core::RaftNodeConfig::default()
                .with_override_config(path)?
                .validate()?
        } else {
            d_engine_core::RaftNodeConfig::new()?.validate()?
        };

        // Build node and start RPC server
        let node = NodeBuilder::init(node_config, shutdown_rx)
            .storage_engine(storage_engine)
            .state_machine(state_machine)
            .start()
            .await?;

        // Get leader change notifier before moving node
        let leader_elected_rx = node.leader_change_notifier();

        // Create local KV client before spawning
        let kv_client = node.local_client();

        // Capture watch registry (if enabled)
        #[cfg(feature = "watch")]
        let watch_registry = node.watch_registry.clone();

        // Spawn node.run() in background
        let node_handle = tokio::spawn(async move {
            if let Err(e) = node.run().await {
                error!("Node run error: {:?}", e);
                Err(e)
            } else {
                Ok(())
            }
        });

        info!("Embedded d-engine started successfully");

        Ok(Self {
            node_handle: Some(node_handle),
            shutdown_tx,
            kv_client,
            leader_elected_rx,
            #[cfg(feature = "watch")]
            watch_registry,
        })
    }

    /// Wait for leader election to complete.
    ///
    /// Blocks until a leader has been elected in the cluster.
    /// Event-driven notification (no polling), <1ms latency.
    ///
    /// # Timeout Guidelines
    ///
    /// **Single-node mode** (most common for development):
    /// - Typical: <100ms (near-instant election)
    /// - Recommended: `Duration::from_secs(3)`
    ///
    /// **Multi-node HA cluster** (production):
    /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
    /// - Recommended: `Duration::from_secs(10)`
    ///
    /// **Special cases**:
    /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
    /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
    /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
    ///
    /// # Returns
    /// - `Ok(LeaderInfo)` - Leader elected successfully
    /// - `Err(...)` - Timeout or cluster unavailable
    ///
    /// # Example
    /// ```ignore
    /// // Single-node development
    /// let engine = EmbeddedEngine::start().await?;
    /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
    ///
    /// // Multi-node production
    /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
    /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
    /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
    /// ```
    pub async fn wait_ready(
        &self,
        timeout: std::time::Duration,
    ) -> Result<crate::LeaderInfo> {
        let mut rx = self.leader_elected_rx.clone();

        tokio::time::timeout(timeout, async {
            // Check current value first (leader may already be elected)
            if let Some(info) = rx.borrow().as_ref() {
                info!(
                    "Leader already elected: {} (term {})",
                    info.leader_id, info.term
                );
                return Ok(*info);
            }

            loop {
                // Wait for leader election event (event-driven, no polling).
                // A closed channel means the node has shut down; bail out
                // instead of spinning until the timeout fires.
                if rx.changed().await.is_err() {
                    return Err(crate::Error::Fatal(
                        "Leader election channel closed".to_string(),
                    ));
                }

                // Check if a leader is elected
                if let Some(info) = rx.borrow().as_ref() {
                    info!("Leader elected: {} (term {})", info.leader_id, info.term);
                    return Ok(*info);
                }
            }
        })
        .await
        .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
    }

    /// Subscribe to leader change notifications.
    ///
    /// Returns a receiver that will be notified whenever:
    /// - First leader is elected
    /// - Leader changes (re-election)
    /// - No leader exists (during election)
    ///
    /// # Performance
    /// Event-driven notification (no polling), <1ms latency
    ///
    /// # Example
    /// ```ignore
    /// let mut leader_rx = engine.leader_change_notifier();
    /// tokio::spawn(async move {
    ///     while leader_rx.changed().await.is_ok() {
    ///         match leader_rx.borrow().as_ref() {
    ///             Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
    ///             None => println!("No leader"),
    ///         }
    ///     }
    /// });
    /// ```
    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
        self.leader_elected_rx.clone()
    }

    /// Returns true if the current node is the Raft leader.
    ///
    /// # Use Cases
    /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
    /// - Prevent write requests to followers before they fail
    /// - Application-level request routing decisions
    ///
    /// # Performance
    /// Near-zero cost: reads the cached leader state from the watch channel
    /// (no network or disk I/O)
    ///
    /// # Example
    /// ```ignore
    /// // Load balancer health check endpoint
    /// #[get("/primary")]
    /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
    ///     if engine.is_leader() {
    ///         StatusCode::OK // Routes writes here
    ///     } else {
    ///         StatusCode::SERVICE_UNAVAILABLE
    ///     }
    /// }
    ///
    /// // Application request handler
    /// if engine.is_leader() {
    ///     client.put(key, value).await?;
    /// } else {
    ///     return Err("Not leader, write rejected");
    /// }
    /// ```
    pub fn is_leader(&self) -> bool {
        self.leader_elected_rx
            .borrow()
            .as_ref()
            .map(|info| info.leader_id == self.kv_client.node_id())
            .unwrap_or(false)
    }

    /// Returns current leader information if available.
    ///
    /// # Returns
    /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
    /// - `None` if no leader exists (during election or network partition)
    ///
    /// # Use Cases
    /// - Monitoring dashboards showing cluster state
    /// - Debugging leader election issues
    /// - Logging cluster topology changes
    ///
    /// # Example
    /// ```ignore
    /// if let Some(info) = engine.leader_info() {
    ///     println!("Leader: {} (term {})", info.leader_id, info.term);
    /// } else {
    ///     println!("No leader elected, cluster unavailable");
    /// }
    /// ```
    pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
        *self.leader_elected_rx.borrow()
    }

    /// Get a reference to the local KV client.
    ///
    /// The client is available immediately after `start()`,
    /// but requests will only succeed after `wait_ready()` completes.
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// engine.wait_ready(Duration::from_secs(5)).await?;
    /// let client = engine.client();
    /// client.put(b"key", b"value").await?;
    /// ```
    pub fn client(&self) -> &LocalKvClient {
        &self.kv_client
    }

    /// Register a watcher for a specific key.
    ///
    /// Returns a handle that receives watch events via an mpsc channel.
    /// The watcher is automatically unregistered when the handle is dropped.
    ///
    /// # Arguments
    /// * `key` - The exact key to watch
    ///
    /// # Returns
    /// * `Result<WatcherHandle>` - Handle for receiving events
    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// let mut handle = engine.watch(b"mykey")?;
    /// while let Some(event) = handle.receiver_mut().recv().await {
    ///     println!("Key changed: {:?}", event);
    /// }
    /// ```
    #[cfg(feature = "watch")]
    pub fn watch(
        &self,
        key: impl AsRef<[u8]>,
    ) -> Result<WatcherHandle> {
        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            crate::Error::Fatal(
                "Watch feature disabled (WatchRegistry not initialized)".to_string(),
            )
        })?;

        let key_bytes = Bytes::copy_from_slice(key.as_ref());
        Ok(registry.register(key_bytes))
    }

    /// Gracefully stop the embedded engine.
    ///
    /// This method:
    /// 1. Sends shutdown signal to node
    /// 2. Waits for node.run() to complete
    /// 3. Propagates any errors from node execution
    ///
    /// # Errors
    /// Returns error if node encountered issues during shutdown.
    ///
    /// # Example
    /// ```ignore
    /// engine.stop().await?;
    /// ```
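    ///
    /// Typical shutdown wiring in an application's `main` (a sketch; `ctrl_c`
    /// requires tokio's `signal` feature):
    /// ```ignore
    /// // Run until interrupted, then shut the engine down gracefully
    /// tokio::signal::ctrl_c().await?;
    /// engine.stop().await?;
    /// ```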
    pub async fn stop(mut self) -> Result<()> {
        info!("Stopping embedded d-engine");

        // Send shutdown signal
        let _ = self.shutdown_tx.send(());

        // Wait for node task to complete
        if let Some(handle) = self.node_handle.take() {
            match handle.await {
                Ok(result) => {
                    info!("Embedded d-engine stopped successfully");
                    result
                }
                Err(e) => {
                    error!("Node task panicked: {:?}", e);
                    Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
                }
            }
        } else {
            Ok(())
        }
    }

    /// Returns the node ID for testing purposes.
    ///
    /// Useful in integration tests that need to identify which node
    /// they're interacting with, especially in multi-node scenarios.
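    ///
    /// # Example
    /// ```ignore
    /// let engine = EmbeddedEngine::start().await?;
    /// println!("Running as node {}", engine.node_id());
    /// ```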
    pub fn node_id(&self) -> u32 {
        self.kv_client.node_id()
    }
}

impl Drop for EmbeddedEngine {
    fn drop(&mut self) {
        // Warn if stop() was not called
        if let Some(handle) = &self.node_handle {
            if !handle.is_finished() {
                error!("EmbeddedEngine dropped without calling stop() - background task may leak");
            }
        }
    }
}