1use std::{
2 collections::VecDeque,
3 fmt,
4 future::Future,
5 panic::{self, AssertUnwindSafe},
6 pin::Pin,
7 sync::{
8 Arc, Condvar, Mutex,
9 atomic::{AtomicBool, AtomicU64, Ordering},
10 },
11 task::{Context, Poll, Waker},
12 thread,
13 time::{Duration, Instant},
14};
15
16use crate::{
17 error::{Error, Result},
18 options::StorageMode,
19};
20
/// Execution strategy selected for the runtime.
///
/// The capabilities advertised for each mode are computed by
/// [`RuntimeOptions::capabilities`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RuntimeMode {
    /// Default mode. Advertises background threads, cooperative tasks, the
    /// blocking adapter, cancellation tokens and task join.
    #[default]
    NativeThreads,
    /// Advertises everything `NativeThreads` does, plus platform async I/O
    /// when compiled with the `platform-io` feature for Linux.
    PlatformIo,
    /// Advertises only cooperative tasks and cancellation tokens.
    Inline,
}
32
/// Runtime configuration; currently it only carries the selected [`RuntimeMode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeOptions {
    /// Execution mode that determines which capabilities are advertised.
    pub mode: RuntimeMode,
}
39
/// Set of runtime capabilities, stored as a bitmask and queried through the
/// boolean accessor methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeCapabilities {
    /// Bitwise OR of the module-level capability flag constants.
    flags: u8,
}
45
// Capability bits stored in `RuntimeCapabilities::flags`. Each constant backs
// the `RuntimeCapabilities` accessor of the same (lower-case) name.
const BACKGROUND_THREADS: u8 = 1 << 0;
const COOPERATIVE_TASKS: u8 = 1 << 1;
const BLOCKING_ADAPTER: u8 = 1 << 2;
const CANCELLATION_TOKENS: u8 = 1 << 3;
const TASK_JOIN: u8 = 1 << 4;
const PLATFORM_ASYNC_IO: u8 = 1 << 5;

/// Worker-thread count that `Runtime::new` passes to the blocking pool.
const DEFAULT_BLOCKING_WORKERS: usize = 4;
/// Queue depth that `Runtime::new` passes to the blocking pool.
const DEFAULT_BLOCKING_QUEUE_DEPTH: usize = 1024;

