1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9use crate::{
10 child_label,
11 network::metered::Network as MeteredNetwork,
12 prefixed_name,
13 process::metered::Metrics as MeteredProcess,
14 signal::Signal,
15 storage::metered::Storage as MeteredStorage,
16 telemetry::metrics::{
17 add_attribute, raw, task::Label, validate_label, CounterFamily, GaugeFamily, Metric,
18 Register, Registered, Registry,
19 },
20 utils::{self, signal::Stopper, supervision::Tree, Panicker},
21 BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Name, SinkOf, Spawner as _,
22 StreamOf, Supervisor as _, METRICS_PREFIX,
23};
24#[cfg(feature = "iouring-network")]
25use crate::{
26 iouring,
27 network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
28};
29use commonware_macros::{select, stability};
30#[stability(BETA)]
31use commonware_parallel::ThreadPool;
32use commonware_utils::{sync::Mutex, NZUsize};
33use futures::future::Either;
34use governor::clock::{Clock as GClock, ReasonablyRealtime};
35use rand::{rngs::OsRng, CryptoRng, RngCore};
36#[stability(BETA)]
37use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
38use std::{
39 env,
40 future::Future,
41 net::{IpAddr, SocketAddr},
42 num::NonZeroUsize,
43 path::PathBuf,
44 sync::Arc,
45 time::{Duration, SystemTime},
46};
47use tokio::runtime::{Builder, Runtime};
48use tracing::{info_span, Instrument};
49use tracing_opentelemetry::OpenTelemetrySpanExt;
50
// Value passed as `iouring::Config::size` when the io_uring network backend
// is enabled. Test builds use a smaller value than regular builds.
#[cfg(all(feature = "iouring-network", test))]
const IOURING_NETWORK_SIZE: u32 = 128;
#[cfg(all(feature = "iouring-network", not(test)))]
const IOURING_NETWORK_SIZE: u32 = 1024;
61
62#[derive(Debug)]
63struct Metrics {
64 tasks_spawned: CounterFamily<Label>,
65 tasks_running: GaugeFamily<Label>,
66}
67
68impl Metrics {
69 pub fn init(registry: &mut impl Register) -> Self {
70 Self {
71 tasks_spawned: registry.register(
72 "tasks_spawned",
73 "Total number of tasks spawned",
74 raw::Family::default(),
75 ),
76 tasks_running: registry.register(
77 "tasks_running",
78 "Number of tasks currently running",
79 raw::Family::default(),
80 ),
81 }
82 }
83}
84
/// Network settings forwarded to whichever network backend is compiled in.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// Optional `TCP_NODELAY` setting handed to the network backend.
    // NOTE(review): `None` presumably leaves the OS default untouched — confirm in the backend.
    tcp_nodelay: Option<bool>,

    /// Zero-linger flag handed to the network backend.
    zero_linger: bool,

    /// Timeout applied to both reads and writes.
    read_write_timeout: Duration,
}

impl Default for NetworkConfig {
    /// Defaults: `TCP_NODELAY` on, zero linger on, 60 second read/write timeout.
    fn default() -> Self {
        let read_write_timeout = Duration::from_secs(60);
        Self {
            tcp_nodelay: Some(true),
            zero_linger: true,
            read_write_timeout,
        }
    }
}
120
/// Configuration for the Tokio-backed runtime.
///
/// Built with [`Config::new`] and customized through the `with_*` builder
/// methods; read back through the accessor methods of the same name.
#[derive(Clone)]
pub struct Config {
    /// Number of worker threads handed to the Tokio runtime builder. Also used
    /// as the parallelism of the default buffer pool configurations.
    worker_threads: usize,

    /// Optional global queue interval handed to the Tokio runtime builder.
    /// When `None`, the builder's own default is left in place.
    global_queue_interval: Option<u32>,

    /// Maximum number of blocking threads handed to the Tokio runtime builder.
    max_blocking_threads: usize,

    /// Stack size (in bytes) used for Tokio threads, dedicated task threads
    /// and the io_uring backends.
    thread_stack_size: usize,

    /// Flag passed to `Panicker::new` when the runtime starts.
    catch_panics: bool,

