Skip to main content

d_engine_server/api/
embedded.rs

1//! Embedded mode for d-engine - application-friendly API
2//!
3//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
4//! that simplifies lifecycle management for embedded use cases.
5//!
6//! ## Comparison: Node vs EmbeddedEngine
7//!
8//! ### Using Node (Low-level API)
9//! ```ignore
10//! let node = NodeBuilder::new(config).start().await?;
11//! // Node does not provide client directly - use EmbeddedEngine instead
12//! tokio::spawn(async move { node.run().await });
13//! // Manual lifecycle management required
14//! ```
15//!
16//! ### Using EmbeddedEngine (High-level API)
17//! ```ignore
18//! let engine = EmbeddedEngine::start("./data/my-app").await?;
19//! engine.wait_ready(Duration::from_secs(5)).await?;
20//! let client = engine.client();
21//! engine.stop().await?;
22//! // Lifecycle managed automatically
23//! ```
24//!
25//! ## When to Use
26//!
27//! - **EmbeddedEngine**: Application developers who want simplicity
28//! - **Node**: Framework developers who need fine-grained control
29//!
30//! # Application Responsibilities in Multi-Node Deployments
31//!
32//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
33//! Applications are responsible for handling follower write requests.
34//!
35//! ## What EmbeddedEngine Provides
36//!
37//! - ✅ `is_leader()` - Check if current node is leader
38//! - ✅ `leader_info()` - Get leader ID and term
39//! - ✅ `EmbeddedClient` returns `NotLeader` error on follower writes
40//! - ✅ Zero-overhead in-process communication (<0.1ms)
41//!
42//! ## What Applications Must Handle
43//!
44//! - ❌ Request routing (follower → leader forwarding)
45//! - ❌ Load balancing across nodes
46//! - ❌ Health check endpoints for load balancers
47//!
48//! ## Integration Patterns
49//!
50//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
51//!
52//! **Application provides HTTP health check endpoints:**
53//! ```ignore
54//! use axum::{Router, routing::get, http::StatusCode};
55//!
56//! // Health check endpoint for load balancer (e.g. HAProxy)
57//! async fn health_primary(engine: Arc<EmbeddedEngine>) -> StatusCode {
58//!     if engine.is_leader() {
59//!         StatusCode::OK  // 200 - Load balancer routes writes here
60//!     } else {
61//!         StatusCode::SERVICE_UNAVAILABLE  // 503 - Load balancer skips this node
62//!     }
63//! }
64//!
65//! async fn health_replica(engine: Arc<EmbeddedEngine>) -> StatusCode {
66//!     if !engine.is_leader() {
67//!         StatusCode::OK  // 200 - Load balancer routes reads here
68//!     } else {
69//!         StatusCode::SERVICE_UNAVAILABLE  // 503
70//!     }
71//! }
72//!
73//! let app = Router::new()
74//!     .route("/primary", get(health_primary))  // For write traffic
75//!     .route("/replica", get(health_replica)); // For read traffic
76//!
77//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
78//!     .serve(app.into_make_service())
79//!     .await?;
80//! ```
81//!
82//! ### Pattern 2: Pre-check Before Write (Simple)
83//!
84//! **Application checks leadership before handling writes:**
85//! ```ignore
86//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
87//!     if !engine.is_leader() {
88//!         // Return HTTP 503 to client
89//!         return Err("Not leader, please retry on another node");
90//!     }
91//!
92//!     // Safe to write (this node is leader)
93//!     engine.client().put(key, value).await?;
94//!     Ok(())
95//! }
96//! ```
97//!
98//! ### Pattern 3: HTTP 307 Redirect (Alternative)
99//!
100//! **Application returns redirect to leader:**
101//! ```ignore
102//! async fn handle_write_request(engine: &EmbeddedEngine) -> Response {
103//!     match engine.client().put(key, value).await {
104//!         Ok(_) => Response::ok(),
105//!         Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
106//!             // Redirect client to leader
107//!             Response::redirect_307(addr)
108//!         }
109//!         Err(e) => Response::error(e),
110//!     }
111//! }
112//! ```
113//!
114//! ## Design Philosophy
115//!
116//! **Why doesn't EmbeddedEngine auto-forward writes?**
117//!
118//! 1. **Preserves zero-overhead guarantee** - Auto-forwarding requires gRPC client (adds network/serialization)
119//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
120//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
121//! 4. **Performance transparency** - Developers know exactly when network calls occur
122//!
123//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcClient`.
124
125use crate::Result;
126#[cfg(feature = "rocksdb")]
127use crate::RocksDBStateMachine;
128#[cfg(feature = "rocksdb")]
129use crate::RocksDBStorageEngine;
130#[cfg(feature = "rocksdb")]
131use crate::RocksDBUnifiedEngine;
132use crate::StateMachine;
133use crate::StorageEngine;
134use crate::api::EmbeddedClient;
135use crate::node::NodeBuilder;
136use std::sync::Arc;
137use std::time::Duration;
138use tokio::sync::Mutex;
139use tokio::sync::watch;
140use tokio::task::JoinHandle;
141use tracing::error;
142use tracing::info;
143
144struct Inner {
145    node_handle: Mutex<Option<JoinHandle<Result<()>>>>,
146    shutdown_tx: watch::Sender<()>,
147    client: Arc<EmbeddedClient>,
148    leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
149    membership_rx: watch::Receiver<crate::membership::MembershipSnapshot>,
150    is_stopped: Mutex<bool>,
151    node_id: u32,
152}
153
154/// Embedded d-engine with automatic lifecycle management.
155///
156/// **Thread-safe**: Clone and share across threads freely.
157/// All methods use `&self` - safe to call from multiple contexts.
158///
159/// Provides high-level KV API for embedded usage:
160/// - `start()` / `start_with()` - Initialize and spawn node
161/// - `wait_ready()` - Wait for leader election
162/// - `client()` - Get embedded client
163/// - `stop()` - Graceful shutdown
164///
165/// # Example
166/// ```ignore
167/// use d_engine::EmbeddedEngine;
168/// use std::time::Duration;
169///
170/// let engine = EmbeddedEngine::start("./data/my-app").await?;
171/// engine.wait_ready(Duration::from_secs(5)).await?;
172///
173/// let client = engine.client();
174/// client.put(b"key", b"value").await?;
175///
176/// engine.stop().await?;
177/// ```
178#[derive(Clone)]
179pub struct EmbeddedEngine {
180    inner: Arc<Inner>,
181}
182
183impl EmbeddedEngine {
184    /// Start engine with an explicit data directory.
185    ///
186    /// `data_dir` has highest priority and always overrides `cluster.db_root_dir` from
187    /// `CONFIG_PATH` or `RAFT__` environment variables. Other configuration (network,
188    /// Raft timeouts, cluster topology) is still read from those sources if set.
189    ///
190    /// The directory is created automatically if it does not exist.
191    /// If it already contains data the engine opens it in place (idempotent).
192    ///
193    /// # Example
194    /// ```ignore
195    /// // Minimal — just supply a path
196    /// let engine = EmbeddedEngine::start("./data/my-app").await?;
197    /// engine.wait_ready(Duration::from_secs(5)).await?;
198    ///
199    /// // Works with any AsRef<Path>
200    /// let engine = EmbeddedEngine::start(std::path::Path::new("/var/lib/my-app")).await?;
201    /// ```
202    #[cfg(feature = "rocksdb")]
203    pub async fn start(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
204        let mut config = d_engine_core::RaftNodeConfig::new()?;
205        config.cluster.db_root_dir = data_dir.as_ref().to_path_buf();
206        let config = config.validate()?;
207
208        let base_dir = config.cluster.db_root_dir.clone();
209        tokio::fs::create_dir_all(&base_dir)
210            .await
211            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
212
213        let (storage, mut sm) = if config.storage.unified_db {
214            let db_path = base_dir.join("db");
215            info!(
216                "Starting embedded engine with unified RocksDB at {:?}",
217                db_path
218            );
219            RocksDBUnifiedEngine::open(&db_path)?
220        } else {
221            info!(
222                "Starting embedded engine with separate RocksDB instances at {:?}",
223                base_dir
224            );
225            let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
226            let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
227            (storage, sm)
228        };
229
230        let lease_cfg = &config.raft.state_machine.lease;
231        if lease_cfg.enabled {
232            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
233            sm.set_lease(lease);
234        }
235
236        Self::start_node(config, Arc::new(storage), Arc::new(sm)).await
237    }
238
239    /// Start engine with explicit configuration file.
240    ///
241    /// Reads configuration from specified file path.
242    /// Data directory is determined by config's `cluster.db_root_dir` setting.
243    ///
244    /// # Arguments
245    /// - `config_path`: Path to configuration file (e.g. "d-engine.toml")
246    ///
247    /// # Example
248    /// ```ignore
249    /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
250    /// engine.wait_ready(Duration::from_secs(5)).await?;
251    /// ```
252    #[cfg(feature = "rocksdb")]
253    pub async fn start_with(config_path: &str) -> Result<Self> {
254        let config = d_engine_core::RaftNodeConfig::new()?
255            .with_override_config(config_path)?
256            .validate()?;
257        let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
258
259        tokio::fs::create_dir_all(&base_dir)
260            .await
261            .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
262
263        let (storage, mut sm) = if config.storage.unified_db {
264            let db_path = base_dir.join("db");
265            info!(
266                "Starting embedded engine with unified RocksDB at {:?}",
267                db_path
268            );
269            RocksDBUnifiedEngine::open(&db_path)?
270        } else {
271            info!(
272                "Starting embedded engine with separate RocksDB instances at {:?}",
273                base_dir
274            );
275            let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
276            let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
277            (storage, sm)
278        };
279
280        // Inject lease if enabled
281        let lease_cfg = &config.raft.state_machine.lease;
282        if lease_cfg.enabled {
283            let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
284            sm.set_lease(lease);
285        }
286
287        Self::start_custom(Arc::new(storage), Arc::new(sm), Some(config_path)).await
288    }
289
290    /// Start engine with custom storage and state machine.
291    ///
292    /// Advanced API for users providing custom storage implementations.
293    ///
294    /// # Arguments
295    /// - `config_path`: Optional path to configuration file
296    /// - `storage_engine`: Custom storage engine implementation
297    /// - `state_machine`: Custom state machine implementation
298    ///
299    /// # Example
300    /// ```ignore
301    /// let storage = Arc::new(MyCustomStorage::new()?);
302    /// let sm = Arc::new(MyCustomStateMachine::new()?);
303    /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
304    /// ```
305    pub async fn start_custom<SE, SM>(
306        storage_engine: Arc<SE>,
307        state_machine: Arc<SM>,
308        config_path: Option<&str>,
309    ) -> Result<Self>
310    where
311        SE: StorageEngine + std::fmt::Debug + 'static,
312        SM: StateMachine + std::fmt::Debug + 'static,
313    {
314        let node_config = if let Some(path) = config_path {
315            d_engine_core::RaftNodeConfig::default()
316                .with_override_config(path)?
317                .validate()?
318        } else {
319            d_engine_core::RaftNodeConfig::new()?.validate()?
320        };
321
322        Self::start_node(node_config, storage_engine, state_machine).await
323    }
324
325    /// Build and launch the node from a validated config and pre-built storage.
326    async fn start_node<SE, SM>(
327        node_config: d_engine_core::RaftNodeConfig,
328        storage_engine: Arc<SE>,
329        state_machine: Arc<SM>,
330    ) -> Result<Self>
331    where
332        SE: StorageEngine + std::fmt::Debug + 'static,
333        SM: StateMachine + std::fmt::Debug + 'static,
334    {
335        info!("Starting embedded d-engine");
336
337        let (shutdown_tx, shutdown_rx) = watch::channel(());
338
339        let node = NodeBuilder::init(node_config, shutdown_rx)
340            .storage_engine(storage_engine)
341            .state_machine(state_machine)
342            .start()
343            .await?;
344
345        let leader_elected_rx = node.leader_change_notifier();
346        let membership_rx = node.membership_change_notifier();
347
348        #[cfg(not(feature = "watch"))]
349        let client = Arc::new(EmbeddedClient::new_internal(
350            node.event_tx.clone(),
351            node.cmd_tx.clone(),
352            node.node_id,
353            Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
354        ));
355
356        #[cfg(feature = "watch")]
357        let client = {
358            let watch_registry = node.watch_registry.clone();
359            let mut client = EmbeddedClient::new_internal(
360                node.event_tx.clone(),
361                node.cmd_tx.clone(),
362                node.node_id,
363                Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
364            );
365            if let Some(registry) = &watch_registry {
366                client = client.with_watch_registry(registry.clone());
367            }
368            Arc::new(client)
369        };
370
371        let node_id = node.node_id();
372
373        let node_handle = tokio::spawn(async move {
374            if let Err(e) = node.run().await {
375                error!("Node run error: {:?}", e);
376                Err(e)
377            } else {
378                Ok(())
379            }
380        });
381
382        info!("Embedded d-engine started successfully");
383
384        Ok(Self {
385            inner: Arc::new(Inner {
386                node_handle: Mutex::new(Some(node_handle)),
387                shutdown_tx,
388                client,
389                leader_elected_rx,
390                membership_rx,
391                is_stopped: Mutex::new(false),
392                node_id,
393            }),
394        })
395    }
396
397    /// Wait until the cluster is ready to serve requests.
398    ///
399    /// Blocks until a leader has been elected **and** its no-op entry is committed
400    /// by a majority of nodes (Raft §8). Only at this point is the leader guaranteed
401    /// to be aware of all previously committed entries and safe to serve reads and writes.
402    ///
403    /// Event-driven notification (no polling), <1ms latency after noop commit.
404    ///
405    /// # Timeout Guidelines
406    ///
407    /// **Single-node mode** (most common for development):
408    /// - Typical: <100ms (near-instant election)
409    /// - Recommended: `Duration::from_secs(3)`
410    ///
411    /// **Multi-node HA cluster** (production):
412    /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
413    /// - Recommended: `Duration::from_secs(10)`
414    ///
415    /// **Special cases**:
416    /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
417    /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
418    /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
419    ///
420    /// # Returns
421    /// - `Ok(LeaderInfo)` - Leader elected successfully
422    /// - `Err(...)` - Timeout or cluster unavailable
423    ///
424    /// # Example
425    /// ```ignore
426    /// // Single-node development
427    /// let engine = EmbeddedEngine::start("./data/my-app").await?;
428    /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
429    ///
430    /// // Multi-node production
431    /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
432    /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
433    /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
434    /// ```
435    pub async fn wait_ready(
436        &self,
437        timeout: std::time::Duration,
438    ) -> Result<crate::LeaderInfo> {
439        let mut rx = self.inner.leader_elected_rx.clone();
440
441        tokio::time::timeout(timeout, async {
442            // Check current value first (leader may already be elected)
443            if let Some(info) = rx.borrow().as_ref() {
444                info!(
445                    "Leader already elected: {} (term {})",
446                    info.leader_id, info.term
447                );
448                return Ok(*info);
449            }
450
451            loop {
452                // Wait for leader election event (event-driven, no polling)
453                let _ = rx.changed().await;
454
455                // Check if a leader is elected
456                if let Some(info) = rx.borrow().as_ref() {
457                    info!("Leader elected: {} (term {})", info.leader_id, info.term);
458                    return Ok(*info);
459                }
460            }
461        })
462        .await
463        .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
464    }
465
466    /// Subscribe to leader change notifications.
467    ///
468    /// Returns a receiver that will be notified whenever:
469    /// - First leader is elected
470    /// - Leader changes (re-election)
471    /// - No leader exists (during election)
472    ///
473    /// # Performance
474    /// Event-driven notification (no polling), <1ms latency
475    ///
476    /// # Example
477    /// ```ignore
478    /// let mut leader_rx = engine.leader_change_notifier();
479    /// tokio::spawn(async move {
480    ///     while leader_rx.changed().await.is_ok() {
481    ///         match leader_rx.borrow().as_ref() {
482    ///             Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
483    ///             None => println!("No leader"),
484    ///         }
485    ///     }
486    /// });
487    /// ```
488    pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
489        self.inner.leader_elected_rx.clone()
490    }
491
492    /// Subscribe to committed membership change notifications.
493    ///
494    /// Returns a `watch::Receiver` that fires whenever a `ConfChange` entry
495    /// (node join, removal, or learner promotion) is committed by a majority.
496    ///
497    /// All nodes — leader, follower, and learner — fire the notification because
498    /// every node walks the same `CommitHandler::apply_config_change()` path.
499    ///
500    /// The first `borrow()` returns the current membership state immediately
501    /// without waiting for a change.
502    ///
503    /// ## Distinguishing this from other notifiers
504    ///
505    /// - [`Self::leader_change_notifier`]: fires on leader election changes, **not** membership changes
506    /// - [`Self::wait_ready`]: resolves when a leader is elected and the cluster is ready, **not** membership changes
507    ///
508    /// ## Usage
509    /// ```ignore
510    /// let mut rx = engine.watch_membership();
511    /// while rx.changed().await.is_ok() {
512    ///     let snapshot = rx.borrow_and_update().clone();
513    ///     // snapshot.members — current voters
514    ///     // snapshot.learners — current non-voting learners
515    ///     // snapshot.committed_index — idempotency key
516    ///     scheduler.on_membership_changed(snapshot).await;
517    /// }
518    /// ```
519    pub fn watch_membership(&self) -> watch::Receiver<crate::membership::MembershipSnapshot> {
520        self.inner.membership_rx.clone()
521    }
522
523    /// Returns true if the current node is the Raft leader.
524    ///
525    /// # Use Cases
526    /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
527    /// - Prevent write requests to followers before they fail
528    /// - Application-level request routing decisions
529    ///
530    /// # Performance
531    /// Zero-cost operation (reads cached leader state from watch channel)
532    ///
533    /// # Example
534    /// ```ignore
535    /// // Load balancer health check endpoint
536    /// #[get("/primary")]
537    /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
538    ///     if engine.is_leader() {
539    ///         StatusCode::OK  // Routes writes here
540    ///     } else {
541    ///         StatusCode::SERVICE_UNAVAILABLE
542    ///     }
543    /// }
544    ///
545    /// // Application request handler
546    /// if engine.is_leader() {
547    ///     client.put(key, value).await?;
548    /// } else {
549    ///     return Err("Not leader, write rejected");
550    /// }
551    /// ```
552    pub fn is_leader(&self) -> bool {
553        self.inner
554            .leader_elected_rx
555            .borrow()
556            .as_ref()
557            .map(|info| info.leader_id == self.inner.node_id)
558            .unwrap_or(false)
559    }
560
561    /// Returns current leader information if available.
562    ///
563    /// # Returns
564    /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
565    /// - `None` if no leader exists (during election or network partition)
566    ///
567    /// # Use Cases
568    /// - Monitoring dashboards showing cluster state
569    /// - Debugging leader election issues
570    /// - Logging cluster topology changes
571    ///
572    /// # Example
573    /// ```ignore
574    /// if let Some(info) = engine.leader_info() {
575    ///     println!("Leader: {} (term {})", info.leader_id, info.term);
576    /// } else {
577    ///     println!("No leader elected, cluster unavailable");
578    /// }
579    /// ```
580    pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
581        *self.inner.leader_elected_rx.borrow()
582    }
583
584    /// Get a reference to the local KV client.
585    ///
586    /// The client is available immediately after `start()`,
587    /// but requests will only succeed after `wait_ready()` completes.
588    ///
589    /// # Example
590    /// ```ignore
591    /// let engine = EmbeddedEngine::start("./data/my-app").await?;
592    /// engine.wait_ready(Duration::from_secs(5)).await?;
593    /// let client = engine.client();
594    /// client.put(b"key", b"value").await?;
595    /// ```
596    pub fn client(&self) -> Arc<EmbeddedClient> {
597        Arc::clone(&self.inner.client)
598    }
599
600    /// Stop the embedded d-engine gracefully (idempotent).
601    ///
602    /// This method:
603    /// 1. Sends shutdown signal to node
604    /// 2. Waits for node.run() to complete
605    /// 3. Propagates any errors from node execution
606    ///
607    /// Safe to call multiple times - subsequent calls are no-ops.
608    ///
609    /// # Errors
610    /// Returns error if node encountered issues during shutdown.
611    ///
612    /// # Example
613    /// ```ignore
614    /// engine.stop().await?;
615    /// engine.stop().await?;  // No-op, returns Ok(())
616    /// ```
617    pub async fn stop(&self) -> Result<()> {
618        let mut is_stopped = self.inner.is_stopped.lock().await;
619        if *is_stopped {
620            return Ok(());
621        }
622
623        info!("Stopping embedded d-engine");
624
625        // Send shutdown signal
626        let _ = self.inner.shutdown_tx.send(());
627
628        // Wait for node task to complete
629        let mut handle_guard = self.inner.node_handle.lock().await;
630        if let Some(handle) = handle_guard.take() {
631            match handle.await {
632                Ok(result) => {
633                    info!("Embedded d-engine stopped successfully");
634                    *is_stopped = true;
635                    result
636                }
637                Err(e) => {
638                    error!("Node task panicked: {:?}", e);
639                    *is_stopped = true;
640                    Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
641                }
642            }
643        } else {
644            *is_stopped = true;
645            Ok(())
646        }
647    }
648
649    /// Check if the engine has been stopped.
650    ///
651    /// # Example
652    /// ```ignore
653    /// if engine.is_stopped() {
654    ///     println!("Engine is stopped");
655    /// }
656    /// ```
657    pub async fn is_stopped(&self) -> bool {
658        *self.inner.is_stopped.lock().await
659    }
660
661    /// Returns the unique identifier for this Raft node.
662    pub fn node_id(&self) -> u32 {
663        self.inner.node_id
664    }
665}
666
667impl Drop for EmbeddedEngine {
668    fn drop(&mut self) {
669        // Warn if stop() was not called
670        if let Ok(handle) = self.inner.node_handle.try_lock()
671            && let Some(h) = &*handle
672            && !h.is_finished()
673        {
674            error!("EmbeddedEngine dropped without calling stop() - background task may leak");
675        }
676    }
677}