1#[cfg(not(feature = "iouring-network"))]
2use crate::network::tokio::{Config as TokioNetworkConfig, Network as TokioNetwork};
3#[cfg(feature = "iouring-storage")]
4use crate::storage::iouring::{Config as IoUringConfig, Storage as IoUringStorage};
5#[cfg(not(feature = "iouring-storage"))]
6use crate::storage::tokio::{Config as TokioStorageConfig, Storage as TokioStorage};
7#[cfg(feature = "external")]
8use crate::Pacer;
9#[cfg(feature = "iouring-network")]
10use crate::{
11 iouring,
12 network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
13};
14use crate::{
15 network::metered::Network as MeteredNetwork,
16 process::metered::Metrics as MeteredProcess,
17 signal::Signal,
18 storage::metered::Storage as MeteredStorage,
19 telemetry::metrics::task::Label,
20 utils::{
21 self, add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard,
22 },
23 BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf,
24 Spawner as _, StreamOf, METRICS_PREFIX,
25};
26use commonware_macros::{select, stability};
27#[stability(BETA)]
28use commonware_parallel::ThreadPool;
29use commonware_utils::{sync::Mutex, NZUsize};
30use futures::future::Either;
31use governor::clock::{Clock as GClock, ReasonablyRealtime};
32use prometheus_client::{
33 metrics::{counter::Counter, family::Family, gauge::Gauge},
34 registry::{Metric, Registry as PrometheusRegistry},
35};
36use rand::{rngs::OsRng, CryptoRng, RngCore};
37#[stability(BETA)]
38use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
39use std::{
40 borrow::Cow,
41 env,
42 future::Future,
43 net::{IpAddr, SocketAddr},
44 num::NonZeroUsize,
45 path::PathBuf,
46 sync::Arc,
47 time::{Duration, SystemTime},
48};
49use tokio::runtime::{Builder, Runtime};
50use tracing::{info_span, Instrument};
51use tracing_opentelemetry::OpenTelemetrySpanExt;
52
// Value used as the `size` of the io_uring instance that backs the network
// implementation (see its use in `Runner::start`). A smaller value is
// selected when compiling tests.
#[cfg(feature = "iouring-network")]
cfg_if::cfg_if! {
    if #[cfg(test)] {
        // Test builds use a smaller size.
        const IOURING_NETWORK_SIZE: u32 = 128;
    } else {
        // Non-test builds use the larger size.
        const IOURING_NETWORK_SIZE: u32 = 1024;
    }
}
63
/// Task metrics tracked by the runtime, keyed by task [`Label`].
#[derive(Debug)]
struct Metrics {
    /// Registered as `tasks_spawned`: total number of tasks spawned.
    tasks_spawned: Family<Label, Counter>,
    /// Registered as `tasks_running`: number of tasks currently running.
    tasks_running: Family<Label, Gauge>,
}
69
70impl Metrics {
71 pub fn init(registry: &mut PrometheusRegistry) -> Self {
72 let metrics = Self {
73 tasks_spawned: Family::default(),
74 tasks_running: Family::default(),
75 };
76 registry.register(
77 "tasks_spawned",
78 "Total number of tasks spawned",
79 metrics.tasks_spawned.clone(),
80 );
81 registry.register(
82 "tasks_running",
83 "Number of tasks currently running",
84 metrics.tasks_running.clone(),
85 );
86 metrics
87 }
88}
89
/// Network-related settings that `Runner::start` forwards to the selected
/// network backend.
#[derive(Clone, Debug)]
pub struct NetworkConfig {
    /// Forwarded to the backend as its `tcp_nodelay` setting. Defaults to `Some(true)`.
    /// NOTE(review): `None` presumably leaves the OS default untouched — confirm in the backend.
    tcp_nodelay: Option<bool>,

    /// Forwarded to the backend as its `zero_linger` setting. Defaults to `true`.
    zero_linger: bool,