    /// Directory handed to the storage backend.
    storage_directory: PathBuf,

    /// Maximum buffer size handed to the Tokio storage backend (not read when
    /// the `iouring-storage` feature is enabled).
    maximum_buffer_size: usize,

    /// Network settings (see [`NetworkConfig`]).
    network_cfg: NetworkConfig,

    /// Explicit network buffer pool configuration. When `None`, a default is
    /// derived from `worker_threads` at startup.
    network_buffer_pool_cfg: Option<BufferPoolConfig>,

    /// Explicit storage buffer pool configuration. When `None`, a default is
    /// derived from `worker_threads` at startup.
    storage_buffer_pool_cfg: Option<BufferPoolConfig>,
}
176
177impl Config {
178 pub fn new() -> Self {
180 let rng = OsRng.next_u64();
181 let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
182 Self {
183 worker_threads: 2,
184 global_queue_interval: None,
185 max_blocking_threads: 512,
186 thread_stack_size: utils::thread::system_thread_stack_size(),
187 catch_panics: false,
188 storage_directory,
189 maximum_buffer_size: 2 * 1024 * 1024, network_cfg: NetworkConfig::default(),
191 network_buffer_pool_cfg: None,
192 storage_buffer_pool_cfg: None,
193 }
194 }
195
196 pub const fn with_worker_threads(mut self, n: usize) -> Self {
199 self.worker_threads = n;
200 self
201 }
202 pub const fn with_global_queue_interval(mut self, n: u32) -> Self {
204 self.global_queue_interval = Some(n);
205 self
206 }
207 pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
209 self.max_blocking_threads = n;
210 self
211 }
212 pub const fn with_thread_stack_size(mut self, n: usize) -> Self {
214 self.thread_stack_size = n;
215 self
216 }
217 pub const fn with_catch_panics(mut self, b: bool) -> Self {
219 self.catch_panics = b;
220 self
221 }
222 pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
224 self.network_cfg.read_write_timeout = d;
225 self
226 }
227 pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
229 self.network_cfg.tcp_nodelay = n;
230 self
231 }
232 pub const fn with_zero_linger(mut self, l: bool) -> Self {
234 self.network_cfg.zero_linger = l;
235 self
236 }
237 pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
239 self.storage_directory = p.into();
240 self
241 }
242 pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
244 self.maximum_buffer_size = n;
245 self
246 }
247 pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
249 self.network_buffer_pool_cfg = Some(cfg);
250 self
251 }
252 pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
254 self.storage_buffer_pool_cfg = Some(cfg);
255 self
256 }
257
258 pub const fn worker_threads(&self) -> usize {
261 self.worker_threads
262 }
263 pub const fn global_queue_interval(&self) -> Option<u32> {
265 self.global_queue_interval
266 }
267 pub const fn max_blocking_threads(&self) -> usize {
269 self.max_blocking_threads
270 }
271 pub const fn thread_stack_size(&self) -> usize {
273 self.thread_stack_size
274 }
275 pub const fn catch_panics(&self) -> bool {
277 self.catch_panics
278 }
279 pub const fn read_write_timeout(&self) -> Duration {
281 self.network_cfg.read_write_timeout
282 }
283 pub const fn tcp_nodelay(&self) -> Option<bool> {
285 self.network_cfg.tcp_nodelay
286 }
287 pub const fn zero_linger(&self) -> bool {
289 self.network_cfg.zero_linger
290 }
291 pub const fn storage_directory(&self) -> &PathBuf {
293 &self.storage_directory
294 }
295 pub const fn maximum_buffer_size(&self) -> usize {
297 self.maximum_buffer_size
298 }
299
300 fn resolved_network_buffer_pool_config(&self) -> BufferPoolConfig {
303 self.network_buffer_pool_cfg.clone().unwrap_or_else(|| {
304 BufferPoolConfig::for_network().with_parallelism(NZUsize!(self.worker_threads))
305 })
306 }
307
308 fn resolved_storage_buffer_pool_config(&self) -> BufferPoolConfig {
311 self.storage_buffer_pool_cfg.clone().unwrap_or_else(|| {
312 BufferPoolConfig::for_storage().with_parallelism(NZUsize!(self.worker_threads))
313 })
314 }
315}
316
317impl Default for Config {
318 fn default() -> Self {
319 Self::new()
320 }
321}
322
/// Shared runtime state, held behind an `Arc` by every [`Context`].
pub struct Executor {
    /// Metrics registry used for registration and encoding.
    registry: Registry,
    /// Task spawn/running metrics.
    metrics: Arc<Metrics>,
    /// Underlying Tokio runtime that tasks are spawned on.
    runtime: Runtime,
    /// Shutdown coordinator used by `stop` and `stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic handler cloned into every spawned task handle.
    panicker: Panicker,
    /// Stack size (in bytes) used for threads backing dedicated tasks.
    thread_stack_size: usize,
}
332
333pub struct Runner {
335 cfg: Config,
336}
337
338impl Default for Runner {
339 fn default() -> Self {
340 Self::new(Config::default())
341 }
342}
343
344impl Runner {
345 pub const fn new(cfg: Config) -> Self {
347 Self { cfg }
348 }
349}
350
351impl crate::Runner for Runner {
352 type Context = Context;
353
354 fn start<F, Fut>(self, f: F) -> Fut::Output
355 where
356 F: FnOnce(Self::Context) -> Fut,
357 Fut: Future,
358 {
359 let mut registry = Registry::new();
361 let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);
362
363 let metrics = Arc::new(Metrics::init(&mut runtime_registry));
365 let mut builder = Builder::new_multi_thread();
366 builder
367 .worker_threads(self.cfg.worker_threads)
368 .max_blocking_threads(self.cfg.max_blocking_threads)
369 .thread_stack_size(self.cfg.thread_stack_size)
370 .enable_all();
371 if let Some(global_queue_interval) = self.cfg.global_queue_interval {
372 builder.global_queue_interval(global_queue_interval);
373 }
374 let runtime = builder.build().expect("failed to create Tokio runtime");
375
376 let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
378
379 let process = MeteredProcess::init(&mut runtime_registry);
384 runtime.spawn(process.collect(tokio::time::sleep));
385
386 let network_buffer_pool = BufferPool::new(
388 self.cfg.resolved_network_buffer_pool_config(),
389 &mut runtime_registry.sub_registry("network_buffer_pool"),
390 );
391 let storage_buffer_pool = BufferPool::new(
392 self.cfg.resolved_storage_buffer_pool_config(),
393 &mut runtime_registry.sub_registry("storage_buffer_pool"),
394 );
395
396 cfg_if::cfg_if! {
398 if #[cfg(feature = "iouring-storage")] {
399 let mut iouring_registry = runtime_registry.sub_registry("iouring_storage");
400 let storage = MeteredStorage::new(
401 IoUringStorage::start(
402 IoUringConfig {
403 storage_directory: self.cfg.storage_directory.clone(),
404 iouring_config: Default::default(),
405 thread_stack_size: self.cfg.thread_stack_size,
406 },
407 &mut iouring_registry,
408 storage_buffer_pool.clone(),
409 ),
410 &mut runtime_registry,
411 );
412 } else {
413 let storage = MeteredStorage::new(
414 TokioStorage::new(
415 TokioStorageConfig::new(
416 self.cfg.storage_directory.clone(),
417 self.cfg.maximum_buffer_size,
418 ),
419 storage_buffer_pool.clone(),
420 ),
421 &mut runtime_registry,
422 );
423 }
424 }
425
426 cfg_if::cfg_if! {
428 if #[cfg(feature = "iouring-network")] {
429 let mut iouring_registry = runtime_registry.sub_registry("iouring_network");
430 let config = IoUringNetworkConfig {
431 tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
432 zero_linger: self.cfg.network_cfg.zero_linger,
433 read_write_timeout: self.cfg.network_cfg.read_write_timeout,
434 iouring_config: iouring::Config {
435 size: IOURING_NETWORK_SIZE,
437 max_request_timeout: self.cfg.network_cfg.read_write_timeout,
438 shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
439 ..Default::default()
440 },
441 thread_stack_size: self.cfg.thread_stack_size,
442 ..Default::default()
443 };
444 let network = MeteredNetwork::new(
445 IoUringNetwork::start(
446 config,
447 &mut iouring_registry,
448 network_buffer_pool.clone(),
449 )
450 .unwrap(),
451 &mut runtime_registry,
452 );
453 } else {
454 let config = TokioNetworkConfig::default()
455 .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
456 .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
457 .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
458 .with_zero_linger(self.cfg.network_cfg.zero_linger);
459 let network = MeteredNetwork::new(
460 TokioNetwork::new(config, network_buffer_pool.clone()),
461 &mut runtime_registry,
462 );
463 }
464 }
465
466 let executor = Arc::new(Executor {
468 registry,
469 metrics,
470 runtime,
471 shutdown: Mutex::new(Stopper::default()),
472 panicker,
473 thread_stack_size: self.cfg.thread_stack_size,
474 });
475
476 let label = Label::root();
478 executor.metrics.tasks_spawned.get_or_create(&label).inc();
479 let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
480
481 let context = Context {
483 storage,
484 name: label.name(),
485 attributes: Vec::new(),
486 executor: executor.clone(),
487 network,
488 network_buffer_pool,
489 storage_buffer_pool,
490 tree: Tree::root(),
491 execution: Execution::default(),
492 traced: false,
493 };
494 let output = executor.runtime.block_on(panicked.interrupt(f(context)));
495 gauge.dec();
496
497 output
498 }
499}
500
501cfg_if::cfg_if! {
502 if #[cfg(feature = "iouring-storage")] {
503 type Storage = MeteredStorage<IoUringStorage>;
504 } else {
505 type Storage = MeteredStorage<TokioStorage>;
506 }
507}
508
509cfg_if::cfg_if! {
510 if #[cfg(feature = "iouring-network")] {
511 type Network = MeteredNetwork<IoUringNetwork>;
512 } else {
513 type Network = MeteredNetwork<TokioNetwork>;
514 }
515}
516
517pub struct Context {
521 name: String,
522 attributes: Vec<(String, String)>,
523 executor: Arc<Executor>,
524 storage: Storage,
525 network: Network,
526 network_buffer_pool: BufferPool,
527 storage_buffer_pool: BufferPool,
528 tree: Arc<Tree>,
529 execution: Execution,
530 traced: bool,
531}
532
533impl Context {
534 fn metrics(&self) -> &Metrics {
536 &self.executor.metrics
537 }
538}
539
540impl crate::Spawner for Context {
541 fn dedicated(mut self) -> Self {
542 self.execution = Execution::Dedicated;
543 self
544 }
545
546 fn shared(mut self, blocking: bool) -> Self {
547 self.execution = Execution::Shared(blocking);
548 self
549 }
550
551 fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
552 where
553 F: FnOnce(Self) -> Fut + Send + 'static,
554 Fut: Future<Output = T> + Send + 'static,
555 T: Send + 'static,
556 {
557 let (label, metric) = spawn_metrics!(self);
559
560 let parent = Arc::clone(&self.tree);
562 let past = self.execution;
563 let traced = self.traced;
564 self.execution = Execution::default();
565 self.traced = false;
566 let (child, aborted) = Tree::child(&parent);
567 if aborted {
568 return Handle::closed(metric);
569 }
570 self.tree = child;
571
572 let executor = self.executor.clone();
574 let future = if traced {
575 let span = info_span!("task", name = %label.name());
576 for (key, value) in &self.attributes {
577 span.set_attribute(key.clone(), value.clone());
578 }
579 Either::Left(f(self).instrument(span))
580 } else {
581 Either::Right(f(self))
582 };
583 let (f, handle) = Handle::init(
584 future,
585 metric,
586 executor.panicker.clone(),
587 Arc::clone(&parent),
588 );
589
590 if matches!(past, Execution::Dedicated) {
591 utils::thread::spawn(executor.thread_stack_size, {
592 let handle = executor.runtime.handle().clone();
594 move || {
595 handle.block_on(f);
596 }
597 });
598 } else if matches!(past, Execution::Shared(true)) {
599 executor.runtime.spawn_blocking({
600 let handle = executor.runtime.handle().clone();
602 move || {
603 handle.block_on(f);
604 }
605 });
606 } else {
607 executor.runtime.spawn(f);
608 }
609
610 if let Some(aborter) = handle.aborter() {
612 parent.register(aborter);
613 }
614
615 handle
616 }
617
618 async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
619 let stop_resolved = {
620 let mut shutdown = self.executor.shutdown.lock();
621 shutdown.stop(value)
622 };
623
624 let timeout_future = timeout.map_or_else(
626 || futures::future::Either::Right(futures::future::pending()),
627 |duration| futures::future::Either::Left(self.sleep(duration)),
628 );
629 select! {
630 result = stop_resolved => {
631 result.map_err(|_| Error::Closed)?;
632 Ok(())
633 },
634 _ = timeout_future => Err(Error::Timeout),
635 }
636 }
637
638 fn stopped(&self) -> Signal {
639 self.executor.shutdown.lock().stopped()
640 }
641}
642
643#[stability(BETA)]
644impl crate::ThreadPooler for Context {
645 fn create_thread_pool(
646 &self,
647 concurrency: NonZeroUsize,
648 ) -> Result<ThreadPool, ThreadPoolBuildError> {
649 ThreadPoolBuilder::new()
650 .num_threads(concurrency.get())
651 .spawn_handler(move |thread| {
652 self.child("rayon_thread")
655 .dedicated()
656 .spawn(move |_| async move { thread.run() });
657 Ok(())
658 })
659 .build()
660 .map(Arc::new)
661 }
662}
663
664impl crate::Supervisor for Context {
665 fn child(&self, label: &'static str) -> Self {
666 let (tree, _) = Tree::child(&self.tree);
667 Self {
668 name: child_label(&self.name, label),
669 attributes: self.attributes.clone(),
670 executor: self.executor.clone(),
671 storage: self.storage.clone(),
672 network: self.network.clone(),
673 network_buffer_pool: self.network_buffer_pool.clone(),
674 storage_buffer_pool: self.storage_buffer_pool.clone(),
675 tree,
676 execution: Execution::default(),
677 traced: false,
678 }
679 }
680
681 fn with_attribute(mut self, key: &'static str, value: impl std::fmt::Display) -> Self {
682 validate_label(key);
684
685 add_attribute(&mut self.attributes, key, value);
687 self
688 }
689
690 fn name(&self) -> Name {
691 Name {
692 label: self.name.clone(),
693 attributes: self.attributes.clone(),
694 }
695 }
696}
697
698impl crate::Tracing for Context {
699 fn with_span(mut self) -> Self {
700 self.traced = true;
701 self
702 }
703}
704
705impl crate::Metrics for Context {
706 fn register<N: Into<String>, H: Into<String>, M: Metric>(
707 &self,
708 name: N,
709 help: H,
710 metric: M,
711 ) -> Registered<M> {
712 let name = name.into();
713 let help = help.into();
714 let metric = Arc::new(metric);
715 self.executor.registry.register(
716 prefixed_name(&self.name, &name),
717 help,
718 self.attributes.clone(),
719 metric,
720 )
721 }
722
723 fn encode(&self) -> String {
724 self.executor.registry.encode()
725 }
726}
727
728impl Clock for Context {
729 fn current(&self) -> SystemTime {
730 SystemTime::now()
731 }
732
733 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
734 tokio::time::sleep(duration)
735 }
736
737 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
738 let duration_until_deadline = deadline.duration_since(self.current()).unwrap_or_default();
739 tokio::time::sleep(duration_until_deadline)
740 }
741}
742
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Returns `future` unchanged; the `_latency` argument is ignored by this
    /// runtime.
    fn pace<'a, F, T>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // No pacing is applied here
        future
    }
}
758
759impl GClock for Context {
760 type Instant = SystemTime;
761
762 fn now(&self) -> Self::Instant {
763 self.current()
764 }
765}
766
767impl ReasonablyRealtime for Context {}
768
769impl crate::Network for Context {
770 type Listener = <Network as crate::Network>::Listener;
771
772 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
773 self.network.bind(socket).await
774 }
775
776 async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
777 self.network.dial(socket).await
778 }
779}
780
781impl crate::Resolver for Context {
782 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
783 let addrs = tokio::net::lookup_host(format!("{host}:0"))
789 .await
790 .map_err(|e| Error::ResolveFailed(e.to_string()))?;
791 Ok(addrs.map(|addr| addr.ip()).collect())
792 }
793}
794
795impl RngCore for Context {
796 fn next_u32(&mut self) -> u32 {
797 OsRng.next_u32()
798 }
799
800 fn next_u64(&mut self) -> u64 {
801 OsRng.next_u64()
802 }
803
804 fn fill_bytes(&mut self, dest: &mut [u8]) {
805 OsRng.fill_bytes(dest);
806 }
807
808 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
809 OsRng.try_fill_bytes(dest)
810 }
811}
812
813impl CryptoRng for Context {}
814
815impl crate::Storage for Context {
816 type Blob = <Storage as crate::Storage>::Blob;
817
818 async fn open_versioned(
819 &self,
820 partition: &str,
821 name: &[u8],
822 versions: std::ops::RangeInclusive<u16>,
823 ) -> Result<(Self::Blob, u64, u16), Error> {
824 self.storage.open_versioned(partition, name, versions).await
825 }
826
827 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
828 self.storage.remove(partition, name).await
829 }
830
831 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
832 self.storage.scan(partition).await
833 }
834}
835
836impl crate::BufferPooler for Context {
837 fn network_buffer_pool(&self) -> &BufferPool {
838 &self.network_buffer_pool
839 }
840
841 fn storage_buffer_pool(&self) -> &BufferPool {
842 &self.storage_buffer_pool
843 }
844}
845
#[cfg(test)]
mod tests {
    use super::*;

