Skip to main content

vibe_ready/api/
engine.rs

1use crate::api::capabilities::VibeCapabilities;
2use crate::api::engine_config::VibeEngineConfig;
3use crate::api::engine_context::VibeEngineContext;
4use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
5use crate::api::engine_executor::{VibeEngineExecutor, VibeEngineTask, VibeRuntimeHandle};
6use crate::api::scheduler::{
7    VibeCancellationToken, VibeTaskHandle, VibeTaskPanel, VibeTaskPriority, VibeTaskScheduler,
8};
9use crate::log::log_def::{LogListener, DESC};
10use crate::log::log_level::LogLevel;
11use crate::store::kv_store::VibeKvStore;
12use crate::{log_e, log_t, platform};
13use std::future::Future;
14use std::sync::atomic::{AtomicU8, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17use threadpool::ThreadPool;
18use tokio::runtime::Handle;
19use tokio::sync::mpsc::channel;
20
21const DEFAULT_DESTROY_TIMEOUT: Duration = Duration::from_secs(5);
22
23#[repr(u8)]
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25/// Lifecycle state of a [`VibeEngine`].
26pub enum VibeEngineState {
27    /// The engine value has been constructed but is not accepting work yet.
28    Created = 0,
29    /// The engine is ready to accept tasks and storage operations.
30    Running = 1,
31    /// The engine is shutting down resources and no longer accepts new work.
32    Closing = 2,
33    /// The engine has released its runtime-owned resources.
34    Closed = 3,
35}
36
37impl VibeEngineState {
38    fn from_u8(value: u8) -> Self {
39        match value {
40            0 => Self::Created,
41            1 => Self::Running,
42            2 => Self::Closing,
43            3 => Self::Closed,
44            _ => Self::Closed,
45        }
46    }
47}
48
49/// Main runtime facade for task execution, logging, and SDK context access.
50pub struct VibeEngine {
51    executor: VibeEngineExecutor,
52    /// Shared engine context for advanced integrations that need low-level clients.
53    pub ctx: Arc<VibeEngineContext>,
54    state: Arc<AtomicU8>,
55    destroy_lock: Arc<Mutex<()>>,
56    scheduler: Arc<VibeTaskScheduler>,
57    #[cfg(feature = "net-http")]
58    http: Arc<std::sync::OnceLock<crate::net::VibeHttpClient>>,
59}
60
61impl VibeEngine {
62    /// Returns compile-time capabilities enabled for this crate build.
63    ///
64    /// # Returns
65    ///
66    /// A [`VibeCapabilities`] snapshot describing enabled storage, logging,
67    /// and platform capabilities.
68    ///
69    /// # Examples
70    ///
71    /// ```no_run
72    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
73    ///
74    /// # fn demo() -> VibeResult<()> {
75    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
76    /// let capabilities = engine.capabilities();
77    /// assert_eq!(capabilities.log_store, cfg!(feature = "log-diesel"));
78    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
79    /// # Ok(())
80    /// # }
81    /// ```
82    pub fn capabilities(&self) -> VibeCapabilities {
83        VibeCapabilities::current()
84    }
85
86    /// Returns the current engine lifecycle state.
87    ///
88    /// # Returns
89    ///
90    /// A [`VibeEngineState`] value such as [`VibeEngineState::Running`] or
91    /// [`VibeEngineState::Closed`].
92    ///
93    /// # Examples
94    ///
95    /// ```no_run
96    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeEngineState, VibeResult};
97    ///
98    /// # fn demo() -> VibeResult<()> {
99    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
100    /// assert_eq!(engine.state(), VibeEngineState::Running);
101    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
102    /// # Ok(())
103    /// # }
104    /// ```
105    pub fn state(&self) -> VibeEngineState {
106        VibeEngineState::from_u8(self.state.load(Ordering::SeqCst))
107    }
108
109    /// Clones the engine executor for advanced task and callback integrations.
110    ///
111    /// # Returns
112    ///
113    /// A cheap clone of [`VibeEngineExecutor`] sharing the engine runtime.
114    ///
115    /// # Examples
116    ///
117    /// ```no_run
118    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
119    ///
120    /// # fn demo() -> VibeResult<()> {
121    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
122    /// let executor = engine.executor();
123    /// executor.post(async {})?;
124    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
125    /// # Ok(())
126    /// # }
127    /// ```
128    pub fn executor(&self) -> VibeEngineExecutor {
129        self.executor.clone()
130    }
131
132    /// Creates a high-level key-value store facade bound to this engine.
133    ///
134    /// # Returns
135    ///
136    /// A [`VibeKvStore`] that performs blocking-friendly operations through
137    /// the engine executor.
138    ///
139    /// # Examples
140    ///
141    /// ```no_run
142    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
143    ///
144    /// # fn demo() -> VibeResult<()> {
145    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
146    /// let store = engine.store();
147    /// store.set_str("theme", "dark")?;
148    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
149    /// # Ok(())
150    /// # }
151    /// ```
152    pub fn store(&self) -> VibeKvStore {
153        VibeKvStore::new(self.ctx.db_client().clone(), self.executor.clone())
154    }
155
156    /// Returns a shared HTTP client bound to this engine, building it on first use.
157    ///
158    /// The client is created once with default configuration and cached; later
159    /// calls return cheap clones that share the same connection pool. Requires
160    /// the `net-http` feature.
161    ///
162    /// # Returns
163    ///
164    /// `Ok(VibeHttpClient)` on success, or [`VibeEngineError`] if the client
165    /// could not be constructed.
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// # #[cfg(feature = "net-http")]
171    /// # async fn demo(engine: &vibe_ready::VibeEngine) -> vibe_ready::VibeResult<()> {
172    /// let client = engine.http()?;
173    /// let response = client.get("https://example.com").await?;
174    /// assert!(response.status() > 0);
175    /// # Ok(())
176    /// # }
177    /// ```
178    #[cfg(feature = "net-http")]
179    pub fn http(&self) -> Result<crate::net::VibeHttpClient, VibeEngineError> {
180        if let Some(client) = self.http.get() {
181            return Ok(client.clone());
182        }
183        let client = crate::net::VibeHttpClient::new()?;
184        let _ = self.http.set(client.clone());
185        Ok(self.http.get().cloned().unwrap_or(client))
186    }
187
188    /// Runs a future on the engine runtime and waits for its result.
189    ///
190    /// Use this for short async operations where the caller needs the return
191    /// value synchronously.
192    ///
193    /// # Returns
194    ///
195    /// `Ok(F)` with the future output, or [`VibeEngineError`] if the engine is
196    /// not running or the task cannot be delivered.
197    ///
198    /// # Examples
199    ///
200    /// ```no_run
201    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
202    ///
203    /// # fn demo() -> VibeResult<()> {
204    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
205    /// let answer = engine.invoke(async { 42 })?;
206    /// assert_eq!(answer, 42);
207    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub fn invoke<T, F>(&self, future: T) -> Result<F, VibeEngineError>
212    where
213        T: Future<Output = F> + Send + 'static,
214        F: Send + 'static,
215    {
216        if self.state() != VibeEngineState::Running {
217            return Err(VibeEngineError::from_error_code(
218                VibeEngineErrorCode::PostError,
219            ));
220        }
221        self.executor.invoke(future)
222    }
223
224    /// Posts a fire-and-forget future to the engine runtime.
225    ///
226    /// The method logs failures instead of returning them, making it suitable
227    /// for background work where the caller does not need a result.
228    ///
229    /// # Returns
230    ///
231    /// This method returns `()`; delivery errors are written to the SDK log.
232    ///
233    /// # Examples
234    ///
235    /// ```no_run
236    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
237    ///
238    /// # fn demo() -> VibeResult<()> {
239    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
240    /// engine.post(async {
241    ///     // perform background work here
242    /// });
243    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
244    /// # Ok(())
245    /// # }
246    /// ```
247    pub fn post<T>(&self, future: T)
248    where
249        T: Future<Output = ()> + Send + 'static,
250    {
251        if self.state() != VibeEngineState::Running {
252            log_e!("post", DESC, "engine is not running");
253            return;
254        }
255        if let Err(error) = self.executor.post(future) {
256            log_e!("post", DESC, format!("executor post error: {}", error));
257        }
258    }
259
260    /// Wraps a one-argument callback so it runs on the engine callback pool.
261    ///
262    /// # Returns
263    ///
264    /// A `FnOnce(R)` wrapper that schedules `cb` on the callback thread pool.
265    ///
266    /// # Examples
267    ///
268    /// ```no_run
269    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
270    ///
271    /// # fn demo() -> VibeResult<()> {
272    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
273    /// let callback = engine.cb_pool_once(|value: i32| assert_eq!(value, 7));
274    /// callback(7);
275    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
276    /// # Ok(())
277    /// # }
278    /// ```
279    pub fn cb_pool_once<F, R>(&self, cb: F) -> impl FnOnce(R)
280    where
281        F: FnOnce(R) + Send + 'static,
282        R: Send + 'static,
283    {
284        self.executor.callback().once(cb)
285    }
286
287    /// Wraps a two-argument callback so it runs on the engine callback pool.
288    ///
289    /// # Returns
290    ///
291    /// A `FnOnce(R1, R2)` wrapper that schedules `cb` on the callback thread pool.
292    ///
293    /// # Examples
294    ///
295    /// ```no_run
296    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
297    ///
298    /// # fn demo() -> VibeResult<()> {
299    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
300    /// let callback = engine.cb_pool_once2(|left: i32, right: i32| assert_eq!(left + right, 3));
301    /// callback(1, 2);
302    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
303    /// # Ok(())
304    /// # }
305    /// ```
306    pub fn cb_pool_once2<F, R1, R2>(&self, cb: F) -> impl FnOnce(R1, R2)
307    where
308        F: FnOnce(R1, R2) + Send + 'static,
309        R1: Send + 'static,
310        R2: Send + 'static,
311    {
312        self.executor.callback().once2(cb)
313    }
314}
315
316impl VibeEngine {
317    /// Posts a future to a dedicated priority lane.
318    ///
319    /// Tasks submitted to a higher priority lane run before lower-priority
320    /// tasks queued at the same time. Returns a [`VibeTaskHandle`] that can
321    /// be cancelled or `await`ed.
322    pub fn post_with_priority<F>(
323        &self,
324        name: impl Into<String>,
325        priority: VibeTaskPriority,
326        future: F,
327    ) -> Result<VibeTaskHandle, VibeEngineError>
328    where
329        F: std::future::Future<Output = ()> + Send + 'static,
330    {
331        if self.state() != VibeEngineState::Running {
332            return Err(VibeEngineError::from_error_code(
333                VibeEngineErrorCode::PostError,
334            ));
335        }
336        self.scheduler.post_with_priority(name, priority, future)
337    }
338
339    /// Schedule a one-shot task to run after `delay`.
340    ///
341    /// The builder receives a [`VibeCancellationToken`] so the user task can
342    /// abort cooperatively when the handle is cancelled.
343    pub fn schedule_after<F, Fut>(
344        &self,
345        name: impl Into<String>,
346        delay: Duration,
347        builder: F,
348    ) -> Result<VibeTaskHandle, VibeEngineError>
349    where
350        F: FnOnce(VibeCancellationToken) -> Fut + Send + 'static,
351        Fut: std::future::Future<Output = ()> + Send + 'static,
352    {
353        if self.state() != VibeEngineState::Running {
354            return Err(VibeEngineError::from_error_code(
355                VibeEngineErrorCode::PostError,
356            ));
357        }
358        self.scheduler.schedule_after(name, delay, builder)
359    }
360
361    /// Schedule a periodic task. The builder is invoked once every `period`
362    /// until the returned handle is cancelled or the engine is destroyed.
363    pub fn schedule_every<F, Fut>(
364        &self,
365        name: impl Into<String>,
366        period: Duration,
367        builder: F,
368    ) -> Result<VibeTaskHandle, VibeEngineError>
369    where
370        F: FnMut(VibeCancellationToken) -> Fut + Send + 'static,
371        Fut: std::future::Future<Output = ()> + Send + 'static,
372    {
373        if self.state() != VibeEngineState::Running {
374            return Err(VibeEngineError::from_error_code(
375                VibeEngineErrorCode::PostError,
376            ));
377        }
378        self.scheduler.schedule_every(name, period, builder)
379    }
380
381    /// Diagnostic panel exposing live snapshots of scheduler-tracked tasks.
382    pub fn tasks(&self) -> VibeTaskPanel {
383        self.scheduler.panel()
384    }
385}
386
387impl VibeEngine {
388    /// Creates an engine with a Tokio runtime owned by vibe-ready.
389    ///
390    /// # Returns
391    ///
392    /// `Ok(VibeEngine)` when configuration is valid and storage/logging
393    /// backends open successfully, otherwise [`VibeEngineError`].
394    ///
395    /// # Examples
396    ///
397    /// ```no_run
398    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
399    ///
400    /// # fn demo() -> VibeResult<()> {
401    /// let engine = VibeEngine::create(VibeEngineConfig::builder().app_name("demo").build())?;
402    /// engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    pub fn create(config: VibeEngineConfig) -> Result<Self, VibeEngineError> {
407        config.validate()?;
408        let runtime_config = config.runtime_config().clone();
409        let runtime = Arc::new(
410            tokio::runtime::Builder::new_multi_thread()
411                .worker_threads(runtime_config.worker_threads)
412                .enable_all()
413                .build()
414                .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?,
415        );
416        let handle = runtime.handle().clone();
417
418        Self::create_with_runtime(config, VibeRuntimeHandle::owned(runtime), handle)
419    }
420
421    /// Creates an engine using a Tokio runtime owned by the host application.
422    ///
423    /// The host runtime must stay alive for the lifetime of the engine. Destroying
424    /// the engine closes vibe-ready resources, but does not shut down this runtime.
425    ///
426    /// # Returns
427    ///
428    /// `Ok(VibeEngine)` bound to `runtime_handle`, or [`VibeEngineError`] if
429    /// validation or backend initialization fails.
430    ///
431    /// # Examples
432    ///
433    /// ```no_run
434    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
435    ///
436    /// # fn demo() -> VibeResult<()> {
437    /// let runtime = tokio::runtime::Runtime::new().expect("create runtime");
438    /// let engine = VibeEngine::create_with_runtime_handle(
439    ///     VibeEngineConfig::builder().app_name("hosted").build(),
440    ///     runtime.handle().clone(),
441    /// )?;
442    /// engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub fn create_with_runtime_handle(
447        config: VibeEngineConfig,
448        runtime_handle: Handle,
449    ) -> Result<Self, VibeEngineError> {
450        config.validate()?;
451
452        Self::create_with_runtime(
453            config,
454            VibeRuntimeHandle::external(runtime_handle.clone()),
455            runtime_handle,
456        )
457    }
458
459    fn create_with_runtime(
460        config: VibeEngineConfig,
461        runtime: VibeRuntimeHandle,
462        runtime_handle: Handle,
463    ) -> Result<Self, VibeEngineError> {
464        let runtime_config = config.runtime_config().clone();
465        let (async_tx, mut async_rx) =
466            channel::<VibeEngineTask>(runtime_config.async_queue_capacity);
467        let (sync_tx, mut sync_rx) = channel::<VibeEngineTask>(runtime_config.sync_queue_capacity);
468        let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
469
470        runtime_handle.spawn(async move {
471            let sync_handle = tokio::spawn(async move {
472                while let Some(future) = sync_rx.recv().await {
473                    future.await;
474                }
475            });
476
477            let async_handle = tokio::spawn(async move {
478                while let Some(future) = async_rx.recv().await {
479                    future.await;
480                }
481            });
482
483            let (sync_ret, async_ret) = tokio::join!(sync_handle, async_handle);
484            if let Err(e) = sync_ret {
485                log_e!("create", DESC, format!("sync queue worker failed: {}", e));
486            }
487            if let Err(e) = async_ret {
488                log_e!("create", DESC, format!("async queue worker failed: {}", e));
489            }
490            let _ = shutdown_tx.send(());
491        });
492
493        let ctx = VibeEngineContext::new(config)?;
494        let ctx_arc = Arc::new(ctx);
495
496        let scheduler = VibeTaskScheduler::new(
497            runtime_handle.clone(),
498            runtime_config.priority_queue_capacity,
499        );
500
501        Ok(Self {
502            executor: VibeEngineExecutor::new(
503                ThreadPool::new(runtime_config.callback_threads),
504                async_tx,
505                sync_tx,
506                runtime,
507                shutdown_rx,
508            ),
509            ctx: ctx_arc,
510            state: Arc::new(AtomicU8::new(VibeEngineState::Running as u8)),
511            destroy_lock: Arc::new(Mutex::new(())),
512            scheduler,
513            #[cfg(feature = "net-http")]
514            http: Arc::new(std::sync::OnceLock::new()),
515        })
516    }
517
518    /// Destroys the engine and waits up to `timeout` for resources to close.
519    ///
520    /// # Returns
521    ///
522    /// `Ok(())` when shutdown finishes or the engine is already closed;
523    /// [`VibeEngineError`] on timeout, runtime, or backend close failures.
524    ///
525    /// # Examples
526    ///
527    /// ```no_run
528    /// use std::time::Duration;
529    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
530    ///
531    /// # fn demo() -> VibeResult<()> {
532    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
533    /// engine.destroy_with_timeout(Duration::from_secs(2))?;
534    /// # Ok(())
535    /// # }
536    /// ```
537    pub fn destroy_with_timeout(&self, timeout: Duration) -> Result<(), VibeEngineError> {
538        let _guard = self
539            .destroy_lock
540            .lock()
541            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
542
543        if self.state() == VibeEngineState::Closed {
544            return Ok(());
545        }
546
547        self.state
548            .store(VibeEngineState::Closing as u8, Ordering::SeqCst);
549        let deadline = Instant::now()
550            .checked_add(timeout)
551            .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))?;
552
553        // Cancel and drop the scheduler's priority lanes first so periodic
554        // tasks observe their cancellation tokens before we wait on the
555        // executor's queues. This satisfies the B9 acceptance criterion that
556        // periodic tasks are cancelled cleanly during destroy.
557        self.scheduler.shutdown();
558
559        self.executor
560            .shutdown_queues(Self::remaining_timeout(deadline)?)?;
561        let ctx = Arc::clone(&self.ctx);
562        self.executor.block_on_with_timeout(
563            async move { ctx.close().await },
564            Self::remaining_timeout(deadline)?,
565        )?;
566
567        self.state
568            .store(VibeEngineState::Closed as u8, Ordering::SeqCst);
569        Ok(())
570    }
571
572    fn remaining_timeout(deadline: Instant) -> Result<Duration, VibeEngineError> {
573        deadline
574            .checked_duration_since(Instant::now())
575            .filter(|remaining| !remaining.is_zero())
576            .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))
577    }
578
579    /// Destroys the engine using the default timeout and reports through a callback.
580    ///
581    /// # Returns
582    ///
583    /// This method returns `()` immediately after invoking `cb` on the callback
584    /// pool with `Result<(), VibeEngineError>`.
585    ///
586    /// # Examples
587    ///
588    /// ```no_run
589    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
590    ///
591    /// # fn demo() -> VibeResult<()> {
592    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
593    /// engine.destroy(|result| {
594    ///     let _ = result;
595    /// });
596    /// # Ok(())
597    /// # }
598    /// ```
599    pub fn destroy<CB>(&self, cb: CB)
600    where
601        CB: FnOnce(Result<(), VibeEngineError>) + Send + 'static,
602    {
603        let method_name = "destroy";
604        log_t!(method_name);
605        let cb = self.cb_pool_once(cb);
606        let result = self.destroy_with_timeout(DEFAULT_DESTROY_TIMEOUT);
607        cb(result);
608    }
609}
610
611impl VibeEngine {
612    /// Inserts a log record into the configured log backend.
613    ///
614    /// # Returns
615    ///
616    /// This method returns `()`; backend write failures are handled by the log
617    /// subsystem.
618    ///
619    /// # Examples
620    ///
621    /// ```no_run
622    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeLogLevel, VibeResult};
623    ///
624    /// # fn demo() -> VibeResult<()> {
625    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
626    /// engine.insert_log(true, VibeLogLevel::Info, "startup".into(), "ready".into());
627    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
628    /// # Ok(())
629    /// # }
630    /// ```
631    pub fn insert_log(
632        &self,
633        should_output_log: bool,
634        level: LogLevel,
635        tag: String,
636        content: String,
637    ) {
638        let create_time = platform::now();
639        let ctx = self.ctx.clone();
640        ctx.log_db_client()
641            .insert_log(should_output_log, level as i32, tag, content, create_time);
642    }
643}
644
645impl VibeEngine {
646    /// Sets or clears the listener that receives emitted log entries.
647    ///
648    /// # Returns
649    ///
650    /// This method returns `()` and schedules listener installation on the
651    /// engine runtime.
652    ///
653    /// # Examples
654    ///
655    /// ```no_run
656    /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
657    ///
658    /// # fn demo() -> VibeResult<()> {
659    /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
660    /// engine.set_log_listener(Some(Box::new(|info| {
661    ///     let _ = info;
662    /// })));
663    /// engine.set_log_listener(None);
664    /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
665    /// # Ok(())
666    /// # }
667    /// ```
668    pub fn set_log_listener(&self, listener: Option<LogListener>) {
669        let ctx = self.ctx.clone();
670        self.post(async move {
671            ctx.log_db_client().set_log_listener(listener);
672        });
673    }
674}
675
676#[cfg(test)]
677mod tests {
678    use super::*;
679    use crate::api::engine_config::{VibeLogBackend, VibeStoreBackend};
680    use crate::api::platform_type::VibePlatformType;
681
682    #[test]
683    fn destroy_is_idempotent_and_closes_engine() -> Result<(), VibeEngineError> {
684        let store_path = std::env::temp_dir().join(format!(
685            "vibe-ready-engine-lifecycle-{}",
686            crate::platform::now()
687        ));
688        let config = VibeEngineConfig::builder()
689            .platform(VibePlatformType::MacOS)
690            .app_name("lifecycle-test")
691            .namespace("tests")
692            .runtime_worker_threads(1)
693            .callback_threads(1)
694            .queue_capacity(8, 4)
695            .store_root_path(store_path)
696            .build();
697
698        let engine = VibeEngine::create(config)?;
699        assert_eq!(engine.capabilities(), VibeCapabilities::current());
700        assert_eq!(engine.state(), VibeEngineState::Running);
701
702        engine.destroy_with_timeout(Duration::from_secs(2))?;
703        assert_eq!(engine.state(), VibeEngineState::Closed);
704
705        engine.destroy_with_timeout(Duration::from_secs(2))?;
706        assert_eq!(engine.state(), VibeEngineState::Closed);
707        Ok(())
708    }
709
710    #[test]
711    fn create_with_runtime_handle_uses_host_runtime() -> Result<(), VibeEngineError> {
712        let runtime = tokio::runtime::Builder::new_multi_thread()
713            .worker_threads(2)
714            .enable_all()
715            .build()
716            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
717        let store_path = std::env::temp_dir().join(format!(
718            "vibe-ready-engine-external-runtime-{}",
719            crate::platform::now()
720        ));
721        let config = VibeEngineConfig::builder()
722            .platform(VibePlatformType::MacOS)
723            .app_name("external-runtime-test")
724            .namespace("tests")
725            .log_backend(VibeLogBackend::Noop)
726            .store_backend(VibeStoreBackend::Noop)
727            .callback_threads(1)
728            .queue_capacity(8, 4)
729            .store_root_path(store_path)
730            .build();
731
732        let engine = VibeEngine::create_with_runtime_handle(config, runtime.handle().clone())?;
733        assert_eq!(engine.invoke(async { 42 })?, 42);
734
735        let (tx, rx) = std::sync::mpsc::channel();
736        engine.post(async move {
737            let _ = tx.send(7);
738        });
739        let received = rx.recv_timeout(Duration::from_secs(2)).map_err(|err| {
740            VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError)
741                .with_source(err.to_string())
742        })?;
743        assert_eq!(received, 7);
744
745        engine.destroy_with_timeout(Duration::from_secs(2))?;
746        assert_eq!(runtime.block_on(async { 9 }), 9);
747        Ok(())
748    }
749
750    fn build_scheduler_config(suffix: &str) -> VibeEngineConfig {
751        let store_path = std::env::temp_dir().join(format!(
752            "vibe-ready-scheduler-{}-{}",
753            suffix,
754            crate::platform::now()
755        ));
756        VibeEngineConfig::builder()
757            .platform(VibePlatformType::MacOS)
758            .app_name("scheduler-test")
759            .namespace("tests")
760            .log_backend(VibeLogBackend::Noop)
761            .store_backend(VibeStoreBackend::Noop)
762            .runtime_worker_threads(1)
763            .callback_threads(1)
764            .queue_capacity(16, 8)
765            .priority_queue_capacity(256)
766            .store_root_path(store_path)
767            .build()
768    }
769
770    /// Acceptance #1: 周期任务在 destroy 时被正确取消。
771    #[test]
772    fn periodic_task_is_cancelled_on_destroy() -> Result<(), VibeEngineError> {
773        use std::sync::atomic::{AtomicUsize, Ordering};
774        let engine = VibeEngine::create(build_scheduler_config("periodic-cancel"))?;
775        let counter = Arc::new(AtomicUsize::new(0));
776        let counter_clone = Arc::clone(&counter);
777        let handle =
778            engine.schedule_every("periodic.tick", Duration::from_millis(20), move |_token| {
779                let c = Arc::clone(&counter_clone);
780                async move {
781                    c.fetch_add(1, Ordering::SeqCst);
782                }
783            })?;
784        std::thread::sleep(Duration::from_millis(120));
785        let runs_before_destroy = counter.load(Ordering::SeqCst);
786        assert!(runs_before_destroy >= 2, "periodic should have ticked");
787
788        engine.destroy_with_timeout(Duration::from_secs(2))?;
789
790        // After destroy the handle must report a terminal state and the
791        // counter must stop growing.
792        assert!(handle.is_finished()?, "handle finished after destroy");
793        let after = counter.load(Ordering::SeqCst);
794        std::thread::sleep(Duration::from_millis(80));
795        assert_eq!(
796            after,
797            counter.load(Ordering::SeqCst),
798            "no further ticks after destroy"
799        );
800        Ok(())
801    }
802
803    /// Acceptance #2: 高优先级任务在拥塞时延迟显著低于普通任务。
804    #[test]
805    fn high_priority_task_runs_before_queued_normal_tasks() -> Result<(), VibeEngineError> {
806        use std::sync::atomic::{AtomicUsize, Ordering};
807        let engine = VibeEngine::create(build_scheduler_config("priority"))?;
808        let order = Arc::new(Mutex::new(Vec::<u32>::new()));
809        let next_idx = Arc::new(AtomicUsize::new(0));
810        // Saturate the normal lane with 30 long-ish tasks (sequential
811        // dispatcher → ~30 * 30ms = 900ms of work) so the high-priority task,
812        // posted shortly after, wins the next dispatch cycle.
813        for _ in 0..30 {
814            let order = Arc::clone(&order);
815            let next_idx = Arc::clone(&next_idx);
816            engine.post_with_priority("normal", VibeTaskPriority::Normal, async move {
817                tokio::time::sleep(Duration::from_millis(30)).await;
818                let idx = next_idx.fetch_add(1, Ordering::SeqCst) as u32;
819                if let Ok(mut order) = order.lock() {
820                    order.push(idx);
821                }
822            })?;
823        }
824
825        // Give the dispatcher a moment to begin the first normal task, then
826        // enqueue a high-priority marker.
827        std::thread::sleep(Duration::from_millis(40));
828        let high_marker = Arc::new(Mutex::new(None::<u32>));
829        let marker_clone = Arc::clone(&high_marker);
830        let next_idx_clone = Arc::clone(&next_idx);
831        engine.post_with_priority("high", VibeTaskPriority::High, async move {
832            let idx = next_idx_clone.fetch_add(1, Ordering::SeqCst) as u32;
833            if let Ok(mut marker) = marker_clone.lock() {
834                *marker = Some(idx);
835            }
836        })?;
837
838        // Wait long enough for the high-priority task to run but far less
839        // than the time required to drain all normal tasks (~900ms).
840        std::thread::sleep(Duration::from_millis(200));
841        let high_idx = high_marker
842            .lock()
843            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
844            .ok_or_else(|| {
845                VibeEngineError::from_error_code_msg(
846                    VibeEngineErrorCode::TimeoutError,
847                    "high task did not run".to_string(),
848                )
849            })?;
850        assert!(
851            (high_idx as usize) < 15,
852            "high-priority task ran at index {high_idx}, expected to overtake majority of normal tasks"
853        );
854
855        engine.destroy_with_timeout(Duration::from_secs(5))?;
856        Ok(())
857    }
858
859    /// Acceptance #3: 取消后的任务不再产生副作用且 join 返回 Cancelled。
860    #[test]
861    fn cancelled_task_join_returns_cancelled_error() -> Result<(), VibeEngineError> {
862        use std::sync::atomic::{AtomicBool, Ordering};
863        let engine = VibeEngine::create(build_scheduler_config("cancel"))?;
864        let ran = Arc::new(AtomicBool::new(false));
865        let ran_clone = Arc::clone(&ran);
866        let handle = engine.schedule_after(
867            "delayed",
868            Duration::from_millis(200),
869            move |token| async move {
870                // Should never fire its side-effect because the cancellation
871                // is requested before the delay elapses; but if it does start,
872                // it bails out immediately on the token.
873                if token.is_cancelled() {
874                    return;
875                }
876                ran_clone.store(true, Ordering::SeqCst);
877            },
878        )?;
879
880        // Cancel before the delay elapses.
881        std::thread::sleep(Duration::from_millis(40));
882        handle.cancel();
883
884        // Join via a host runtime since we are on the test thread.
885        let join_runtime = tokio::runtime::Builder::new_current_thread()
886            .enable_all()
887            .build()
888            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
889        let join_handle = handle.clone();
890        let result = join_runtime.block_on(async move {
891            tokio::time::timeout(Duration::from_secs(2), join_handle.join()).await
892        });
893        let join_result = result.map_err(|_| {
894            VibeEngineError::from_error_code_msg(
895                VibeEngineErrorCode::TimeoutError,
896                "join did not time out".to_string(),
897            )
898        })?;
899        assert_eq!(
900            join_result.unwrap_err().code(),
901            VibeEngineErrorCode::Cancelled.code()
902        );
903        assert!(!ran.load(Ordering::SeqCst), "cancelled task did not run");
904
905        engine.destroy_with_timeout(Duration::from_secs(2))?;
906        Ok(())
907    }
908
909    /// Sanity: tasks() panel exposes scheduler activity.
910    #[test]
911    fn task_panel_lists_pending_tasks() -> Result<(), VibeEngineError> {
912        let engine = VibeEngine::create(build_scheduler_config("panel"))?;
913        let _h = engine.schedule_after(
914            "long-delay",
915            Duration::from_secs(30),
916            |_token| async move {},
917        )?;
918        let snapshot = engine.tasks().list()?;
919        assert_eq!(snapshot.len(), 1);
920        assert_eq!(snapshot[0].name, "long-delay");
921        engine.destroy_with_timeout(Duration::from_secs(2))?;
922        Ok(())
923    }
924}
925
926#[cfg(test)]
927mod strict_tests {
928    use super::*;
929    include!(concat!(
930        env!("CARGO_MANIFEST_DIR"),
931        "/test/unit/api/engine_tests.rs"
932    ));
933}