/// Boxed one-shot closure queued for execution on a blocking worker thread.
type BlockingTask = Box<dyn FnOnce() + Send + 'static>;
57
/// Crate-internal runtime handle: the selected options plus the blocking pool.
#[derive(Debug, Clone)]
pub(crate) struct Runtime {
    /// Options the runtime was built with; capabilities are derived from them.
    options: RuntimeOptions,
    /// `None` when the options do not advertise the blocking adapter. The
    /// `Arc` means clones of the runtime share one pool.
    blocking_pool: Option<Arc<BlockingTaskPool>>,
}
63
/// Handle to a task started through `Runtime::spawn_background`.
#[derive(Debug)]
#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), allow(dead_code))]
pub(crate) enum RuntimeTask {
    /// Task running on an OS thread; wraps that thread's join handle.
    NativeThread(thread::JoinHandle<()>),
}
69
/// Future returned by `Runtime::spawn_blocking_result`; it resolves to the
/// task's `Result<T>` once the worker closure has stored it.
pub(crate) struct BlockingResultFuture<T> {
    /// Result/waker slot shared with the closure that runs the task.
    state: Arc<Mutex<BlockingResultState<T>>>,
}
73
/// Bounded pool of worker threads for blocking tasks. Threads are spawned on
/// the first submission and joined when the pool is dropped.
struct BlockingTaskPool {
    /// Queue, condition variable, limits and counters shared with the workers.
    state: Arc<BlockingTaskPoolState>,
    /// Worker join handles and the "already started" flag, under one mutex.
    workers: Mutex<BlockingWorkers>,
}
78
/// Slot shared between a `BlockingResultFuture` and the closure running its task.
struct BlockingResultState<T> {
    /// Task outcome; written by the worker closure and taken by `poll`.
    result: Option<Result<T>>,
    /// Waker recorded by a pending `poll`; taken and woken once the result is stored.
    waker: Option<Waker>,
}
83
/// State shared between the pool handle and every worker thread.
struct BlockingTaskPoolState {
    /// Pending tasks plus the shutdown flag.
    queue: Mutex<BlockingTaskQueue>,
    /// Notified when a task is queued (one waiter) or on shutdown (all waiters).
    wake: Condvar,
    /// Number of worker threads to spawn; clamped to at least 1 on construction.
    worker_count: usize,
    /// Maximum number of queued tasks; clamped to at least 1 on construction.
    queue_depth: usize,
    /// Number of tasks accepted into the queue.
    submitted_tasks: AtomicU64,
    /// Number of tasks a worker has finished running, including ones that panicked.
    completed_tasks: AtomicU64,
    /// Number of submissions refused because the queue was shut down or full.
    rejected_tasks: AtomicU64,
    /// Sum of task execution times in microseconds.
    total_runtime_micros: AtomicU64,
}
94
/// Bookkeeping for the pool's worker threads.
#[derive(Debug, Default)]
struct BlockingWorkers {
    /// Set once every worker thread has been spawned successfully.
    started: bool,
    /// Join handles of the spawned workers; drained and joined when the pool drops.
    handles: Vec<thread::JoinHandle<()>>,
}
100
/// FIFO of pending blocking tasks together with the shutdown flag.
#[derive(Default)]
struct BlockingTaskQueue {
    /// Tasks are pushed at the back and popped from the front.
    tasks: VecDeque<BlockingTask>,
    /// Once set, new submissions are rejected and idle workers exit; tasks
    /// that are already queued are still handed out first.
    shutdown: bool,
}
106
/// Cancellation flag that can be cloned; all clones observe the same state.
#[derive(Debug, Clone, Default)]
pub struct CancellationToken {
    /// Shared flag; `false` until `cancel` is called on any clone.
    cancelled: Arc<AtomicBool>,
}
112
/// Point-in-time snapshot of the blocking pool's limits and counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct RuntimeBlockingAdapterStats {
    /// Configured number of worker threads.
    pub(crate) worker_count: usize,
    /// Configured maximum number of queued tasks.
    pub(crate) queue_capacity: usize,
    /// Tasks waiting in the queue when the snapshot was taken (0 if the queue
    /// lock was poisoned).
    pub(crate) queued_tasks: usize,
    /// Tasks accepted into the queue so far.
    pub(crate) submitted_tasks: u64,
    /// Tasks the workers have finished running so far.
    pub(crate) completed_tasks: u64,
    /// Submissions refused so far.
    pub(crate) rejected_tasks: u64,
    /// Accumulated task execution time in microseconds.
    pub(crate) total_runtime_micros: u64,
}
123
124impl RuntimeOptions {
125 #[must_use]
127 pub const fn native_threads() -> Self {
128 Self {
129 mode: RuntimeMode::NativeThreads,
130 }
131 }
132
133 #[must_use]
135 pub const fn inline() -> Self {
136 Self {
137 mode: RuntimeMode::Inline,
138 }
139 }
140
141 #[must_use]
143 pub const fn platform_io() -> Self {
144 Self {
145 mode: RuntimeMode::PlatformIo,
146 }
147 }
148
149 #[must_use]
151 pub const fn capabilities(self) -> RuntimeCapabilities {
152 const NATIVE_THREAD_FLAGS: u8 = BACKGROUND_THREADS
153 | COOPERATIVE_TASKS
154 | BLOCKING_ADAPTER
155 | CANCELLATION_TOKENS
156 | TASK_JOIN;
157 match self.mode {
158 RuntimeMode::NativeThreads => RuntimeCapabilities::new(NATIVE_THREAD_FLAGS),
159 RuntimeMode::PlatformIo => {
160 RuntimeCapabilities::new(NATIVE_THREAD_FLAGS | platform_async_io_flag())
161 }
162 RuntimeMode::Inline => {
163 RuntimeCapabilities::new(COOPERATIVE_TASKS | CANCELLATION_TOKENS)
164 }
165 }
166 }
167}
168
169impl Default for RuntimeOptions {
170 fn default() -> Self {
171 Self::native_threads()
172 }
173}
174
175impl CancellationToken {
176 #[must_use]
178 pub fn new() -> Self {
179 Self::default()
180 }
181
182 pub fn cancel(&self) {
184 self.cancelled.store(true, Ordering::Release);
185 }
186
187 #[must_use]
189 pub fn is_cancelled(&self) -> bool {
190 self.cancelled.load(Ordering::Acquire)
191 }
192}
193
194impl RuntimeCapabilities {
195 const fn new(flags: u8) -> Self {
196 Self { flags }
197 }
198
199 #[must_use]
201 pub const fn background_threads(self) -> bool {
202 self.has(BACKGROUND_THREADS)
203 }
204
205 #[must_use]
207 pub const fn cooperative_tasks(self) -> bool {
208 self.has(COOPERATIVE_TASKS)
209 }
210
211 #[must_use]
213 pub const fn blocking_adapter(self) -> bool {
214 self.has(BLOCKING_ADAPTER)
215 }
216
217 #[must_use]
219 pub const fn cancellation_tokens(self) -> bool {
220 self.has(CANCELLATION_TOKENS)
221 }
222
223 #[must_use]
225 pub const fn task_join(self) -> bool {
226 self.has(TASK_JOIN)
227 }
228
229 #[must_use]
231 pub const fn platform_async_io(self) -> bool {
232 self.has(PLATFORM_ASYNC_IO)
233 }
234
235 const fn has(self, flag: u8) -> bool {
236 self.flags & flag != 0
237 }
238}
239
240const fn platform_async_io_flag() -> u8 {
241 if cfg!(all(feature = "platform-io", target_os = "linux")) {
242 PLATFORM_ASYNC_IO
243 } else {
244 0
245 }
246}
247
248impl Runtime {
249 pub(crate) fn new(options: RuntimeOptions) -> Self {
250 Self::with_blocking_limits(
251 options,
252 DEFAULT_BLOCKING_WORKERS,
253 DEFAULT_BLOCKING_QUEUE_DEPTH,
254 )
255 }
256
257 pub(crate) fn with_blocking_limits(
258 options: RuntimeOptions,
259 blocking_worker_count: usize,
260 blocking_queue_depth: usize,
261 ) -> Self {
262 let blocking_pool = if options.capabilities().blocking_adapter() {
263 Some(Arc::new(BlockingTaskPool::new(
264 blocking_worker_count,
265 blocking_queue_depth,
266 )))
267 } else {
268 None
269 };
270 Self {
271 options,
272 blocking_pool,
273 }
274 }
275
276 pub(crate) const fn capabilities(&self) -> RuntimeCapabilities {
277 self.options.capabilities()
278 }
279
280 #[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), allow(dead_code))]
281 pub(crate) fn spawn_background(
282 &self,
283 name: String,
284 task: impl FnOnce() + Send + 'static,
285 ) -> Result<RuntimeTask> {
286 match self.options.mode {
287 RuntimeMode::NativeThreads | RuntimeMode::PlatformIo => thread::Builder::new()
288 .name(name)
289 .spawn(task)
290 .map(RuntimeTask::NativeThread)
291 .map_err(Error::Io),
292 RuntimeMode::Inline => Err(Error::unsupported("runtime background threads")),
293 }
294 }
295
296 pub(crate) fn spawn_blocking(&self, task: impl FnOnce() + Send + 'static) -> Result<()> {
297 let Some(pool) = &self.blocking_pool else {
298 return Err(Error::unsupported("runtime sync adapter"));
299 };
300 pool.submit(Box::new(task))
301 }
302
303 pub(crate) fn spawn_blocking_result<T>(
304 &self,
305 task: impl FnOnce() -> Result<T> + Send + 'static,
306 ) -> Result<BlockingResultFuture<T>>
307 where
308 T: Send + 'static,
309 {
310 let state = Arc::new(Mutex::new(BlockingResultState {
311 result: None,
312 waker: None,
313 }));
314 let task_state = Arc::clone(&state);
315 self.spawn_blocking(move || {
316 let result = panic::catch_unwind(AssertUnwindSafe(task))
317 .unwrap_or_else(|_| Err(Error::runtime_busy("blocking task panicked")));
318 if let Ok(mut state) = task_state.lock() {
319 state.result = Some(result);
320 if let Some(waker) = state.waker.take() {
321 waker.wake();
322 }
323 }
324 })?;
325 Ok(BlockingResultFuture { state })
326 }
327
328 pub(crate) fn blocking_adapter_stats(&self) -> Option<RuntimeBlockingAdapterStats> {
329 self.blocking_pool.as_ref().map(|pool| pool.stats())
330 }
331}
332
333impl<T> fmt::Debug for BlockingResultFuture<T> {
334 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
335 formatter.debug_struct("BlockingResultFuture").finish()
336 }
337}
338
339impl<T> Future for BlockingResultFuture<T> {
340 type Output = Result<T>;
341
342 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
343 let Ok(mut state) = self.state.lock() else {
344 return Poll::Ready(Err(Error::runtime_busy(
345 "blocking result state is poisoned",
346 )));
347 };
348 if let Some(result) = state.result.take() {
349 Poll::Ready(result)
350 } else {
351 state.waker = Some(context.waker().clone());
352 Poll::Pending
353 }
354 }
355}
356
357impl fmt::Debug for BlockingTaskPool {
358 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
359 let queued = self.state.queue.lock().map_or(0, |queue| queue.tasks.len());
360 let started = self.workers.lock().is_ok_and(|workers| workers.started);
361 formatter
362 .debug_struct("BlockingTaskPool")
363 .field("worker_count", &self.state.worker_count)
364 .field("queue_depth", &self.state.queue_depth)
365 .field("queued", &queued)
366 .field("started", &started)
367 .finish()
368 }
369}
370
371impl BlockingTaskPool {
372 fn new(worker_count: usize, queue_depth: usize) -> Self {
373 Self {
374 state: Arc::new(BlockingTaskPoolState {
375 queue: Mutex::new(BlockingTaskQueue::default()),
376 wake: Condvar::new(),
377 worker_count: worker_count.max(1),
378 queue_depth: queue_depth.max(1),
379 submitted_tasks: AtomicU64::new(0),
380 completed_tasks: AtomicU64::new(0),
381 rejected_tasks: AtomicU64::new(0),
382 total_runtime_micros: AtomicU64::new(0),
383 }),
384 workers: Mutex::new(BlockingWorkers::default()),
385 }
386 }
387
388 fn submit(&self, task: BlockingTask) -> Result<()> {
389 self.ensure_started()?;
390 self.state.submit(task)
391 }
392
393 fn stats(&self) -> RuntimeBlockingAdapterStats {
394 self.state.stats()
395 }
396
397 fn ensure_started(&self) -> Result<()> {
398 let mut workers = self
399 .workers
400 .lock()
401 .map_err(|_| Error::runtime_busy("blocking worker registry is poisoned"))?;
402 if workers.started {
403 return Ok(());
404 }
405
406 let mut handles = Vec::with_capacity(self.state.worker_count);
407 for worker_index in 0..self.state.worker_count {
408 let state = Arc::clone(&self.state);
409 match thread::Builder::new()
410 .name(format!("trine-kv-blocking-{worker_index}"))
411 .spawn(move || blocking_worker_loop(&state))
412 {
413 Ok(handle) => handles.push(handle),
414 Err(error) => {
415 self.state.shutdown();
416 for handle in handles {
417 let _ = handle.join();
418 }
419 return Err(Error::Io(error));
420 }
421 }
422 }
423 workers.handles = handles;
424 workers.started = true;
425 Ok(())
426 }
427}
428
429impl Drop for BlockingTaskPool {
430 fn drop(&mut self) {
431 self.state.shutdown();
432 let current_thread = thread::current().id();
433 let Ok(mut workers) = self.workers.lock() else {
434 return;
435 };
436 for handle in workers.handles.drain(..) {
437 if handle.thread().id() == current_thread {
438 continue;
439 }
440 let _ = handle.join();
441 }
442 }
443}
444
445impl BlockingTaskPoolState {
446 fn submit(&self, task: BlockingTask) -> Result<()> {
447 let mut queue = self
448 .queue
449 .lock()
450 .map_err(|_| Error::runtime_busy("blocking task queue is poisoned"))?;
451 if queue.shutdown {
452 self.rejected_tasks.fetch_add(1, Ordering::Relaxed);
453 return Err(Error::Closed);
454 }
455 if queue.tasks.len() >= self.queue_depth {
456 self.rejected_tasks.fetch_add(1, Ordering::Relaxed);
457 return Err(Error::runtime_busy("blocking task queue is full"));
458 }
459 queue.tasks.push_back(task);
460 self.submitted_tasks.fetch_add(1, Ordering::Relaxed);
461 self.wake.notify_one();
462 Ok(())
463 }
464
465 fn next_task(&self) -> Option<BlockingTask> {
466 let Ok(mut queue) = self.queue.lock() else {
467 return None;
468 };
469 loop {
470 if let Some(task) = queue.tasks.pop_front() {
471 return Some(task);
472 }
473 if queue.shutdown {
474 return None;
475 }
476 let Ok(next_queue) = self.wake.wait(queue) else {
477 return None;
478 };
479 queue = next_queue;
480 }
481 }
482
483 fn shutdown(&self) {
484 if let Ok(mut queue) = self.queue.lock() {
485 queue.shutdown = true;
486 self.wake.notify_all();
487 }
488 }
489
490 fn record_completed(&self, runtime: Duration) {
491 self.completed_tasks.fetch_add(1, Ordering::Relaxed);
492 self.total_runtime_micros
493 .fetch_add(duration_to_micros_saturating(runtime), Ordering::Relaxed);
494 }
495
496 fn stats(&self) -> RuntimeBlockingAdapterStats {
497 RuntimeBlockingAdapterStats {
498 worker_count: self.worker_count,
499 queue_capacity: self.queue_depth,
500 queued_tasks: self.queue.lock().map_or(0, |queue| queue.tasks.len()),
501 submitted_tasks: self.submitted_tasks.load(Ordering::Acquire),
502 completed_tasks: self.completed_tasks.load(Ordering::Acquire),
503 rejected_tasks: self.rejected_tasks.load(Ordering::Acquire),
504 total_runtime_micros: self.total_runtime_micros.load(Ordering::Acquire),
505 }
506 }
507}
508
509fn blocking_worker_loop(state: &BlockingTaskPoolState) {
510 while let Some(task) = state.next_task() {
511 let started = Instant::now();
512 let _ = panic::catch_unwind(AssertUnwindSafe(task));
513 state.record_completed(started.elapsed());
514 }
515}
516
/// Converts `duration` to whole microseconds, clamping to `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_to_micros_saturating(duration: Duration) -> u64 {
    match u64::try_from(duration.as_micros()) {
        Ok(micros) => micros,
        Err(_) => u64::MAX,
    }
}
520
521impl RuntimeTask {
522 pub(crate) fn is_current_thread(&self) -> bool {
523 match self {
524 Self::NativeThread(handle) => handle.thread().id() == thread::current().id(),
525 }
526 }
527
528 pub(crate) fn join(self) -> thread::Result<()> {
529 match self {
530 Self::NativeThread(handle) => handle.join(),
531 }
532 }
533}
534
535pub(crate) fn validate_runtime_options(
536 runtime: RuntimeOptions,
537 storage_mode: &StorageMode,
538 read_only: bool,
539 background_worker_count: usize,
540) -> Result<()> {
541 #[cfg(not(feature = "platform-io"))]
542 if matches!(runtime.mode, RuntimeMode::PlatformIo) {
543 return Err(Error::unsupported_backend(
544 "platform async I/O runtime requires the platform-io feature",
545 ));
546 }
547
548 let persistent_background_workers =
549 storage_mode.persistent_path().is_some() && !read_only && background_worker_count != 0;
550 if persistent_background_workers && !runtime.capabilities().background_threads() {
551 return Err(Error::invalid_options(
552 "background workers require runtime background threads",
553 ));
554 }
555
556 Ok(())
557}
558
// Unit tests for the runtime: capability reporting, cancellation tokens,
// background tasks, the bounded blocking pool and option validation.
#[cfg(test)]
mod tests {
    use std::{
        future::Future,
        sync::{Arc, mpsc},
        task::{Context, Poll, Wake, Waker},
        thread,
        time::Duration,
    };

