Skip to main content

trine_kv/
runtime.rs

1use std::{
2    collections::VecDeque,
3    fmt,
4    future::Future,
5    panic::{self, AssertUnwindSafe},
6    pin::Pin,
7    sync::{
8        Arc, Condvar, Mutex,
9        atomic::{AtomicBool, AtomicU64, Ordering},
10    },
11    task::{Context, Poll, Waker},
12    thread,
13    time::{Duration, Instant},
14};
15
16use crate::{
17    error::{Error, Result},
18    options::StorageMode,
19};
20
21/// Runtime strategy used by async-first database operations.
22#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
23pub enum RuntimeMode {
24    /// Use native threads for background and blocking work.
25    #[default]
26    NativeThreads,
27    /// Prefer platform async I/O when the target and feature set support it.
28    PlatformIo,
29    /// Run supported work inline on the caller's thread.
30    Inline,
31}
32
33/// Runtime configuration for a database handle.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub struct RuntimeOptions {
36    /// Selected runtime strategy.
37    pub mode: RuntimeMode,
38}
39
40/// Capabilities exposed by a selected runtime configuration.
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub struct RuntimeCapabilities {
43    flags: u8,
44}
45
46const BACKGROUND_THREADS: u8 = 1 << 0;
47const COOPERATIVE_TASKS: u8 = 1 << 1;
48const BLOCKING_ADAPTER: u8 = 1 << 2;
49const CANCELLATION_TOKENS: u8 = 1 << 3;
50const TASK_JOIN: u8 = 1 << 4;
51const PLATFORM_ASYNC_IO: u8 = 1 << 5;
52
53const DEFAULT_BLOCKING_WORKERS: usize = 4;
54const DEFAULT_BLOCKING_QUEUE_DEPTH: usize = 1024;
55
56type BlockingTask = Box<dyn FnOnce() + Send + 'static>;
57
58#[derive(Debug, Clone)]
59pub(crate) struct Runtime {
60    options: RuntimeOptions,
61    blocking_pool: Option<Arc<BlockingTaskPool>>,
62}
63
64#[derive(Debug)]
65#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), allow(dead_code))]
66pub(crate) enum RuntimeTask {
67    NativeThread(thread::JoinHandle<()>),
68}
69
70pub(crate) struct BlockingResultFuture<T> {
71    state: Arc<Mutex<BlockingResultState<T>>>,
72}
73
74struct BlockingTaskPool {
75    state: Arc<BlockingTaskPoolState>,
76    workers: Mutex<BlockingWorkers>,
77}
78
79struct BlockingResultState<T> {
80    result: Option<Result<T>>,
81    waker: Option<Waker>,
82}
83
84struct BlockingTaskPoolState {
85    queue: Mutex<BlockingTaskQueue>,
86    wake: Condvar,
87    worker_count: usize,
88    queue_depth: usize,
89    submitted_tasks: AtomicU64,
90    completed_tasks: AtomicU64,
91    rejected_tasks: AtomicU64,
92    total_runtime_micros: AtomicU64,
93}
94
95#[derive(Debug, Default)]
96struct BlockingWorkers {
97    started: bool,
98    handles: Vec<thread::JoinHandle<()>>,
99}
100
101#[derive(Default)]
102struct BlockingTaskQueue {
103    tasks: VecDeque<BlockingTask>,
104    shutdown: bool,
105}
106
107/// Shareable flag used to request cancellation of cooperative work.
108#[derive(Debug, Clone, Default)]
109pub struct CancellationToken {
110    cancelled: Arc<AtomicBool>,
111}
112
113#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
114pub(crate) struct RuntimeBlockingAdapterStats {
115    pub(crate) worker_count: usize,
116    pub(crate) queue_capacity: usize,
117    pub(crate) queued_tasks: usize,
118    pub(crate) submitted_tasks: u64,
119    pub(crate) completed_tasks: u64,
120    pub(crate) rejected_tasks: u64,
121    pub(crate) total_runtime_micros: u64,
122}
123
124impl RuntimeOptions {
125    /// Uses native threads for background work and blocking adapters.
126    #[must_use]
127    pub const fn native_threads() -> Self {
128        Self {
129            mode: RuntimeMode::NativeThreads,
130        }
131    }
132
133    /// Runs supported work inline without background threads.
134    #[must_use]
135    pub const fn inline() -> Self {
136        Self {
137            mode: RuntimeMode::Inline,
138        }
139    }
140
141    /// Requests platform async I/O support when available.
142    #[must_use]
143    pub const fn platform_io() -> Self {
144        Self {
145            mode: RuntimeMode::PlatformIo,
146        }
147    }
148
149    /// Returns the capabilities implied by these runtime options.
150    #[must_use]
151    pub const fn capabilities(self) -> RuntimeCapabilities {
152        const NATIVE_THREAD_FLAGS: u8 = BACKGROUND_THREADS
153            | COOPERATIVE_TASKS
154            | BLOCKING_ADAPTER
155            | CANCELLATION_TOKENS
156            | TASK_JOIN;
157        match self.mode {
158            RuntimeMode::NativeThreads => RuntimeCapabilities::new(NATIVE_THREAD_FLAGS),
159            RuntimeMode::PlatformIo => {
160                RuntimeCapabilities::new(NATIVE_THREAD_FLAGS | platform_async_io_flag())
161            }
162            RuntimeMode::Inline => {
163                RuntimeCapabilities::new(COOPERATIVE_TASKS | CANCELLATION_TOKENS)
164            }
165        }
166    }
167}
168
169impl Default for RuntimeOptions {
170    fn default() -> Self {
171        Self::native_threads()
172    }
173}
174
175impl CancellationToken {
176    /// Creates a token in the not-cancelled state.
177    #[must_use]
178    pub fn new() -> Self {
179        Self::default()
180    }
181
182    /// Marks the token as cancelled.
183    pub fn cancel(&self) {
184        self.cancelled.store(true, Ordering::Release);
185    }
186
187    /// Returns whether cancellation has been requested.
188    #[must_use]
189    pub fn is_cancelled(&self) -> bool {
190        self.cancelled.load(Ordering::Acquire)
191    }
192}
193
194impl RuntimeCapabilities {
195    const fn new(flags: u8) -> Self {
196        Self { flags }
197    }
198
199    /// Returns whether the runtime can spawn background threads.
200    #[must_use]
201    pub const fn background_threads(self) -> bool {
202        self.has(BACKGROUND_THREADS)
203    }
204
205    /// Returns whether the runtime can cooperatively run maintenance tasks.
206    #[must_use]
207    pub const fn cooperative_tasks(self) -> bool {
208        self.has(COOPERATIVE_TASKS)
209    }
210
211    /// Returns whether the runtime can adapt blocking storage work.
212    #[must_use]
213    pub const fn blocking_adapter(self) -> bool {
214        self.has(BLOCKING_ADAPTER)
215    }
216
217    /// Returns whether cancellation tokens are supported.
218    #[must_use]
219    pub const fn cancellation_tokens(self) -> bool {
220        self.has(CANCELLATION_TOKENS)
221    }
222
223    /// Returns whether spawned tasks can be joined.
224    #[must_use]
225    pub const fn task_join(self) -> bool {
226        self.has(TASK_JOIN)
227    }
228
229    /// Returns whether at least one Trine storage operation uses platform async I/O.
230    #[must_use]
231    pub const fn platform_async_io(self) -> bool {
232        self.has(PLATFORM_ASYNC_IO)
233    }
234
235    const fn has(self, flag: u8) -> bool {
236        self.flags & flag != 0
237    }
238}
239
240const fn platform_async_io_flag() -> u8 {
241    if cfg!(all(feature = "platform-io", target_os = "linux")) {
242        PLATFORM_ASYNC_IO
243    } else {
244        0
245    }
246}
247
248impl Runtime {
249    pub(crate) fn new(options: RuntimeOptions) -> Self {
250        Self::with_blocking_limits(
251            options,
252            DEFAULT_BLOCKING_WORKERS,
253            DEFAULT_BLOCKING_QUEUE_DEPTH,
254        )
255    }
256
257    pub(crate) fn with_blocking_limits(
258        options: RuntimeOptions,
259        blocking_worker_count: usize,
260        blocking_queue_depth: usize,
261    ) -> Self {
262        let blocking_pool = if options.capabilities().blocking_adapter() {
263            Some(Arc::new(BlockingTaskPool::new(
264                blocking_worker_count,
265                blocking_queue_depth,
266            )))
267        } else {
268            None
269        };
270        Self {
271            options,
272            blocking_pool,
273        }
274    }
275
276    pub(crate) const fn capabilities(&self) -> RuntimeCapabilities {
277        self.options.capabilities()
278    }
279
280    #[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), allow(dead_code))]
281    pub(crate) fn spawn_background(
282        &self,
283        name: String,
284        task: impl FnOnce() + Send + 'static,
285    ) -> Result<RuntimeTask> {
286        match self.options.mode {
287            RuntimeMode::NativeThreads | RuntimeMode::PlatformIo => thread::Builder::new()
288                .name(name)
289                .spawn(task)
290                .map(RuntimeTask::NativeThread)
291                .map_err(Error::Io),
292            RuntimeMode::Inline => Err(Error::unsupported("runtime background threads")),
293        }
294    }
295
296    pub(crate) fn spawn_blocking(&self, task: impl FnOnce() + Send + 'static) -> Result<()> {
297        let Some(pool) = &self.blocking_pool else {
298            return Err(Error::unsupported("runtime sync adapter"));
299        };
300        pool.submit(Box::new(task))
301    }
302
303    pub(crate) fn spawn_blocking_result<T>(
304        &self,
305        task: impl FnOnce() -> Result<T> + Send + 'static,
306    ) -> Result<BlockingResultFuture<T>>
307    where
308        T: Send + 'static,
309    {
310        let state = Arc::new(Mutex::new(BlockingResultState {
311            result: None,
312            waker: None,
313        }));
314        let task_state = Arc::clone(&state);
315        self.spawn_blocking(move || {
316            let result = panic::catch_unwind(AssertUnwindSafe(task))
317                .unwrap_or_else(|_| Err(Error::runtime_busy("blocking task panicked")));
318            if let Ok(mut state) = task_state.lock() {
319                state.result = Some(result);
320                if let Some(waker) = state.waker.take() {
321                    waker.wake();
322                }
323            }
324        })?;
325        Ok(BlockingResultFuture { state })
326    }
327
328    pub(crate) fn blocking_adapter_stats(&self) -> Option<RuntimeBlockingAdapterStats> {
329        self.blocking_pool.as_ref().map(|pool| pool.stats())
330    }
331}
332
333impl<T> fmt::Debug for BlockingResultFuture<T> {
334    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
335        formatter.debug_struct("BlockingResultFuture").finish()
336    }
337}
338
339impl<T> Future for BlockingResultFuture<T> {
340    type Output = Result<T>;
341
342    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
343        let Ok(mut state) = self.state.lock() else {
344            return Poll::Ready(Err(Error::runtime_busy(
345                "blocking result state is poisoned",
346            )));
347        };
348        if let Some(result) = state.result.take() {
349            Poll::Ready(result)
350        } else {
351            state.waker = Some(context.waker().clone());
352            Poll::Pending
353        }
354    }
355}
356
357impl fmt::Debug for BlockingTaskPool {
358    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
359        let queued = self.state.queue.lock().map_or(0, |queue| queue.tasks.len());
360        let started = self.workers.lock().is_ok_and(|workers| workers.started);
361        formatter
362            .debug_struct("BlockingTaskPool")
363            .field("worker_count", &self.state.worker_count)
364            .field("queue_depth", &self.state.queue_depth)
365            .field("queued", &queued)
366            .field("started", &started)
367            .finish()
368    }
369}
370
371impl BlockingTaskPool {
372    fn new(worker_count: usize, queue_depth: usize) -> Self {
373        Self {
374            state: Arc::new(BlockingTaskPoolState {
375                queue: Mutex::new(BlockingTaskQueue::default()),
376                wake: Condvar::new(),
377                worker_count: worker_count.max(1),
378                queue_depth: queue_depth.max(1),
379                submitted_tasks: AtomicU64::new(0),
380                completed_tasks: AtomicU64::new(0),
381                rejected_tasks: AtomicU64::new(0),
382                total_runtime_micros: AtomicU64::new(0),
383            }),
384            workers: Mutex::new(BlockingWorkers::default()),
385        }
386    }
387
388    fn submit(&self, task: BlockingTask) -> Result<()> {
389        self.ensure_started()?;
390        self.state.submit(task)
391    }
392
393    fn stats(&self) -> RuntimeBlockingAdapterStats {
394        self.state.stats()
395    }
396
397    fn ensure_started(&self) -> Result<()> {
398        let mut workers = self
399            .workers
400            .lock()
401            .map_err(|_| Error::runtime_busy("blocking worker registry is poisoned"))?;
402        if workers.started {
403            return Ok(());
404        }
405
406        let mut handles = Vec::with_capacity(self.state.worker_count);
407        for worker_index in 0..self.state.worker_count {
408            let state = Arc::clone(&self.state);
409            match thread::Builder::new()
410                .name(format!("trine-kv-blocking-{worker_index}"))
411                .spawn(move || blocking_worker_loop(&state))
412            {
413                Ok(handle) => handles.push(handle),
414                Err(error) => {
415                    self.state.shutdown();
416                    for handle in handles {
417                        let _ = handle.join();
418                    }
419                    return Err(Error::Io(error));
420                }
421            }
422        }
423        workers.handles = handles;
424        workers.started = true;
425        Ok(())
426    }
427}
428
429impl Drop for BlockingTaskPool {
430    fn drop(&mut self) {
431        self.state.shutdown();
432        let current_thread = thread::current().id();
433        let Ok(mut workers) = self.workers.lock() else {
434            return;
435        };
436        for handle in workers.handles.drain(..) {
437            if handle.thread().id() == current_thread {
438                continue;
439            }
440            let _ = handle.join();
441        }
442    }
443}
444
445impl BlockingTaskPoolState {
446    fn submit(&self, task: BlockingTask) -> Result<()> {
447        let mut queue = self
448            .queue
449            .lock()
450            .map_err(|_| Error::runtime_busy("blocking task queue is poisoned"))?;
451        if queue.shutdown {
452            self.rejected_tasks.fetch_add(1, Ordering::Relaxed);
453            return Err(Error::Closed);
454        }
455        if queue.tasks.len() >= self.queue_depth {
456            self.rejected_tasks.fetch_add(1, Ordering::Relaxed);
457            return Err(Error::runtime_busy("blocking task queue is full"));
458        }
459        queue.tasks.push_back(task);
460        self.submitted_tasks.fetch_add(1, Ordering::Relaxed);
461        self.wake.notify_one();
462        Ok(())
463    }
464
465    fn next_task(&self) -> Option<BlockingTask> {
466        let Ok(mut queue) = self.queue.lock() else {
467            return None;
468        };
469        loop {
470            if let Some(task) = queue.tasks.pop_front() {
471                return Some(task);
472            }
473            if queue.shutdown {
474                return None;
475            }
476            let Ok(next_queue) = self.wake.wait(queue) else {
477                return None;
478            };
479            queue = next_queue;
480        }
481    }
482
483    fn shutdown(&self) {
484        if let Ok(mut queue) = self.queue.lock() {
485            queue.shutdown = true;
486            self.wake.notify_all();
487        }
488    }
489
490    fn record_completed(&self, runtime: Duration) {
491        self.completed_tasks.fetch_add(1, Ordering::Relaxed);
492        self.total_runtime_micros
493            .fetch_add(duration_to_micros_saturating(runtime), Ordering::Relaxed);
494    }
495
496    fn stats(&self) -> RuntimeBlockingAdapterStats {
497        RuntimeBlockingAdapterStats {
498            worker_count: self.worker_count,
499            queue_capacity: self.queue_depth,
500            queued_tasks: self.queue.lock().map_or(0, |queue| queue.tasks.len()),
501            submitted_tasks: self.submitted_tasks.load(Ordering::Acquire),
502            completed_tasks: self.completed_tasks.load(Ordering::Acquire),
503            rejected_tasks: self.rejected_tasks.load(Ordering::Acquire),
504            total_runtime_micros: self.total_runtime_micros.load(Ordering::Acquire),
505        }
506    }
507}
508
509fn blocking_worker_loop(state: &BlockingTaskPoolState) {
510    while let Some(task) = state.next_task() {
511        let started = Instant::now();
512        let _ = panic::catch_unwind(AssertUnwindSafe(task));
513        state.record_completed(started.elapsed());
514    }
515}
516
517fn duration_to_micros_saturating(duration: Duration) -> u64 {
518    u64::try_from(duration.as_micros()).unwrap_or(u64::MAX)
519}
520
521impl RuntimeTask {
522    pub(crate) fn is_current_thread(&self) -> bool {
523        match self {
524            Self::NativeThread(handle) => handle.thread().id() == thread::current().id(),
525        }
526    }
527
528    pub(crate) fn join(self) -> thread::Result<()> {
529        match self {
530            Self::NativeThread(handle) => handle.join(),
531        }
532    }
533}
534
535pub(crate) fn validate_runtime_options(
536    runtime: RuntimeOptions,
537    storage_mode: &StorageMode,
538    read_only: bool,
539    background_worker_count: usize,
540) -> Result<()> {
541    #[cfg(not(feature = "platform-io"))]
542    if matches!(runtime.mode, RuntimeMode::PlatformIo) {
543        return Err(Error::unsupported_backend(
544            "platform async I/O runtime requires the platform-io feature",
545        ));
546    }
547
548    let persistent_background_workers =
549        storage_mode.persistent_path().is_some() && !read_only && background_worker_count != 0;
550    if persistent_background_workers && !runtime.capabilities().background_threads() {
551        return Err(Error::invalid_options(
552            "background workers require runtime background threads",
553        ));
554    }
555
556    Ok(())
557}
558
559#[cfg(test)]
560mod tests {
561    use std::{
562        future::Future,
563        sync::{Arc, mpsc},
564        task::{Context, Poll, Wake, Waker},
565        thread,
566        time::Duration,
567    };
568
569    use crate::{
570        Db, DbOptions, Error, Result,
571        runtime::{CancellationToken, Runtime, RuntimeOptions},
572    };
573
574    struct ThreadWaker {
575        thread: thread::Thread,
576    }
577
578    impl Wake for ThreadWaker {
579        fn wake(self: Arc<Self>) {
580            self.thread.unpark();
581        }
582
583        fn wake_by_ref(self: &Arc<Self>) {
584            self.thread.unpark();
585        }
586    }
587
588    fn block_on_test_future<T>(future: impl Future<Output = Result<T>>) -> Result<T> {
589        let waker = Waker::from(Arc::new(ThreadWaker {
590            thread: thread::current(),
591        }));
592        let mut context = Context::from_waker(&waker);
593        let mut future = std::pin::pin!(future);
594        loop {
595            match future.as_mut().poll(&mut context) {
596                Poll::Ready(result) => return result,
597                Poll::Pending => thread::park_timeout(Duration::from_secs(1)),
598            }
599        }
600    }
601
602    #[test]
603    fn runtime_capabilities_follow_selected_mode() {
604        let native = RuntimeOptions::native_threads().capabilities();
605        assert!(native.background_threads());
606        assert!(native.cancellation_tokens());
607        assert!(native.task_join());
608        assert!(native.blocking_adapter());
609        assert!(!native.platform_async_io());
610
611        let platform = RuntimeOptions::platform_io().capabilities();
612        assert!(platform.background_threads());
613        assert!(platform.cancellation_tokens());
614        assert!(platform.task_join());
615        assert!(platform.blocking_adapter());
616        assert_eq!(
617            platform.platform_async_io(),
618            cfg!(all(feature = "platform-io", target_os = "linux"))
619        );
620
621        let inline = RuntimeOptions::inline().capabilities();
622        assert!(!inline.background_threads());
623        assert!(inline.cooperative_tasks());
624        assert!(inline.cancellation_tokens());
625        assert!(!inline.blocking_adapter());
626        assert!(!inline.platform_async_io());
627        assert!(!inline.task_join());
628    }
629
630    #[test]
631    fn cancellation_token_clones_share_state() {
632        let token = CancellationToken::new();
633        let clone = token.clone();
634
635        assert!(!token.is_cancelled());
636        clone.cancel();
637
638        assert!(token.is_cancelled());
639        assert!(clone.is_cancelled());
640    }
641
642    #[test]
643    fn native_background_task_observes_cancellation_and_joins() {
644        let runtime = Runtime::new(RuntimeOptions::native_threads());
645        let token = CancellationToken::new();
646        let worker_token = token.clone();
647        let (started_tx, started_rx) = mpsc::channel();
648        let (done_tx, done_rx) = mpsc::channel();
649
650        let task = runtime
651            .spawn_background("trine-kv-runtime-cancel-test".to_owned(), move || {
652                started_tx.send(()).expect("report worker start");
653                while !worker_token.is_cancelled() {
654                    thread::sleep(Duration::from_millis(1));
655                }
656                done_tx.send(()).expect("report worker done");
657            })
658            .expect("spawn background task");
659
660        started_rx
661            .recv_timeout(Duration::from_secs(1))
662            .expect("worker starts");
663        token.cancel();
664        done_rx
665            .recv_timeout(Duration::from_secs(1))
666            .expect("worker observes cancellation");
667        task.join().expect("worker joins");
668    }
669
670    #[test]
671    fn native_blocking_adapter_runs_tasks_on_bounded_workers() {
672        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 2);
673        let (done_tx, done_rx) = mpsc::channel();
674
675        runtime
676            .spawn_blocking(move || {
677                done_tx
678                    .send(thread::current().name().map(str::to_owned))
679                    .expect("report blocking task completion");
680            })
681            .expect("spawn blocking task");
682
683        let worker_name = done_rx
684            .recv_timeout(Duration::from_secs(1))
685            .expect("blocking task completes")
686            .expect("blocking worker has a name");
687        assert!(worker_name.starts_with("trine-kv-blocking-"));
688    }
689
690    #[test]
691    fn native_blocking_result_future_completes_on_bounded_worker() {
692        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 2);
693        let future = runtime
694            .spawn_blocking_result(|| {
695                thread::current()
696                    .name()
697                    .map(str::to_owned)
698                    .ok_or_else(|| Error::runtime_busy("blocking worker is unnamed"))
699            })
700            .expect("spawn blocking result task");
701
702        let worker_name = block_on_test_future(future).expect("blocking result completes");
703
704        assert!(worker_name.starts_with("trine-kv-blocking-"));
705    }
706
707    #[test]
708    fn native_blocking_adapter_rejects_full_queue() {
709        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 1);
710        let (started_tx, started_rx) = mpsc::channel();
711        let (release_tx, release_rx) = mpsc::channel();
712        let (queued_tx, queued_rx) = mpsc::channel();
713
714        runtime
715            .spawn_blocking(move || {
716                started_tx.send(()).expect("report blocking task start");
717                release_rx.recv().expect("wait for release");
718            })
719            .expect("spawn first blocking task");
720        started_rx
721            .recv_timeout(Duration::from_secs(1))
722            .expect("first blocking task starts");
723
724        runtime
725            .spawn_blocking(move || {
726                queued_tx.send(()).expect("report queued task completion");
727            })
728            .expect("queue second blocking task");
729
730        let error = runtime
731            .spawn_blocking(|| {})
732            .expect_err("third blocking task exceeds bounded queue");
733        assert!(matches!(error, Error::RuntimeBusy { .. }));
734        let stats = runtime
735            .blocking_adapter_stats()
736            .expect("sync adapter stats exist");
737        assert_eq!(stats.worker_count, 1);
738        assert_eq!(stats.queue_capacity, 1);
739        assert_eq!(stats.queued_tasks, 1);
740        assert_eq!(stats.submitted_tasks, 2);
741        assert_eq!(stats.completed_tasks, 0);
742        assert_eq!(stats.rejected_tasks, 1);
743        assert!(
744            queued_rx.recv_timeout(Duration::from_millis(20)).is_err(),
745            "queued task must wait until the active worker is released"
746        );
747
748        release_tx.send(()).expect("release first task");
749        queued_rx
750            .recv_timeout(Duration::from_secs(1))
751            .expect("queued task eventually runs");
752    }
753
754    #[test]
755    fn inline_runtime_rejects_blocking_adapter_tasks() {
756        let runtime = Runtime::new(RuntimeOptions::inline());
757
758        let error = runtime
759            .spawn_blocking(|| {})
760            .expect_err("inline runtime has no sync adapter");
761
762        assert!(matches!(error, Error::Unsupported { .. }));
763    }
764
765    #[cfg(not(feature = "platform-io"))]
766    #[test]
767    fn platform_io_runtime_requires_feature() {
768        let path = std::env::temp_dir().join(format!(
769            "trine-kv-runtime-no-platform-io-{}",
770            std::process::id()
771        ));
772        let mut options = DbOptions::persistent(path.clone());
773        options.runtime = RuntimeOptions::platform_io();
774        let error = Db::open_sync(options).expect_err("platform I/O requires feature");
775
776        assert!(matches!(error, Error::UnsupportedBackend { .. }));
777        let _ = std::fs::remove_dir_all(path);
778    }
779
780    #[test]
781    fn persistent_background_workers_require_thread_capability() {
782        let path = std::env::temp_dir().join(format!(
783            "trine-kv-runtime-no-threads-{}",
784            std::process::id()
785        ));
786        let mut options = DbOptions::persistent(path);
787        options.runtime = RuntimeOptions::inline();
788        options.background_worker_count = 1;
789
790        let error = Db::open_sync(options).expect_err("background threads are required");
791
792        assert!(matches!(error, Error::InvalidOptions { .. }));
793    }
794}