    /// Timeout applied to both reads and writes. Defaults to 60 seconds.
    read_write_timeout: Duration,
}
115
116impl Default for NetworkConfig {
117 fn default() -> Self {
118 Self {
119 tcp_nodelay: Some(true),
120 zero_linger: true,
121 read_write_timeout: Duration::from_secs(60),
122 }
123 }
124}
125
/// Configuration consumed by [`Runner`] when it builds the Tokio runtime.
#[derive(Clone)]
pub struct Config {
    /// Passed to the Tokio builder as the number of worker threads. Also used to
    /// derive the default buffer pool thread-cache parallelism. Defaults to `2`.
    worker_threads: usize,

    /// Passed to the Tokio builder as the maximum number of blocking threads.
    /// Defaults to `512`.
    max_blocking_threads: usize,

    /// Stack size handed to the Tokio builder, the io_uring backends and
    /// dedicated task threads. Defaults to `utils::thread::system_thread_stack_size()`.
    thread_stack_size: usize,

    /// Passed to `Panicker::new`. Defaults to `false`.
    catch_panics: bool,

    /// Directory handed to the storage backend. Defaults to a randomly suffixed
    /// directory under the system temporary directory.
    storage_directory: PathBuf,

    /// Passed to the Tokio storage backend configuration (only read when the
    /// `iouring-storage` feature is disabled). Defaults to 2 MiB.
    maximum_buffer_size: usize,

    /// Network settings (timeouts and socket options).
    network_cfg: NetworkConfig,

    /// Explicit network buffer pool configuration; when `None`, a default
    /// derived from `worker_threads` is used.
    network_buffer_pool_cfg: Option<BufferPoolConfig>,