    /// Without explicit pool configs, both resolved configs take their
    /// parallelism from `worker_threads` and keep the default thread cache.
    #[test]
    fn test_worker_threads_updates_default_buffer_pool_parallelism() {
        let cfg = Config::new().with_worker_threads(8);
        assert_eq!(cfg.worker_threads, 8);

        let network = cfg.resolved_network_buffer_pool_config();
        let network_default = BufferPoolConfig::for_network();
        assert_eq!(network.parallelism, NZUsize!(8));
        assert_eq!(
            network.thread_cache_config,
            network_default.thread_cache_config
        );

        let storage = cfg.resolved_storage_buffer_pool_config();
        let storage_default = BufferPoolConfig::for_storage();
        assert_eq!(storage.parallelism, NZUsize!(8));
        assert_eq!(
            storage.thread_cache_config,
            storage_default.thread_cache_config
        );
    }

    /// A fresh config reports the system thread stack size.
    #[test]
    fn test_default_thread_stack_size_uses_system_default() {
        let expected = utils::thread::system_thread_stack_size();
        let actual = Config::new().thread_stack_size();
        assert_eq!(actual, expected);
    }

    /// An explicit stack size replaces the default.
    #[test]
    fn test_thread_stack_size_override() {
        let requested = 4 * 1024 * 1024;
        let cfg = Config::new().with_thread_stack_size(requested);
        assert_eq!(cfg.thread_stack_size(), 4 * 1024 * 1024);
    }

    /// Explicit pool configs are returned as given, regardless of whether
    /// `with_worker_threads` is called before or after they are set.
    #[test]
    fn test_explicit_buffer_pool_configs_override_worker_threads() {
        let explicit_network = BufferPoolConfig::for_network().with_parallelism(NZUsize!(2));
        let explicit_storage = BufferPoolConfig::for_storage().with_thread_cache_disabled();
        let cfg = Config::new()
            .with_network_buffer_pool_config(explicit_network)
            .with_worker_threads(8)
            .with_storage_buffer_pool_config(explicit_storage);

        let network = cfg.resolved_network_buffer_pool_config();
        assert_eq!(network.parallelism, NZUsize!(2));
        assert_eq!(
            network.thread_cache_config,
            BufferPoolConfig::for_network().thread_cache_config
        );

        let storage = cfg.resolved_storage_buffer_pool_config();
        assert_eq!(storage.parallelism, NZUsize!(1));
        assert_eq!(
            storage.thread_cache_config,
            BufferPoolConfig::for_storage()
                .with_thread_cache_disabled()
                .thread_cache_config
        );
    }
}