    use crate::{
        Db, DbOptions, Error, Result,
        runtime::{CancellationToken, Runtime, RuntimeOptions},
    };

    /// Waker that unparks the thread it was created for.
    struct ThreadWaker {
        thread: thread::Thread,
    }

    impl Wake for ThreadWaker {
        fn wake(self: Arc<Self>) {
            self.thread.unpark();
        }

        fn wake_by_ref(self: &Arc<Self>) {
            self.thread.unpark();
        }
    }

    /// Minimal executor for the tests: polls `future` on the calling thread
    /// and parks between polls until it is ready.
    fn block_on_test_future<T>(future: impl Future<Output = Result<T>>) -> Result<T> {
        let waker = Waker::from(Arc::new(ThreadWaker {
            thread: thread::current(),
        }));
        let mut context = Context::from_waker(&waker);
        let mut future = std::pin::pin!(future);
        loop {
            match future.as_mut().poll(&mut context) {
                Poll::Ready(result) => return result,
                // The timeout only bounds a single park; the loop re-polls
                // afterwards, so a missed wake-up delays the test by at most
                // one second per iteration.
                Poll::Pending => thread::park_timeout(Duration::from_secs(1)),
            }
        }
    }

    /// Each mode must advertise exactly the capability set it is built with.
    #[test]
    fn runtime_capabilities_follow_selected_mode() {
        let native = RuntimeOptions::native_threads().capabilities();
        assert!(native.background_threads());
        assert!(native.cancellation_tokens());
        assert!(native.task_join());
        assert!(native.blocking_adapter());
        assert!(!native.platform_async_io());

        let platform = RuntimeOptions::platform_io().capabilities();
        assert!(platform.background_threads());
        assert!(platform.cancellation_tokens());
        assert!(platform.task_join());
        assert!(platform.blocking_adapter());
        // Platform async I/O is only advertised with the feature on Linux.
        assert_eq!(
            platform.platform_async_io(),
            cfg!(all(feature = "platform-io", target_os = "linux"))
        );

        let inline = RuntimeOptions::inline().capabilities();
        assert!(!inline.background_threads());
        assert!(inline.cooperative_tasks());
        assert!(inline.cancellation_tokens());
        assert!(!inline.blocking_adapter());
        assert!(!inline.platform_async_io());
        assert!(!inline.task_join());
    }