    /// Explicit storage buffer pool configuration; when `None`, a default
    /// derived from `worker_threads` is used.
    storage_buffer_pool_cfg: Option<BufferPoolConfig>,
}
173
174impl Config {
175 pub fn new() -> Self {
177 let rng = OsRng.next_u64();
178 let storage_directory = env::temp_dir().join(format!("commonware_tokio_runtime_{rng}"));
179 Self {
180 worker_threads: 2,
181 max_blocking_threads: 512,
182 thread_stack_size: utils::thread::system_thread_stack_size(),
183 catch_panics: false,
184 storage_directory,
185 maximum_buffer_size: 2 * 1024 * 1024, network_cfg: NetworkConfig::default(),
187 network_buffer_pool_cfg: None,
188 storage_buffer_pool_cfg: None,
189 }
190 }
191
192 pub const fn with_worker_threads(mut self, n: usize) -> Self {
195 self.worker_threads = n;
196 self
197 }
198 pub const fn with_max_blocking_threads(mut self, n: usize) -> Self {
200 self.max_blocking_threads = n;
201 self
202 }
203 pub const fn with_thread_stack_size(mut self, n: usize) -> Self {
205 self.thread_stack_size = n;
206 self
207 }
208 pub const fn with_catch_panics(mut self, b: bool) -> Self {
210 self.catch_panics = b;
211 self
212 }
213 pub const fn with_read_write_timeout(mut self, d: Duration) -> Self {
215 self.network_cfg.read_write_timeout = d;
216 self
217 }
218 pub const fn with_tcp_nodelay(mut self, n: Option<bool>) -> Self {
220 self.network_cfg.tcp_nodelay = n;
221 self
222 }
223 pub const fn with_zero_linger(mut self, l: bool) -> Self {
225 self.network_cfg.zero_linger = l;
226 self
227 }
228 pub fn with_storage_directory(mut self, p: impl Into<PathBuf>) -> Self {
230 self.storage_directory = p.into();
231 self
232 }
233 pub const fn with_maximum_buffer_size(mut self, n: usize) -> Self {
235 self.maximum_buffer_size = n;
236 self
237 }
238 pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
240 self.network_buffer_pool_cfg = Some(cfg);
241 self
242 }
243 pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
245 self.storage_buffer_pool_cfg = Some(cfg);
246 self
247 }
248
249 pub const fn worker_threads(&self) -> usize {
252 self.worker_threads
253 }
254 pub const fn max_blocking_threads(&self) -> usize {
256 self.max_blocking_threads
257 }
258 pub const fn thread_stack_size(&self) -> usize {
260 self.thread_stack_size
261 }
262 pub const fn catch_panics(&self) -> bool {
264 self.catch_panics
265 }
266 pub const fn read_write_timeout(&self) -> Duration {
268 self.network_cfg.read_write_timeout
269 }
270 pub const fn tcp_nodelay(&self) -> Option<bool> {
272 self.network_cfg.tcp_nodelay
273 }
274 pub const fn zero_linger(&self) -> bool {
276 self.network_cfg.zero_linger
277 }
278 pub const fn storage_directory(&self) -> &PathBuf {
280 &self.storage_directory
281 }
282 pub const fn maximum_buffer_size(&self) -> usize {
284 self.maximum_buffer_size
285 }
286
287 fn resolved_network_buffer_pool_config(&self) -> BufferPoolConfig {
290 self.network_buffer_pool_cfg.clone().unwrap_or_else(|| {
291 BufferPoolConfig::for_network()
292 .with_thread_cache_for_parallelism(NZUsize!(self.worker_threads))
293 })
294 }
295
296 fn resolved_storage_buffer_pool_config(&self) -> BufferPoolConfig {
299 self.storage_buffer_pool_cfg.clone().unwrap_or_else(|| {
300 BufferPoolConfig::for_storage()
301 .with_thread_cache_for_parallelism(NZUsize!(self.worker_threads))
302 })
303 }
304}
305
306impl Default for Config {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
/// Shared runtime state referenced by every [`Context`].
pub struct Executor {
    /// Metrics registry used for scoped registration and encoding.
    registry: Mutex<Registry>,
    /// Task spawn/running metrics.
    metrics: Arc<Metrics>,
    /// Underlying Tokio runtime that tasks are spawned onto.
    runtime: Runtime,
    /// Shutdown coordinator used by `stop` and `stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic handler passed to each spawned task handle.
    panicker: Panicker,
    /// Stack size used for dedicated task threads.
    thread_stack_size: usize,
}
321
/// Implementation of [`crate::Runner`] backed by the Tokio runtime.
pub struct Runner {
    /// Configuration used to build the runtime when `start` is called.
    cfg: Config,
}
326
327impl Default for Runner {
328 fn default() -> Self {
329 Self::new(Config::default())
330 }
331}
332
333impl Runner {
334 pub const fn new(cfg: Config) -> Self {
336 Self { cfg }
337 }
338}
339
340impl crate::Runner for Runner {
341 type Context = Context;
342
343 fn start<F, Fut>(self, f: F) -> Fut::Output
344 where
345 F: FnOnce(Self::Context) -> Fut,
346 Fut: Future,
347 {
348 let mut registry = Registry::new();
350 let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
351
352 let metrics = Arc::new(Metrics::init(runtime_registry));
354 let runtime = Builder::new_multi_thread()
355 .worker_threads(self.cfg.worker_threads)
356 .max_blocking_threads(self.cfg.max_blocking_threads)
357 .thread_stack_size(self.cfg.thread_stack_size)
358 .enable_all()
359 .build()
360 .expect("failed to create Tokio runtime");
361
362 let (panicker, panicked) = Panicker::new(self.cfg.catch_panics);
364
365 let process = MeteredProcess::init(runtime_registry);
370 runtime.spawn(process.collect(tokio::time::sleep));
371
372 let network_buffer_pool = BufferPool::new(
374 self.cfg.resolved_network_buffer_pool_config(),
375 runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
376 );
377 let storage_buffer_pool = BufferPool::new(
378 self.cfg.resolved_storage_buffer_pool_config(),
379 runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
380 );
381
382 cfg_if::cfg_if! {
384 if #[cfg(feature = "iouring-storage")] {
385 let iouring_registry =
386 runtime_registry.sub_registry_with_prefix("iouring_storage");
387 let storage = MeteredStorage::new(
388 IoUringStorage::start(
389 IoUringConfig {
390 storage_directory: self.cfg.storage_directory.clone(),
391 iouring_config: Default::default(),
392 thread_stack_size: self.cfg.thread_stack_size,
393 },
394 iouring_registry,
395 storage_buffer_pool.clone(),
396 ),
397 runtime_registry,
398 );
399 } else {
400 let storage = MeteredStorage::new(
401 TokioStorage::new(
402 TokioStorageConfig::new(
403 self.cfg.storage_directory.clone(),
404 self.cfg.maximum_buffer_size,
405 ),
406 storage_buffer_pool.clone(),
407 ),
408 runtime_registry,
409 );
410 }
411 }
412
413 cfg_if::cfg_if! {
415 if #[cfg(feature = "iouring-network")] {
416 let iouring_registry =
417 runtime_registry.sub_registry_with_prefix("iouring_network");
418 let config = IoUringNetworkConfig {
419 tcp_nodelay: self.cfg.network_cfg.tcp_nodelay,
420 zero_linger: self.cfg.network_cfg.zero_linger,
421 read_write_timeout: self.cfg.network_cfg.read_write_timeout,
422 iouring_config: iouring::Config {
423 size: IOURING_NETWORK_SIZE,
425 max_request_timeout: self.cfg.network_cfg.read_write_timeout,
426 shutdown_timeout: Some(self.cfg.network_cfg.read_write_timeout),
427 ..Default::default()
428 },
429 thread_stack_size: self.cfg.thread_stack_size,
430 ..Default::default()
431 };
432 let network = MeteredNetwork::new(
433 IoUringNetwork::start(
434 config,
435 iouring_registry,
436 network_buffer_pool.clone(),
437 )
438 .unwrap(),
439 runtime_registry,
440 );
441 } else {
442 let config = TokioNetworkConfig::default()
443 .with_read_timeout(self.cfg.network_cfg.read_write_timeout)
444 .with_write_timeout(self.cfg.network_cfg.read_write_timeout)
445 .with_tcp_nodelay(self.cfg.network_cfg.tcp_nodelay)
446 .with_zero_linger(self.cfg.network_cfg.zero_linger);
447 let network = MeteredNetwork::new(
448 TokioNetwork::new(config, network_buffer_pool.clone()),
449 runtime_registry,
450 );
451 }
452 }
453
454 let executor = Arc::new(Executor {
456 registry: Mutex::new(registry),
457 metrics,
458 runtime,
459 shutdown: Mutex::new(Stopper::default()),
460 panicker,
461 thread_stack_size: self.cfg.thread_stack_size,
462 });
463
464 let label = Label::root();
466 executor.metrics.tasks_spawned.get_or_create(&label).inc();
467 let gauge = executor.metrics.tasks_running.get_or_create(&label).clone();
468
469 let context = Context {
471 storage,
472 name: label.name(),
473 attributes: Vec::new(),
474 scope: None,
475 executor: executor.clone(),
476 network,
477 network_buffer_pool,
478 storage_buffer_pool,
479 tree: Tree::root(),
480 execution: Execution::default(),
481 traced: false,
482 };
483 let output = executor.runtime.block_on(panicked.interrupt(f(context)));
484 gauge.dec();
485
486 output
487 }
488}
489
// Storage backend used by `Context`, selected at compile time by the
// `iouring-storage` feature and wrapped in the metered storage layer.
cfg_if::cfg_if! {
    if #[cfg(feature = "iouring-storage")] {
        type Storage = MeteredStorage<IoUringStorage>;
    } else {
        type Storage = MeteredStorage<TokioStorage>;
    }
}
497
// Network backend used by `Context`, selected at compile time by the
// `iouring-network` feature and wrapped in the metered network layer.
cfg_if::cfg_if! {
    if #[cfg(feature = "iouring-network")] {
        type Network = MeteredNetwork<IoUringNetwork>;
    } else {
        type Network = MeteredNetwork<TokioNetwork>;
    }
}
505
/// Handle to the Tokio runtime passed to tasks; it implements the runtime
/// traits (spawning, metrics, clock, network, storage, ...) for this backend.
pub struct Context {
    /// Label used as the metric name prefix and task name.
    name: String,
    /// Key/value attributes applied as labels when registering metrics and as
    /// span attributes when a traced task is spawned.
    attributes: Vec<(String, String)>,
    /// Optional metrics scope; its registry scope is removed through the guard's callback.
    scope: Option<Arc<ScopeGuard>>,
    /// Shared runtime state.
    executor: Arc<Executor>,
    /// Storage backend.
    storage: Storage,
    /// Network backend.
    network: Network,
    /// Buffer pool exposed for network use.
    network_buffer_pool: BufferPool,
    /// Buffer pool exposed for storage use.
    storage_buffer_pool: BufferPool,
    /// Supervision tree node associated with this context.
    tree: Arc<Tree>,
    /// Execution mode applied to the next `spawn`; reset when the context is
    /// cloned or a task is spawned.
    execution: Execution,
    /// Whether the next `spawn` wraps the task in a tracing span; reset when the
    /// context is cloned or a task is spawned.
    traced: bool,
}
522
523impl Clone for Context {
524 fn clone(&self) -> Self {
525 let (child, _) = Tree::child(&self.tree);
526 Self {
527 name: self.name.clone(),
528 attributes: self.attributes.clone(),
529 scope: self.scope.clone(),
530 executor: self.executor.clone(),
531 storage: self.storage.clone(),
532 network: self.network.clone(),
533 network_buffer_pool: self.network_buffer_pool.clone(),
534 storage_buffer_pool: self.storage_buffer_pool.clone(),
535 tree: child,
536 execution: Execution::default(),
537 traced: false,
538 }
539 }
540}
541
542impl Context {
543 fn metrics(&self) -> &Metrics {
545 &self.executor.metrics
546 }
547}
548
549impl crate::Spawner for Context {
550 fn dedicated(mut self) -> Self {
551 self.execution = Execution::Dedicated;
552 self
553 }
554
555 fn shared(mut self, blocking: bool) -> Self {
556 self.execution = Execution::Shared(blocking);
557 self
558 }
559
560 fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
561 where
562 F: FnOnce(Self) -> Fut + Send + 'static,
563 Fut: Future<Output = T> + Send + 'static,
564 T: Send + 'static,
565 {
566 let (label, metric) = spawn_metrics!(self);
568
569 let parent = Arc::clone(&self.tree);
571 let past = self.execution;
572 let traced = self.traced;
573 self.execution = Execution::default();
574 self.traced = false;
575 let (child, aborted) = Tree::child(&parent);
576 if aborted {
577 return Handle::closed(metric);
578 }
579 self.tree = child;
580
581 let executor = self.executor.clone();
583 let future = if traced {
584 let span = info_span!("task", name = %label.name());
585 for (key, value) in &self.attributes {
586 span.set_attribute(key.clone(), value.clone());
587 }
588 Either::Left(f(self).instrument(span))
589 } else {
590 Either::Right(f(self))
591 };
592 let (f, handle) = Handle::init(
593 future,
594 metric,
595 executor.panicker.clone(),
596 Arc::clone(&parent),
597 );
598
599 if matches!(past, Execution::Dedicated) {
600 utils::thread::spawn(executor.thread_stack_size, {
601 let handle = executor.runtime.handle().clone();
603 move || {
604 handle.block_on(f);
605 }
606 });
607 } else if matches!(past, Execution::Shared(true)) {
608 executor.runtime.spawn_blocking({
609 let handle = executor.runtime.handle().clone();
611 move || {
612 handle.block_on(f);
613 }
614 });
615 } else {
616 executor.runtime.spawn(f);
617 }
618
619 if let Some(aborter) = handle.aborter() {
621 parent.register(aborter);
622 }
623
624 handle
625 }
626
627 async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
628 let stop_resolved = {
629 let mut shutdown = self.executor.shutdown.lock();
630 shutdown.stop(value)
631 };
632
633 let timeout_future = timeout.map_or_else(
635 || futures::future::Either::Right(futures::future::pending()),
636 |duration| futures::future::Either::Left(self.sleep(duration)),
637 );
638 select! {
639 result = stop_resolved => {
640 result.map_err(|_| Error::Closed)?;
641 Ok(())
642 },
643 _ = timeout_future => Err(Error::Timeout),
644 }
645 }
646
647 fn stopped(&self) -> Signal {
648 self.executor.shutdown.lock().stopped()
649 }
650}
651
652#[stability(BETA)]
653impl crate::ThreadPooler for Context {
654 fn create_thread_pool(
655 &self,
656 concurrency: NonZeroUsize,
657 ) -> Result<ThreadPool, ThreadPoolBuildError> {
658 ThreadPoolBuilder::new()
659 .num_threads(concurrency.get())
660 .spawn_handler(move |thread| {
661 self.with_label("rayon_thread")
664 .dedicated()
665 .spawn(move |_| async move { thread.run() });
666 Ok(())
667 })
668 .build()
669 .map(Arc::new)
670 }
671}
672
673impl crate::Metrics for Context {
674 fn label(&self) -> String {
675 self.name.clone()
676 }
677
678 fn with_label(&self, label: &str) -> Self {
679 let name = {
681 let prefix = self.name.clone();
682 if prefix.is_empty() {
683 label.to_string()
684 } else {
685 format!("{prefix}_{label}")
686 }
687 };
688 Self {
689 name,
690 ..self.clone()
691 }
692 }
693
694 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
695 let mut attributes = self.attributes.clone();
696 add_attribute(&mut attributes, key, value);
697 Self {
698 attributes,
699 ..self.clone()
700 }
701 }
702
703 fn with_scope(&self) -> Self {
704 if self.scope.is_some() {
706 return self.clone();
707 }
708
709 let executor = self.executor.clone();
712 let scope_id = executor.registry.lock().create_scope();
713 let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
714 executor.registry.lock().remove_scope(id);
715 }));
716 Self {
717 scope: Some(guard),
718 ..self.clone()
719 }
720 }
721
722 fn with_span(&self) -> Self {
723 Self {
724 traced: true,
725 ..self.clone()
726 }
727 }
728
729 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
730 let name = name.into();
731 let prefixed_name = {
732 let prefix = &self.name;
733 if prefix.is_empty() {
734 name
735 } else {
736 format!("{}_{}", *prefix, name)
737 }
738 };
739
740 let mut registry = self.executor.registry.lock();
742 let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
743 let sub_registry = self
744 .attributes
745 .iter()
746 .fold(scoped, |reg, (k, v): &(String, String)| {
747 reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
748 });
749 sub_registry.register(prefixed_name, help, metric);
750 }
751
752 fn encode(&self) -> String {
753 self.executor.registry.lock().encode()
754 }
755}
756
757impl Clock for Context {
758 fn current(&self) -> SystemTime {
759 SystemTime::now()
760 }
761
762 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
763 tokio::time::sleep(duration)
764 }
765
766 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
767 let duration_until_deadline = deadline.duration_since(self.current()).unwrap_or_default();
768 tokio::time::sleep(duration_until_deadline)
769 }
770}
771
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Returns `future` unchanged; the `_latency` argument is not used by this
    /// runtime.
    fn pace<'a, F, T>(
        &'a self,
        _latency: Duration,
        future: F,
    ) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // No pacing is applied here: the caller's future is handed back as-is.
        future
    }
}
787
788impl GClock for Context {
789 type Instant = SystemTime;
790
791 fn now(&self) -> Self::Instant {
792 self.current()
793 }
794}
795
// Marker impl with no overridden items; `Context` uses the trait's defaults.
impl ReasonablyRealtime for Context {}
797
798impl crate::Network for Context {
799 type Listener = <Network as crate::Network>::Listener;
800
801 async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
802 self.network.bind(socket).await
803 }
804
805 async fn dial(&self, socket: SocketAddr) -> Result<(SinkOf<Self>, StreamOf<Self>), Error> {
806 self.network.dial(socket).await
807 }
808}
809
810impl crate::Resolver for Context {
811 async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
812 let addrs = tokio::net::lookup_host(format!("{host}:0"))
818 .await
819 .map_err(|e| Error::ResolveFailed(e.to_string()))?;
820 Ok(addrs.map(|addr| addr.ip()).collect())
821 }
822}
823
824impl RngCore for Context {
825 fn next_u32(&mut self) -> u32 {
826 OsRng.next_u32()
827 }
828
829 fn next_u64(&mut self) -> u64 {
830 OsRng.next_u64()
831 }
832
833 fn fill_bytes(&mut self, dest: &mut [u8]) {
834 OsRng.fill_bytes(dest);
835 }
836
837 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
838 OsRng.try_fill_bytes(dest)
839 }
840}
841
// Marker impl: every `RngCore` method on `Context` delegates to `OsRng`.
impl CryptoRng for Context {}
843
844impl crate::Storage for Context {
845 type Blob = <Storage as crate::Storage>::Blob;
846
847 async fn open_versioned(
848 &self,
849 partition: &str,
850 name: &[u8],
851 versions: std::ops::RangeInclusive<u16>,
852 ) -> Result<(Self::Blob, u64, u16), Error> {
853 self.storage.open_versioned(partition, name, versions).await
854 }
855
856 async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
857 self.storage.remove(partition, name).await
858 }
859
860 async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
861 self.storage.scan(partition).await
862 }
863}
864
865impl crate::BufferPooler for Context {
866 fn network_buffer_pool(&self) -> &BufferPool {
867 &self.network_buffer_pool
868 }
869
870 fn storage_buffer_pool(&self) -> &BufferPool {
871 &self.storage_buffer_pool
872 }
873}
874
#[cfg(test)]
mod tests {
    use super::*;

