vibe_ready/api/engine.rs
1use crate::api::capabilities::VibeCapabilities;
2use crate::api::engine_config::VibeEngineConfig;
3use crate::api::engine_context::VibeEngineContext;
4use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
5use crate::api::engine_executor::{VibeEngineExecutor, VibeEngineTask, VibeRuntimeHandle};
6use crate::api::scheduler::{
7 VibeCancellationToken, VibeTaskHandle, VibeTaskPanel, VibeTaskPriority, VibeTaskScheduler,
8};
9use crate::log::log_def::{LogListener, DESC};
10use crate::log::log_level::LogLevel;
11use crate::store::kv_store::VibeKvStore;
12use crate::{log_e, log_t, platform};
13use std::future::Future;
14use std::sync::atomic::{AtomicU8, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17use threadpool::ThreadPool;
18use tokio::runtime::Handle;
19use tokio::sync::mpsc::channel;
20
21const DEFAULT_DESTROY_TIMEOUT: Duration = Duration::from_secs(5);
22
23#[repr(u8)]
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25/// Lifecycle state of a [`VibeEngine`].
26pub enum VibeEngineState {
27 /// The engine value has been constructed but is not accepting work yet.
28 Created = 0,
29 /// The engine is ready to accept tasks and storage operations.
30 Running = 1,
31 /// The engine is shutting down resources and no longer accepts new work.
32 Closing = 2,
33 /// The engine has released its runtime-owned resources.
34 Closed = 3,
35}
36
37impl VibeEngineState {
38 fn from_u8(value: u8) -> Self {
39 match value {
40 0 => Self::Created,
41 1 => Self::Running,
42 2 => Self::Closing,
43 3 => Self::Closed,
44 _ => Self::Closed,
45 }
46 }
47}
48
49/// Main runtime facade for task execution, logging, and SDK context access.
50pub struct VibeEngine {
51 executor: VibeEngineExecutor,
52 /// Shared engine context for advanced integrations that need low-level clients.
53 pub ctx: Arc<VibeEngineContext>,
54 state: Arc<AtomicU8>,
55 destroy_lock: Arc<Mutex<()>>,
56 scheduler: Arc<VibeTaskScheduler>,
57 #[cfg(feature = "net-http")]
58 http: Arc<std::sync::OnceLock<crate::net::VibeHttpClient>>,
59}
60
61impl VibeEngine {
62 /// Returns compile-time capabilities enabled for this crate build.
63 ///
64 /// # Returns
65 ///
66 /// A [`VibeCapabilities`] snapshot describing enabled storage, logging,
67 /// and platform capabilities.
68 ///
69 /// # Examples
70 ///
71 /// ```no_run
72 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
73 ///
74 /// # fn demo() -> VibeResult<()> {
75 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
76 /// let capabilities = engine.capabilities();
77 /// assert_eq!(capabilities.log_store, cfg!(feature = "log-diesel"));
78 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
79 /// # Ok(())
80 /// # }
81 /// ```
82 pub fn capabilities(&self) -> VibeCapabilities {
83 VibeCapabilities::current()
84 }
85
86 /// Returns the current engine lifecycle state.
87 ///
88 /// # Returns
89 ///
90 /// A [`VibeEngineState`] value such as [`VibeEngineState::Running`] or
91 /// [`VibeEngineState::Closed`].
92 ///
93 /// # Examples
94 ///
95 /// ```no_run
96 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeEngineState, VibeResult};
97 ///
98 /// # fn demo() -> VibeResult<()> {
99 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
100 /// assert_eq!(engine.state(), VibeEngineState::Running);
101 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
102 /// # Ok(())
103 /// # }
104 /// ```
105 pub fn state(&self) -> VibeEngineState {
106 VibeEngineState::from_u8(self.state.load(Ordering::SeqCst))
107 }
108
109 /// Clones the engine executor for advanced task and callback integrations.
110 ///
111 /// # Returns
112 ///
113 /// A cheap clone of [`VibeEngineExecutor`] sharing the engine runtime.
114 ///
115 /// # Examples
116 ///
117 /// ```no_run
118 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
119 ///
120 /// # fn demo() -> VibeResult<()> {
121 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
122 /// let executor = engine.executor();
123 /// executor.post(async {})?;
124 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
125 /// # Ok(())
126 /// # }
127 /// ```
128 pub fn executor(&self) -> VibeEngineExecutor {
129 self.executor.clone()
130 }
131
132 /// Creates a high-level key-value store facade bound to this engine.
133 ///
134 /// # Returns
135 ///
136 /// A [`VibeKvStore`] that performs blocking-friendly operations through
137 /// the engine executor.
138 ///
139 /// # Examples
140 ///
141 /// ```no_run
142 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
143 ///
144 /// # fn demo() -> VibeResult<()> {
145 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
146 /// let store = engine.store();
147 /// store.set_str("theme", "dark")?;
148 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
149 /// # Ok(())
150 /// # }
151 /// ```
152 pub fn store(&self) -> VibeKvStore {
153 VibeKvStore::new(self.ctx.db_client().clone(), self.executor.clone())
154 }
155
156 /// Returns a shared HTTP client bound to this engine, building it on first use.
157 ///
158 /// The client is created once with default configuration and cached; later
159 /// calls return cheap clones that share the same connection pool. Requires
160 /// the `net-http` feature.
161 ///
162 /// # Returns
163 ///
164 /// `Ok(VibeHttpClient)` on success, or [`VibeEngineError`] if the client
165 /// could not be constructed.
166 ///
167 /// # Examples
168 ///
169 /// ```no_run
170 /// # #[cfg(feature = "net-http")]
171 /// # async fn demo(engine: &vibe_ready::VibeEngine) -> vibe_ready::VibeResult<()> {
172 /// let client = engine.http()?;
173 /// let response = client.get("https://example.com").await?;
174 /// assert!(response.status() > 0);
175 /// # Ok(())
176 /// # }
177 /// ```
178 #[cfg(feature = "net-http")]
179 pub fn http(&self) -> Result<crate::net::VibeHttpClient, VibeEngineError> {
180 if let Some(client) = self.http.get() {
181 return Ok(client.clone());
182 }
183 let client = crate::net::VibeHttpClient::new()?;
184 let _ = self.http.set(client.clone());
185 Ok(self.http.get().cloned().unwrap_or(client))
186 }
187
188 /// Runs a future on the engine runtime and waits for its result.
189 ///
190 /// Use this for short async operations where the caller needs the return
191 /// value synchronously.
192 ///
193 /// # Returns
194 ///
195 /// `Ok(F)` with the future output, or [`VibeEngineError`] if the engine is
196 /// not running or the task cannot be delivered.
197 ///
198 /// # Examples
199 ///
200 /// ```no_run
201 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
202 ///
203 /// # fn demo() -> VibeResult<()> {
204 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
205 /// let answer = engine.invoke(async { 42 })?;
206 /// assert_eq!(answer, 42);
207 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
208 /// # Ok(())
209 /// # }
210 /// ```
211 pub fn invoke<T, F>(&self, future: T) -> Result<F, VibeEngineError>
212 where
213 T: Future<Output = F> + Send + 'static,
214 F: Send + 'static,
215 {
216 if self.state() != VibeEngineState::Running {
217 return Err(VibeEngineError::from_error_code(
218 VibeEngineErrorCode::PostError,
219 ));
220 }
221 self.executor.invoke(future)
222 }
223
224 /// Posts a fire-and-forget future to the engine runtime.
225 ///
226 /// The method logs failures instead of returning them, making it suitable
227 /// for background work where the caller does not need a result.
228 ///
229 /// # Returns
230 ///
231 /// This method returns `()`; delivery errors are written to the SDK log.
232 ///
233 /// # Examples
234 ///
235 /// ```no_run
236 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
237 ///
238 /// # fn demo() -> VibeResult<()> {
239 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
240 /// engine.post(async {
241 /// // perform background work here
242 /// });
243 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
244 /// # Ok(())
245 /// # }
246 /// ```
247 pub fn post<T>(&self, future: T)
248 where
249 T: Future<Output = ()> + Send + 'static,
250 {
251 if self.state() != VibeEngineState::Running {
252 log_e!("post", DESC, "engine is not running");
253 return;
254 }
255 if let Err(error) = self.executor.post(future) {
256 log_e!("post", DESC, format!("executor post error: {}", error));
257 }
258 }
259
260 /// Wraps a one-argument callback so it runs on the engine callback pool.
261 ///
262 /// # Returns
263 ///
264 /// A `FnOnce(R)` wrapper that schedules `cb` on the callback thread pool.
265 ///
266 /// # Examples
267 ///
268 /// ```no_run
269 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
270 ///
271 /// # fn demo() -> VibeResult<()> {
272 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
273 /// let callback = engine.cb_pool_once(|value: i32| assert_eq!(value, 7));
274 /// callback(7);
275 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
276 /// # Ok(())
277 /// # }
278 /// ```
279 pub fn cb_pool_once<F, R>(&self, cb: F) -> impl FnOnce(R)
280 where
281 F: FnOnce(R) + Send + 'static,
282 R: Send + 'static,
283 {
284 self.executor.callback().once(cb)
285 }
286
287 /// Wraps a two-argument callback so it runs on the engine callback pool.
288 ///
289 /// # Returns
290 ///
291 /// A `FnOnce(R1, R2)` wrapper that schedules `cb` on the callback thread pool.
292 ///
293 /// # Examples
294 ///
295 /// ```no_run
296 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
297 ///
298 /// # fn demo() -> VibeResult<()> {
299 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
300 /// let callback = engine.cb_pool_once2(|left: i32, right: i32| assert_eq!(left + right, 3));
301 /// callback(1, 2);
302 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub fn cb_pool_once2<F, R1, R2>(&self, cb: F) -> impl FnOnce(R1, R2)
307 where
308 F: FnOnce(R1, R2) + Send + 'static,
309 R1: Send + 'static,
310 R2: Send + 'static,
311 {
312 self.executor.callback().once2(cb)
313 }
314}
315
316impl VibeEngine {
317 /// Posts a future to a dedicated priority lane.
318 ///
319 /// Tasks submitted to a higher priority lane run before lower-priority
320 /// tasks queued at the same time. Returns a [`VibeTaskHandle`] that can
321 /// be cancelled or `await`ed.
322 pub fn post_with_priority<F>(
323 &self,
324 name: impl Into<String>,
325 priority: VibeTaskPriority,
326 future: F,
327 ) -> Result<VibeTaskHandle, VibeEngineError>
328 where
329 F: std::future::Future<Output = ()> + Send + 'static,
330 {
331 if self.state() != VibeEngineState::Running {
332 return Err(VibeEngineError::from_error_code(
333 VibeEngineErrorCode::PostError,
334 ));
335 }
336 self.scheduler.post_with_priority(name, priority, future)
337 }
338
339 /// Schedule a one-shot task to run after `delay`.
340 ///
341 /// The builder receives a [`VibeCancellationToken`] so the user task can
342 /// abort cooperatively when the handle is cancelled.
343 pub fn schedule_after<F, Fut>(
344 &self,
345 name: impl Into<String>,
346 delay: Duration,
347 builder: F,
348 ) -> Result<VibeTaskHandle, VibeEngineError>
349 where
350 F: FnOnce(VibeCancellationToken) -> Fut + Send + 'static,
351 Fut: std::future::Future<Output = ()> + Send + 'static,
352 {
353 if self.state() != VibeEngineState::Running {
354 return Err(VibeEngineError::from_error_code(
355 VibeEngineErrorCode::PostError,
356 ));
357 }
358 self.scheduler.schedule_after(name, delay, builder)
359 }
360
361 /// Schedule a periodic task. The builder is invoked once every `period`
362 /// until the returned handle is cancelled or the engine is destroyed.
363 pub fn schedule_every<F, Fut>(
364 &self,
365 name: impl Into<String>,
366 period: Duration,
367 builder: F,
368 ) -> Result<VibeTaskHandle, VibeEngineError>
369 where
370 F: FnMut(VibeCancellationToken) -> Fut + Send + 'static,
371 Fut: std::future::Future<Output = ()> + Send + 'static,
372 {
373 if self.state() != VibeEngineState::Running {
374 return Err(VibeEngineError::from_error_code(
375 VibeEngineErrorCode::PostError,
376 ));
377 }
378 self.scheduler.schedule_every(name, period, builder)
379 }
380
381 /// Diagnostic panel exposing live snapshots of scheduler-tracked tasks.
382 pub fn tasks(&self) -> VibeTaskPanel {
383 self.scheduler.panel()
384 }
385}
386
387impl VibeEngine {
388 /// Creates an engine with a Tokio runtime owned by vibe-ready.
389 ///
390 /// # Returns
391 ///
392 /// `Ok(VibeEngine)` when configuration is valid and storage/logging
393 /// backends open successfully, otherwise [`VibeEngineError`].
394 ///
395 /// # Examples
396 ///
397 /// ```no_run
398 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
399 ///
400 /// # fn demo() -> VibeResult<()> {
401 /// let engine = VibeEngine::create(VibeEngineConfig::builder().app_name("demo").build())?;
402 /// engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
403 /// # Ok(())
404 /// # }
405 /// ```
406 pub fn create(config: VibeEngineConfig) -> Result<Self, VibeEngineError> {
407 config.validate()?;
408 let runtime_config = config.runtime_config().clone();
409 let runtime = Arc::new(
410 tokio::runtime::Builder::new_multi_thread()
411 .worker_threads(runtime_config.worker_threads)
412 .enable_all()
413 .build()
414 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?,
415 );
416 let handle = runtime.handle().clone();
417
418 Self::create_with_runtime(config, VibeRuntimeHandle::owned(runtime), handle)
419 }
420
421 /// Creates an engine using a Tokio runtime owned by the host application.
422 ///
423 /// The host runtime must stay alive for the lifetime of the engine. Destroying
424 /// the engine closes vibe-ready resources, but does not shut down this runtime.
425 ///
426 /// # Returns
427 ///
428 /// `Ok(VibeEngine)` bound to `runtime_handle`, or [`VibeEngineError`] if
429 /// validation or backend initialization fails.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
435 ///
436 /// # fn demo() -> VibeResult<()> {
437 /// let runtime = tokio::runtime::Runtime::new().expect("create runtime");
438 /// let engine = VibeEngine::create_with_runtime_handle(
439 /// VibeEngineConfig::builder().app_name("hosted").build(),
440 /// runtime.handle().clone(),
441 /// )?;
442 /// engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub fn create_with_runtime_handle(
447 config: VibeEngineConfig,
448 runtime_handle: Handle,
449 ) -> Result<Self, VibeEngineError> {
450 config.validate()?;
451
452 Self::create_with_runtime(
453 config,
454 VibeRuntimeHandle::external(runtime_handle.clone()),
455 runtime_handle,
456 )
457 }
458
459 fn create_with_runtime(
460 config: VibeEngineConfig,
461 runtime: VibeRuntimeHandle,
462 runtime_handle: Handle,
463 ) -> Result<Self, VibeEngineError> {
464 let runtime_config = config.runtime_config().clone();
465 let (async_tx, mut async_rx) =
466 channel::<VibeEngineTask>(runtime_config.async_queue_capacity);
467 let (sync_tx, mut sync_rx) = channel::<VibeEngineTask>(runtime_config.sync_queue_capacity);
468 let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
469
470 runtime_handle.spawn(async move {
471 let sync_handle = tokio::spawn(async move {
472 while let Some(future) = sync_rx.recv().await {
473 future.await;
474 }
475 });
476
477 let async_handle = tokio::spawn(async move {
478 while let Some(future) = async_rx.recv().await {
479 future.await;
480 }
481 });
482
483 let (sync_ret, async_ret) = tokio::join!(sync_handle, async_handle);
484 if let Err(e) = sync_ret {
485 log_e!("create", DESC, format!("sync queue worker failed: {}", e));
486 }
487 if let Err(e) = async_ret {
488 log_e!("create", DESC, format!("async queue worker failed: {}", e));
489 }
490 let _ = shutdown_tx.send(());
491 });
492
493 let ctx = VibeEngineContext::new(config)?;
494 let ctx_arc = Arc::new(ctx);
495
496 let scheduler = VibeTaskScheduler::new(
497 runtime_handle.clone(),
498 runtime_config.priority_queue_capacity,
499 );
500
501 Ok(Self {
502 executor: VibeEngineExecutor::new(
503 ThreadPool::new(runtime_config.callback_threads),
504 async_tx,
505 sync_tx,
506 runtime,
507 shutdown_rx,
508 ),
509 ctx: ctx_arc,
510 state: Arc::new(AtomicU8::new(VibeEngineState::Running as u8)),
511 destroy_lock: Arc::new(Mutex::new(())),
512 scheduler,
513 #[cfg(feature = "net-http")]
514 http: Arc::new(std::sync::OnceLock::new()),
515 })
516 }
517
518 /// Destroys the engine and waits up to `timeout` for resources to close.
519 ///
520 /// # Returns
521 ///
522 /// `Ok(())` when shutdown finishes or the engine is already closed;
523 /// [`VibeEngineError`] on timeout, runtime, or backend close failures.
524 ///
525 /// # Examples
526 ///
527 /// ```no_run
528 /// use std::time::Duration;
529 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
530 ///
531 /// # fn demo() -> VibeResult<()> {
532 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
533 /// engine.destroy_with_timeout(Duration::from_secs(2))?;
534 /// # Ok(())
535 /// # }
536 /// ```
537 pub fn destroy_with_timeout(&self, timeout: Duration) -> Result<(), VibeEngineError> {
538 let _guard = self
539 .destroy_lock
540 .lock()
541 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
542
543 if self.state() == VibeEngineState::Closed {
544 return Ok(());
545 }
546
547 self.state
548 .store(VibeEngineState::Closing as u8, Ordering::SeqCst);
549 let deadline = Instant::now()
550 .checked_add(timeout)
551 .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))?;
552
553 // Cancel and drop the scheduler's priority lanes first so periodic
554 // tasks observe their cancellation tokens before we wait on the
555 // executor's queues. This satisfies the B9 acceptance criterion that
556 // periodic tasks are cancelled cleanly during destroy.
557 self.scheduler.shutdown();
558
559 self.executor
560 .shutdown_queues(Self::remaining_timeout(deadline)?)?;
561 let ctx = Arc::clone(&self.ctx);
562 self.executor.block_on_with_timeout(
563 async move { ctx.close().await },
564 Self::remaining_timeout(deadline)?,
565 )?;
566
567 self.state
568 .store(VibeEngineState::Closed as u8, Ordering::SeqCst);
569 Ok(())
570 }
571
572 fn remaining_timeout(deadline: Instant) -> Result<Duration, VibeEngineError> {
573 deadline
574 .checked_duration_since(Instant::now())
575 .filter(|remaining| !remaining.is_zero())
576 .ok_or_else(|| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))
577 }
578
579 /// Destroys the engine using the default timeout and reports through a callback.
580 ///
581 /// # Returns
582 ///
583 /// This method returns `()` immediately after invoking `cb` on the callback
584 /// pool with `Result<(), VibeEngineError>`.
585 ///
586 /// # Examples
587 ///
588 /// ```no_run
589 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
590 ///
591 /// # fn demo() -> VibeResult<()> {
592 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
593 /// engine.destroy(|result| {
594 /// let _ = result;
595 /// });
596 /// # Ok(())
597 /// # }
598 /// ```
599 pub fn destroy<CB>(&self, cb: CB)
600 where
601 CB: FnOnce(Result<(), VibeEngineError>) + Send + 'static,
602 {
603 let method_name = "destroy";
604 log_t!(method_name);
605 let cb = self.cb_pool_once(cb);
606 let result = self.destroy_with_timeout(DEFAULT_DESTROY_TIMEOUT);
607 cb(result);
608 }
609}
610
611impl VibeEngine {
612 /// Inserts a log record into the configured log backend.
613 ///
614 /// # Returns
615 ///
616 /// This method returns `()`; backend write failures are handled by the log
617 /// subsystem.
618 ///
619 /// # Examples
620 ///
621 /// ```no_run
622 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeLogLevel, VibeResult};
623 ///
624 /// # fn demo() -> VibeResult<()> {
625 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
626 /// engine.insert_log(true, VibeLogLevel::Info, "startup".into(), "ready".into());
627 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
628 /// # Ok(())
629 /// # }
630 /// ```
631 pub fn insert_log(
632 &self,
633 should_output_log: bool,
634 level: LogLevel,
635 tag: String,
636 content: String,
637 ) {
638 let create_time = platform::now();
639 let ctx = self.ctx.clone();
640 ctx.log_db_client()
641 .insert_log(should_output_log, level as i32, tag, content, create_time);
642 }
643}
644
645impl VibeEngine {
646 /// Sets or clears the listener that receives emitted log entries.
647 ///
648 /// # Returns
649 ///
650 /// This method returns `()` and schedules listener installation on the
651 /// engine runtime.
652 ///
653 /// # Examples
654 ///
655 /// ```no_run
656 /// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
657 ///
658 /// # fn demo() -> VibeResult<()> {
659 /// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
660 /// engine.set_log_listener(Some(Box::new(|info| {
661 /// let _ = info;
662 /// })));
663 /// engine.set_log_listener(None);
664 /// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
665 /// # Ok(())
666 /// # }
667 /// ```
668 pub fn set_log_listener(&self, listener: Option<LogListener>) {
669 let ctx = self.ctx.clone();
670 self.post(async move {
671 ctx.log_db_client().set_log_listener(listener);
672 });
673 }
674}
675
676#[cfg(test)]
677mod tests {
678 use super::*;
679 use crate::api::engine_config::{VibeLogBackend, VibeStoreBackend};
680 use crate::api::platform_type::VibePlatformType;
681
682 #[test]
683 fn destroy_is_idempotent_and_closes_engine() -> Result<(), VibeEngineError> {
684 let store_path = std::env::temp_dir().join(format!(
685 "vibe-ready-engine-lifecycle-{}",
686 crate::platform::now()
687 ));
688 let config = VibeEngineConfig::builder()
689 .platform(VibePlatformType::MacOS)
690 .app_name("lifecycle-test")
691 .namespace("tests")
692 .runtime_worker_threads(1)
693 .callback_threads(1)
694 .queue_capacity(8, 4)
695 .store_root_path(store_path)
696 .build();
697
698 let engine = VibeEngine::create(config)?;
699 assert_eq!(engine.capabilities(), VibeCapabilities::current());
700 assert_eq!(engine.state(), VibeEngineState::Running);
701
702 engine.destroy_with_timeout(Duration::from_secs(2))?;
703 assert_eq!(engine.state(), VibeEngineState::Closed);
704
705 engine.destroy_with_timeout(Duration::from_secs(2))?;
706 assert_eq!(engine.state(), VibeEngineState::Closed);
707 Ok(())
708 }
709
710 #[test]
711 fn create_with_runtime_handle_uses_host_runtime() -> Result<(), VibeEngineError> {
712 let runtime = tokio::runtime::Builder::new_multi_thread()
713 .worker_threads(2)
714 .enable_all()
715 .build()
716 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
717 let store_path = std::env::temp_dir().join(format!(
718 "vibe-ready-engine-external-runtime-{}",
719 crate::platform::now()
720 ));
721 let config = VibeEngineConfig::builder()
722 .platform(VibePlatformType::MacOS)
723 .app_name("external-runtime-test")
724 .namespace("tests")
725 .log_backend(VibeLogBackend::Noop)
726 .store_backend(VibeStoreBackend::Noop)
727 .callback_threads(1)
728 .queue_capacity(8, 4)
729 .store_root_path(store_path)
730 .build();
731
732 let engine = VibeEngine::create_with_runtime_handle(config, runtime.handle().clone())?;
733 assert_eq!(engine.invoke(async { 42 })?, 42);
734
735 let (tx, rx) = std::sync::mpsc::channel();
736 engine.post(async move {
737 let _ = tx.send(7);
738 });
739 let received = rx.recv_timeout(Duration::from_secs(2)).map_err(|err| {
740 VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError)
741 .with_source(err.to_string())
742 })?;
743 assert_eq!(received, 7);
744
745 engine.destroy_with_timeout(Duration::from_secs(2))?;
746 assert_eq!(runtime.block_on(async { 9 }), 9);
747 Ok(())
748 }
749
750 fn build_scheduler_config(suffix: &str) -> VibeEngineConfig {
751 let store_path = std::env::temp_dir().join(format!(
752 "vibe-ready-scheduler-{}-{}",
753 suffix,
754 crate::platform::now()
755 ));
756 VibeEngineConfig::builder()
757 .platform(VibePlatformType::MacOS)
758 .app_name("scheduler-test")
759 .namespace("tests")
760 .log_backend(VibeLogBackend::Noop)
761 .store_backend(VibeStoreBackend::Noop)
762 .runtime_worker_threads(1)
763 .callback_threads(1)
764 .queue_capacity(16, 8)
765 .priority_queue_capacity(256)
766 .store_root_path(store_path)
767 .build()
768 }
769
770 /// Acceptance #1: 周期任务在 destroy 时被正确取消。
771 #[test]
772 fn periodic_task_is_cancelled_on_destroy() -> Result<(), VibeEngineError> {
773 use std::sync::atomic::{AtomicUsize, Ordering};
774 let engine = VibeEngine::create(build_scheduler_config("periodic-cancel"))?;
775 let counter = Arc::new(AtomicUsize::new(0));
776 let counter_clone = Arc::clone(&counter);
777 let handle =
778 engine.schedule_every("periodic.tick", Duration::from_millis(20), move |_token| {
779 let c = Arc::clone(&counter_clone);
780 async move {
781 c.fetch_add(1, Ordering::SeqCst);
782 }
783 })?;
784 std::thread::sleep(Duration::from_millis(120));
785 let runs_before_destroy = counter.load(Ordering::SeqCst);
786 assert!(runs_before_destroy >= 2, "periodic should have ticked");
787
788 engine.destroy_with_timeout(Duration::from_secs(2))?;
789
790 // After destroy the handle must report a terminal state and the
791 // counter must stop growing.
792 assert!(handle.is_finished()?, "handle finished after destroy");
793 let after = counter.load(Ordering::SeqCst);
794 std::thread::sleep(Duration::from_millis(80));
795 assert_eq!(
796 after,
797 counter.load(Ordering::SeqCst),
798 "no further ticks after destroy"
799 );
800 Ok(())
801 }
802
803 /// Acceptance #2: 高优先级任务在拥塞时延迟显著低于普通任务。
804 #[test]
805 fn high_priority_task_runs_before_queued_normal_tasks() -> Result<(), VibeEngineError> {
806 use std::sync::atomic::{AtomicUsize, Ordering};
807 let engine = VibeEngine::create(build_scheduler_config("priority"))?;
808 let order = Arc::new(Mutex::new(Vec::<u32>::new()));
809 let next_idx = Arc::new(AtomicUsize::new(0));
810 // Saturate the normal lane with 30 long-ish tasks (sequential
811 // dispatcher → ~30 * 30ms = 900ms of work) so the high-priority task,
812 // posted shortly after, wins the next dispatch cycle.
813 for _ in 0..30 {
814 let order = Arc::clone(&order);
815 let next_idx = Arc::clone(&next_idx);
816 engine.post_with_priority("normal", VibeTaskPriority::Normal, async move {
817 tokio::time::sleep(Duration::from_millis(30)).await;
818 let idx = next_idx.fetch_add(1, Ordering::SeqCst) as u32;
819 if let Ok(mut order) = order.lock() {
820 order.push(idx);
821 }
822 })?;
823 }
824
825 // Give the dispatcher a moment to begin the first normal task, then
826 // enqueue a high-priority marker.
827 std::thread::sleep(Duration::from_millis(40));
828 let high_marker = Arc::new(Mutex::new(None::<u32>));
829 let marker_clone = Arc::clone(&high_marker);
830 let next_idx_clone = Arc::clone(&next_idx);
831 engine.post_with_priority("high", VibeTaskPriority::High, async move {
832 let idx = next_idx_clone.fetch_add(1, Ordering::SeqCst) as u32;
833 if let Ok(mut marker) = marker_clone.lock() {
834 *marker = Some(idx);
835 }
836 })?;
837
838 // Wait long enough for the high-priority task to run but far less
839 // than the time required to drain all normal tasks (~900ms).
840 std::thread::sleep(Duration::from_millis(200));
841 let high_idx = high_marker
842 .lock()
843 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
844 .ok_or_else(|| {
845 VibeEngineError::from_error_code_msg(
846 VibeEngineErrorCode::TimeoutError,
847 "high task did not run".to_string(),
848 )
849 })?;
850 assert!(
851 (high_idx as usize) < 15,
852 "high-priority task ran at index {high_idx}, expected to overtake majority of normal tasks"
853 );
854
855 engine.destroy_with_timeout(Duration::from_secs(5))?;
856 Ok(())
857 }
858
859 /// Acceptance #3: 取消后的任务不再产生副作用且 join 返回 Cancelled。
860 #[test]
861 fn cancelled_task_join_returns_cancelled_error() -> Result<(), VibeEngineError> {
862 use std::sync::atomic::{AtomicBool, Ordering};
863 let engine = VibeEngine::create(build_scheduler_config("cancel"))?;
864 let ran = Arc::new(AtomicBool::new(false));
865 let ran_clone = Arc::clone(&ran);
866 let handle = engine.schedule_after(
867 "delayed",
868 Duration::from_millis(200),
869 move |token| async move {
870 // Should never fire its side-effect because the cancellation
871 // is requested before the delay elapses; but if it does start,
872 // it bails out immediately on the token.
873 if token.is_cancelled() {
874 return;
875 }
876 ran_clone.store(true, Ordering::SeqCst);
877 },
878 )?;
879
880 // Cancel before the delay elapses.
881 std::thread::sleep(Duration::from_millis(40));
882 handle.cancel();
883
884 // Join via a host runtime since we are on the test thread.
885 let join_runtime = tokio::runtime::Builder::new_current_thread()
886 .enable_all()
887 .build()
888 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
889 let join_handle = handle.clone();
890 let result = join_runtime.block_on(async move {
891 tokio::time::timeout(Duration::from_secs(2), join_handle.join()).await
892 });
893 let join_result = result.map_err(|_| {
894 VibeEngineError::from_error_code_msg(
895 VibeEngineErrorCode::TimeoutError,
896 "join did not time out".to_string(),
897 )
898 })?;
899 assert_eq!(
900 join_result.unwrap_err().code(),
901 VibeEngineErrorCode::Cancelled.code()
902 );
903 assert!(!ran.load(Ordering::SeqCst), "cancelled task did not run");
904
905 engine.destroy_with_timeout(Duration::from_secs(2))?;
906 Ok(())
907 }
908
909 /// Sanity: tasks() panel exposes scheduler activity.
910 #[test]
911 fn task_panel_lists_pending_tasks() -> Result<(), VibeEngineError> {
912 let engine = VibeEngine::create(build_scheduler_config("panel"))?;
913 let _h = engine.schedule_after(
914 "long-delay",
915 Duration::from_secs(30),
916 |_token| async move {},
917 )?;
918 let snapshot = engine.tasks().list()?;
919 assert_eq!(snapshot.len(), 1);
920 assert_eq!(snapshot[0].name, "long-delay");
921 engine.destroy_with_timeout(Duration::from_secs(2))?;
922 Ok(())
923 }
924}
925
926#[cfg(test)]
927mod strict_tests {
928 use super::*;
929 include!(concat!(
930 env!("CARGO_MANIFEST_DIR"),
931 "/test/unit/api/engine_tests.rs"
932 ));
933}