    /// Cancelling a clone must be visible through the original token.
    #[test]
    fn cancellation_token_clones_share_state() {
        let token = CancellationToken::new();
        let clone = token.clone();

        assert!(!token.is_cancelled());
        clone.cancel();

        assert!(token.is_cancelled());
        assert!(clone.is_cancelled());
    }

    /// A background task spins until the token is cancelled, then exits and joins.
    #[test]
    fn native_background_task_observes_cancellation_and_joins() {
        let runtime = Runtime::new(RuntimeOptions::native_threads());
        let token = CancellationToken::new();
        let worker_token = token.clone();
        let (started_tx, started_rx) = mpsc::channel();
        let (done_tx, done_rx) = mpsc::channel();

        let task = runtime
            .spawn_background("trine-kv-runtime-cancel-test".to_owned(), move || {
                started_tx.send(()).expect("report worker start");
                while !worker_token.is_cancelled() {
                    thread::sleep(Duration::from_millis(1));
                }
                done_tx.send(()).expect("report worker done");
            })
            .expect("spawn background task");

        // Cancel only after the worker is known to be running.
        started_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("worker starts");
        token.cancel();
        done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("worker observes cancellation");
        task.join().expect("worker joins");
    }

    /// A blocking task runs on one of the pool's named worker threads.
    #[test]
    fn native_blocking_adapter_runs_tasks_on_bounded_workers() {
        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 2);
        let (done_tx, done_rx) = mpsc::channel();