    /// Without explicit pool configs, the resolved pools size their thread
    /// caches from `worker_threads`.
    #[test]
    fn test_worker_threads_updates_default_buffer_pool_parallelism() {
        let cfg = Config::new().with_worker_threads(8);
        assert_eq!(cfg.worker_threads, 8);

        let expected_network =
            BufferPoolConfig::for_network().with_thread_cache_for_parallelism(NZUsize!(8));
        let resolved_network = cfg.resolved_network_buffer_pool_config();
        assert_eq!(
            resolved_network.thread_cache_config,
            expected_network.thread_cache_config
        );

        let expected_storage =
            BufferPoolConfig::for_storage().with_thread_cache_for_parallelism(NZUsize!(8));
        let resolved_storage = cfg.resolved_storage_buffer_pool_config();
        assert_eq!(
            resolved_storage.thread_cache_config,
            expected_storage.thread_cache_config
        );
    }

    /// The default stack size matches the system thread stack size helper.
    #[test]
    fn test_default_thread_stack_size_uses_system_default() {
        let expected = utils::thread::system_thread_stack_size();
        let cfg = Config::new();
        assert_eq!(cfg.thread_stack_size(), expected);
    }

    /// An explicit stack size replaces the default.
    #[test]
    fn test_thread_stack_size_override() {
        let stack_size = 4 * 1024 * 1024;
        let cfg = Config::new().with_thread_stack_size(stack_size);
        assert_eq!(cfg.thread_stack_size(), 4 * 1024 * 1024);
    }

    /// Explicit pool configs win over the `worker_threads`-derived defaults,
    /// regardless of the order in which the setters are called.
    #[test]
    fn test_explicit_buffer_pool_configs_override_worker_threads() {
        let network_pool =
            BufferPoolConfig::for_network().with_thread_cache_for_parallelism(NZUsize!(2));
        let storage_pool = BufferPoolConfig::for_storage().with_thread_cache_disabled();
        let cfg = Config::new()
            .with_network_buffer_pool_config(network_pool)
            .with_worker_threads(8)
            .with_storage_buffer_pool_config(storage_pool);

        let expected_network =
            BufferPoolConfig::for_network().with_thread_cache_for_parallelism(NZUsize!(2));
        let resolved_network = cfg.resolved_network_buffer_pool_config();
        assert_eq!(
            resolved_network.thread_cache_config,
            expected_network.thread_cache_config
        );

        let expected_storage = BufferPoolConfig::for_storage().with_thread_cache_disabled();
        let resolved_storage = cfg.resolved_storage_buffer_pool_config();
        assert_eq!(
            resolved_storage.thread_cache_config,
            expected_storage.thread_cache_config
        );
    }
}