        runtime
            .spawn_blocking(move || {
                done_tx
                    .send(thread::current().name().map(str::to_owned))
                    .expect("report blocking task completion");
            })
            .expect("spawn blocking task");

        let worker_name = done_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("blocking task completes")
            .expect("blocking worker has a name");
        assert!(worker_name.starts_with("trine-kv-blocking-"));
    }

    /// The result future resolves with the value produced on a pool worker.
    #[test]
    fn native_blocking_result_future_completes_on_bounded_worker() {
        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 2);
        let future = runtime
            .spawn_blocking_result(|| {
                thread::current()
                    .name()
                    .map(str::to_owned)
                    .ok_or_else(|| Error::runtime_busy("blocking worker is unnamed"))
            })
            .expect("spawn blocking result task");

        let worker_name = block_on_test_future(future).expect("blocking result completes");

        assert!(worker_name.starts_with("trine-kv-blocking-"));
    }

    /// With one worker and a queue depth of one, a third submission is
    /// rejected while the first task is running and the second is queued.
    #[test]
    fn native_blocking_adapter_rejects_full_queue() {
        let runtime = Runtime::with_blocking_limits(RuntimeOptions::native_threads(), 1, 1);
        let (started_tx, started_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();
        let (queued_tx, queued_rx) = mpsc::channel();

        // First task occupies the only worker until it is released.
        runtime
            .spawn_blocking(move || {
                started_tx.send(()).expect("report blocking task start");
                release_rx.recv().expect("wait for release");
            })
            .expect("spawn first blocking task");
        started_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("first blocking task starts");

        // Second task fills the single queue slot.
        runtime
            .spawn_blocking(move || {
                queued_tx.send(()).expect("report queued task completion");
            })
            .expect("queue second blocking task");

        // Third task no longer fits.
        let error = runtime
            .spawn_blocking(|| {})
            .expect_err("third blocking task exceeds bounded queue");
        assert!(matches!(error, Error::RuntimeBusy { .. }));
        let stats = runtime
            .blocking_adapter_stats()
            .expect("sync adapter stats exist");
        assert_eq!(stats.worker_count, 1);
        assert_eq!(stats.queue_capacity, 1);
        assert_eq!(stats.queued_tasks, 1);
        assert_eq!(stats.submitted_tasks, 2);
        assert_eq!(stats.completed_tasks, 0);
        assert_eq!(stats.rejected_tasks, 1);
        assert!(
            queued_rx.recv_timeout(Duration::from_millis(20)).is_err(),
            "queued task must wait until the active worker is released"
        );

        release_tx.send(()).expect("release first task");
        queued_rx
            .recv_timeout(Duration::from_secs(1))
            .expect("queued task eventually runs");
    }

    /// The inline runtime has no blocking pool, so submissions are unsupported.
    #[test]
    fn inline_runtime_rejects_blocking_adapter_tasks() {
        let runtime = Runtime::new(RuntimeOptions::inline());

        let error = runtime
            .spawn_blocking(|| {})
            .expect_err("inline runtime has no sync adapter");

        assert!(matches!(error, Error::Unsupported { .. }));
    }

    /// Without the `platform-io` feature, opening with the platform I/O
    /// runtime must fail with an unsupported-backend error.
    #[cfg(not(feature = "platform-io"))]
    #[test]
    fn platform_io_runtime_requires_feature() {
        let path = std::env::temp_dir().join(format!(
            "trine-kv-runtime-no-platform-io-{}",
            std::process::id()
        ));
        let mut options = DbOptions::persistent(path.clone());
        options.runtime = RuntimeOptions::platform_io();
        let error = Db::open_sync(options).expect_err("platform I/O requires feature");

        assert!(matches!(error, Error::UnsupportedBackend { .. }));
        // Best-effort cleanup in case the failed open left anything behind.
        let _ = std::fs::remove_dir_all(path);
    }

    /// A persistent database with background workers cannot use the inline runtime.
    #[test]
    fn persistent_background_workers_require_thread_capability() {
        let path = std::env::temp_dir().join(format!(
            "trine-kv-runtime-no-threads-{}",
            std::process::id()
        ));
        let mut options = DbOptions::persistent(path);
        options.runtime = RuntimeOptions::inline();
        options.background_worker_count = 1;

        let error = Db::open_sync(options).expect_err("background threads are required");

        assert!(matches!(error, Error::InvalidOptions { .. }));
    }
}