// datafusion_app/executor/dedicated.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    fmt::Display,
20    sync::{Arc, OnceLock},
21    time::Duration,
22};
23
24use futures::{
25    future::{BoxFuture, Shared},
26    Future, FutureExt, TryFutureExt,
27};
28use log::{info, warn};
29use parking_lot::RwLock;
30use tokio::{
31    runtime::Handle,
32    sync::{oneshot::error::RecvError, Notify},
33    task::JoinSet,
34};
35
36use crate::config::ExecutionConfig;
37
38use super::io::register_io_runtime;
39
/// Maximum time (5 minutes) that `Runtime::shutdown_timeout` waits for the
/// dedicated runtime's outstanding tasks to finish before dropping them.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);
41
/// Errors occurring when polling [`DedicatedExecutor::spawn`].
#[derive(Debug)]
#[allow(missing_docs)]
pub enum JobError {
    /// The dedicated runtime is gone (e.g. the executor was shut down)
    /// before the task produced a result.
    WorkerGone,
    /// The spawned task panicked; `msg` is the panic payload rendered as a
    /// string (or a placeholder when the payload is not a `String`/`&str`).
    Panic { msg: String },
}
49
50impl Display for JobError {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            JobError::WorkerGone => {
54                write!(f, "Worker thread gone, executor was likely shut down")
55            }
56            JobError::Panic { msg } => write!(f, "Panic: {}", msg),
57        }
58    }
59}
60
/// Manages a separate tokio runtime (thread pool) for executing tasks.
///
/// A `DedicatedExecutor` runs futures (and any `tasks` that are
/// `tokio::task::spawned` by them) on a separate tokio Executor
///
/// # Background
///
/// Tokio has the notion of the "current" runtime, which runs the current future
/// and any tasks spawned by it. Typically, this is the runtime created by
/// `tokio::main` and is used for the main application logic and I/O handling
///
/// For CPU bound work, such as DataFusion plan execution, it is important to
/// run on a separate thread pool to avoid blocking the I/O handling for extended
/// periods of time in order to avoid long poll latencies (which decreases the
/// throughput of small requests under concurrent load).
///
/// # IO Scheduling
///
/// I/O, such as network calls, should not be performed on the runtime managed
/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
/// CPU tasks will not be preempted and can therefore starve servicing of other
/// tasks. This manifests in long poll-latencies, where a task is ready to run
/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as
/// there is no external party waiting on a response, however, for I/O tasks,
/// long poll latencies can prevent timely servicing of IO, which can have a
/// significant detrimental effect.
///
/// # Details
///
/// The worker thread priority is set to low so that such tasks do
/// not starve other more important tasks (such as answering health checks)
///
/// Follows the example from stack overflow and spawns a new
/// thread to install a Tokio runtime "context"
/// <https://stackoverflow.com/questions/62536566>
///
/// # Trouble Shooting:
///
/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!"
///
/// This means that IO was attempted on a tokio runtime that was not registered
/// for IO. One solution is to run the task using [DedicatedExecutor::spawn].
///
/// ## "Cannot drop a runtime in a context where blocking is not allowed"`
///
/// If you try to use this structure from an async context you see something like
/// thread 'plan::stringset::tests::test_builder_plan' panicked at 'Cannot
/// drop a runtime in a context where blocking is not allowed. This
/// happens when a runtime is dropped from within an asynchronous
/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
#[derive(Clone)]
pub struct DedicatedExecutor {
    /// Shared state; clones of this executor share the same `State`, so
    /// shutdown of any clone shuts down all of them.
    state: Arc<RwLock<State>>,

    /// Used for testing.
    ///
    /// This will ignore explicit shutdown requests.
    testing: bool,
}
120
/// [`DedicatedExecutor`] for testing purposes.
///
/// Lazily initialized by [`DedicatedExecutor::new_testing`] and shared by all
/// callers for the lifetime of the process.
static TESTING_EXECUTOR: OnceLock<DedicatedExecutor> = OnceLock::new();
123
impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated tokio
    /// executor that is separate from the threadpool created via
    /// `[tokio::main]` or similar.
    ///
    /// See the documentation on [`DedicatedExecutor`] for more details.
    ///
    /// If [`DedicatedExecutor::new`] is called from an existing tokio runtime,
    /// it will assume that the existing runtime should be used for I/O, and is
    /// thus set, via [`register_io_runtime`] by all threads spawned by the
    /// executor. This will allow scheduling IO outside the context of
    /// [`DedicatedExecutor`] using [`spawn_io`].
    pub fn new(
        name: &str,
        config: ExecutionConfig,
        runtime_builder: tokio::runtime::Builder,
    ) -> Self {
        Self::new_inner(name, config, runtime_builder, false)
    }

    /// Shared constructor behind [`new`](Self::new) and
    /// [`new_testing`](Self::new_testing).
    ///
    /// Spawns a dedicated "driver" OS thread that builds and runs the new
    /// tokio runtime, then blocks until that thread has sent back the runtime
    /// [`Handle`]. When `testing` is true, `shutdown`/`join` become no-ops.
    fn new_inner(
        name: &str,
        config: ExecutionConfig,
        runtime_builder: tokio::runtime::Builder,
        testing: bool,
    ) -> Self {
        let name = name.to_owned();

        let notify_shutdown = Arc::new(Notify::new());
        let notify_shutdown_captured = Arc::clone(&notify_shutdown);

        // tx_shutdown/rx_shutdown: signals that the runtime has fully shut down.
        // tx_handle/rx_handle: hands the runtime Handle back to this constructor.
        let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
        let (tx_handle, rx_handle) = std::sync::mpsc::channel();

        // If called from within a runtime, capture its handle to use as the IO
        // runtime for all worker threads of the new runtime.
        let io_handle = tokio::runtime::Handle::try_current().ok();
        let thread = std::thread::Builder::new()
            .name(format!("{name} driver"))
            .spawn(move || {
                // also register the IO runtime for the current thread, since it might be used as well (esp. for the
                // current thread RT)
                register_io_runtime(io_handle.clone());

                info!(
                    "Creating DedicatedExecutor with {} threads",
                    config.dedicated_executor_threads
                );

                let mut runtime_builder = runtime_builder;
                let runtime = runtime_builder
                    .worker_threads(config.dedicated_executor_threads)
                    .on_thread_start(move || register_io_runtime(io_handle.clone()))
                    .build()
                    .expect("Creating tokio runtime");

                // Run until shutdown is requested via `notify_shutdown`.
                runtime.block_on(async move {
                    // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread
                    // (i.e. the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is
                    // started right after the constructor finishes and the new runtime calls
                    // `notify_shutdown_captured.notified().await`.
                    //
                    // Tokio provides an API for that by calling `enable` on the `notified` future (this requires
                    // pinning though).
                    let shutdown = notify_shutdown_captured.notified();
                    let mut shutdown = std::pin::pin!(shutdown);
                    shutdown.as_mut().enable();

                    // If the constructor side is gone there is nobody to serve.
                    if tx_handle.send(Handle::current()).is_err() {
                        return;
                    }
                    shutdown.await;
                });

                // Give in-flight tasks a bounded amount of time to complete.
                runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);

                // send shutdown "done" signal
                tx_shutdown.send(()).ok();
            })
            .expect("executor setup");

        let handle = rx_handle.recv().expect("driver started");

        // Start tokio metrics collection for the dedicated executor runtime
        #[cfg(feature = "observability")]
        let metrics_collector = {
            use crate::observability::TokioMetricsCollector;
            use std::time::Duration;

            Some(TokioMetricsCollector::start(
                handle.clone(),
                "cpu_runtime".to_string(),
                Duration::from_secs(10),
            ))
        };

        let state = State {
            handle: Some(handle),
            start_shutdown: notify_shutdown,
            completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
            thread: Some(thread),
            #[cfg(feature = "observability")]
            _metrics_collector: metrics_collector,
        };

        Self {
            state: Arc::new(RwLock::new(state)),
            testing,
        }
    }

    /// Create new executor for testing purposes.
    ///
    /// Internal state may be shared with other tests.
    pub fn new_testing() -> Self {
        TESTING_EXECUTOR
            .get_or_init(|| {
                let mut runtime_builder = tokio::runtime::Builder::new_current_thread();

                // only enable `time` but NOT the IO integration since IO shouldn't run on the DataFusion runtime
                // See:
                // - https://github.com/influxdata/influxdb_iox/issues/10803
                // - https://github.com/influxdata/influxdb_iox/pull/11030
                runtime_builder.enable_time();

                Self::new_inner("testing", ExecutionConfig::default(), runtime_builder, true)
            })
            .clone()
    }

    /// Runs the specified [`Future`] (and any tasks it spawns) on the thread
    /// pool managed by this `DedicatedExecutor`.
    ///
    /// # Notes
    ///
    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
    /// it is dropped. Thus, you need ensure the returned future lives until it
    /// completes (call `await`) or you wish to cancel it.
    ///
    /// Currently all tasks are added to the tokio executor immediately and
    /// compete for the threadpool's resources.
    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        // Clone the handle inside a short-lived read-lock scope so we don't
        // hold the lock while spawning or awaiting.
        let handle = {
            let state = self.state.read();
            state.handle.clone()
        };

        // `handle` is None once shutdown has been requested.
        let Some(handle) = handle else {
            return futures::future::err(JobError::WorkerGone).boxed();
        };

        // use JoinSet to implement "cancel on drop"
        let mut join_set = JoinSet::new();
        join_set.spawn_on(task, &handle);
        async move {
            join_set
                .join_next()
                .await
                .expect("just spawned task")
                .map_err(|e| match e.try_into_panic() {
                    // Task panicked: extract the panic payload as a string.
                    Ok(e) => {
                        let s = if let Some(s) = e.downcast_ref::<String>() {
                            s.clone()
                        } else if let Some(s) = e.downcast_ref::<&str>() {
                            s.to_string()
                        } else {
                            "unknown internal error".to_string()
                        };

                        JobError::Panic { msg: s }
                    }
                    // Task was cancelled, i.e. the runtime shut down.
                    Err(_) => JobError::WorkerGone,
                })
        }
        .boxed()
    }

    /// signals shutdown of this executor and any Clones
    pub fn shutdown(&self) {
        if self.testing {
            return;
        }

        // hang up the channel which will cause the dedicated thread
        // to quit
        let mut state = self.state.write();
        state.handle = None;
        state.start_shutdown.notify_one();
    }

    /// Stops all subsequent task executions, and waits for the worker
    /// thread to complete. Note this will shutdown all clones of this
    /// `DedicatedExecutor` as well.
    ///
    /// Only the first call to `join` will actually wait for the
    /// executing thread to complete. All other calls to join will
    /// complete immediately.
    ///
    /// # Panic / Drop
    /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call
    /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see
    /// <https://github.com/rust-lang/futures-rs/issues/2575>.
    pub async fn join(&self) {
        if self.testing {
            return;
        }

        self.shutdown();

        // clone the shared completion future while the mutex is held
        let handle = {
            let state = self.state.read();
            state.completed_shutdown.clone()
        };

        // wait for completion while not holding the mutex to avoid
        // deadlocks
        handle.await.expect("Thread died?")
    }
}
346
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate tokio Executor.
///
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
struct State {
    /// Runtime handle.
    ///
    /// This is `None` when the executor is shutting down.
    handle: Option<Handle>,

    /// If notified, the executor tokio runtime will begin to shutdown.
    ///
    /// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and
    /// locking of the state. Just using a proper async signal is nicer.
    start_shutdown: Arc<Notify>,

    /// Receiver side indicating that shutdown is complete.
    ///
    /// `Shared` so that every clone of the executor can await the same
    /// one-shot completion signal.
    completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,

    /// The inner thread that can be used to join during drop.
    thread: Option<std::thread::JoinHandle<()>>,

    /// Tokio metrics collector for the dedicated executor runtime.
    #[cfg(feature = "observability")]
    _metrics_collector: Option<crate::observability::TokioMetricsCollector>,
}
375
// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones
// share their inner state.
impl Drop for State {
    fn drop(&mut self) {
        // Trigger shutdown if the user never called `shutdown()`/`join()`.
        if self.handle.is_some() {
            warn!("DedicatedExecutor dropped without calling shutdown()");
            self.handle = None;
            self.start_shutdown.notify_one();
        }

        // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
        if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() {
            warn!("DedicatedExecutor dropped without waiting for worker termination",);
        }

        // join thread but don't care about the results
        self.thread.take().expect("not dropped yet").join().ok();
    }
}
395
#[cfg(test)]
mod tests {
    use crate::executor::io::spawn_io;

    use super::*;
    use std::{
        panic::panic_any,
        sync::{Arc, Barrier},
        time::Duration,
    };
    use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier};

    /// Wait for the barrier and then return `result`
    async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
        barrier.wait();
        result
    }

    /// Wait for the barrier and then return `result`
    async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize {
        barrier.wait().await;
        result
    }

    /// Single-worker-thread executor for most tests.
    fn exec() -> DedicatedExecutor {
        exec_with_threads(1)
    }

    /// Two-worker-thread executor for tests that need parallelism.
    fn exec2() -> DedicatedExecutor {
        exec_with_threads(2)
    }

    fn exec_with_threads(threads: usize) -> DedicatedExecutor {
        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
        runtime_builder.worker_threads(threads);
        runtime_builder.enable_all();

        DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        )
    }

    /// Asserts that `spawn_io` work runs on the caller's (IO) runtime thread,
    /// not on the dedicated executor's thread.
    async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) {
        let io_runtime_id = std::thread::current().id();
        dedicated
            .spawn(async move {
                let dedicated_id = std::thread::current().id();
                let spawned = spawn_io(async move { std::thread::current().id() }).await;

                assert_ne!(dedicated_id, spawned);
                assert_eq!(io_runtime_id, spawned);
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn basic() {
        let barrier = Arc::new(Barrier::new(2));

        let exec = exec();
        let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));

        // Note the dedicated task will never complete if it runs on
        // the main tokio thread (as this test is not using the
        // 'multithreaded' version of the executor and the call to
        // barrier.wait actually blocks the tokio thread)
        barrier.wait();

        // should be able to get the result
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn basic_clone() {
        let barrier = Arc::new(Barrier::new(2));
        let exec = exec();
        // Run task on clone should work fine
        let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
        barrier.wait();
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    // Dropping an executor without ever using it must not hang or panic.
    #[tokio::test]
    async fn drop_empty_exec() {
        exec();
    }

    #[tokio::test]
    async fn drop_clone() {
        let barrier = Arc::new(Barrier::new(2));
        let exec = exec();

        // dropping one clone must not shut down the shared runtime
        drop(exec.clone());

        let task = exec.spawn(do_work(42, Arc::clone(&barrier)));
        barrier.wait();
        assert_eq!(task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    #[should_panic(expected = "foo")]
    async fn just_panic() {
        struct S(DedicatedExecutor);

        impl Drop for S {
            fn drop(&mut self) {
                self.0.join().now_or_never();
            }
        }

        let exec = exec();
        let _s = S(exec);

        // this must not lead to a double-panic and SIGILL
        panic!("foo")
    }

    #[tokio::test]
    async fn multi_task() {
        let barrier = Arc::new(Barrier::new(3));

        // make an executor with two threads
        let exec = exec2();
        let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier)));
        let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier)));

        // block main thread until completion of other two tasks
        barrier.wait();

        // should be able to get the result
        assert_eq!(dedicated_task1.await.unwrap(), 11);
        assert_eq!(dedicated_task2.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn tokio_spawn() {
        let exec = exec2();

        // spawn a task that spawns to other tasks and ensure they run on the dedicated
        // executor
        let dedicated_task = exec.spawn(async move {
            // spawn separate tasks
            let t1 = tokio::task::spawn(async { 25usize });
            t1.await.unwrap()
        });

        // Validate the inner task ran to completion (aka it did not panic)
        assert_eq!(dedicated_task.await.unwrap(), 25);

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_str() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                panic!("At the disco, on the dedicated task scheduler");
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Panic: At the disco, on the dedicated task scheduler",
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_string() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                panic!("{} {}", 1, 2);
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(err.to_string(), "Panic: 1 2",);

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_other() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                // non-string payload exercises the fallback message
                panic_any(1)
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(err.to_string(), "Panic: unknown internal error",);

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_shutdown_while_task_running() {
        let barrier_1 = Arc::new(Barrier::new(2));
        let captured_1 = Arc::clone(&barrier_1);
        let barrier_2 = Arc::new(Barrier::new(2));
        let captured_2 = Arc::clone(&barrier_2);

        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            captured_1.wait();
            do_work(42, captured_2).await
        });
        barrier_1.wait();

        exec.shutdown();
        // block main thread until completion of the outstanding task
        barrier_2.wait();

        // task should complete successfully
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_submit_task_after_shutdown() {
        let exec = exec();

        // Simulate trying to submit tasks once executor has shutdown
        exec.shutdown();
        let dedicated_task = exec.spawn(async { 11 });

        // task should complete, but return an error
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Worker thread gone, executor was likely shut down"
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_submit_task_after_clone_shutdown() {
        let exec = exec();

        // shutdown the clone (but not the exec)
        exec.clone().join().await;

        // Simulate trying to submit tasks once executor has shutdown
        let dedicated_task = exec.spawn(async { 11 });

        // task should complete, but return an error
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Worker thread gone, executor was likely shut down"
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_join() {
        let exec = exec();
        // test it doesn't hang
        exec.join().await;
    }

    #[tokio::test]
    async fn executor_join2() {
        let exec = exec();
        // test it doesn't hang
        exec.join().await;
        exec.join().await;
    }

    #[tokio::test]
    #[allow(clippy::redundant_clone)]
    async fn executor_clone_join() {
        let exec = exec();
        // test it doesn't hang
        exec.clone().join().await;
        exec.clone().join().await;
        exec.join().await;
    }

    #[tokio::test]
    async fn drop_receiver() {
        // create empty executor
        let exec = exec();

        // create first blocked task
        let barrier1_pre = Arc::new(AsyncBarrier::new(2));
        let barrier1_pre_captured = Arc::clone(&barrier1_pre);
        let barrier1_post = Arc::new(AsyncBarrier::new(2));
        let barrier1_post_captured = Arc::clone(&barrier1_post);
        let dedicated_task1 = exec.spawn(async move {
            barrier1_pre_captured.wait().await;
            do_work_async(11, barrier1_post_captured).await
        });
        barrier1_pre.wait().await;

        // create second blocked task
        let barrier2_pre = Arc::new(AsyncBarrier::new(2));
        let barrier2_pre_captured = Arc::clone(&barrier2_pre);
        let barrier2_post = Arc::new(AsyncBarrier::new(2));
        let barrier2_post_captured = Arc::clone(&barrier2_post);
        let dedicated_task2 = exec.spawn(async move {
            barrier2_pre_captured.wait().await;
            do_work_async(22, barrier2_post_captured).await
        });
        barrier2_pre.wait().await;

        // cancel task
        drop(dedicated_task1);

        // cancellation might take a short while; detect it by waiting for the
        // task's Arc clone of the barrier to be dropped (strong_count == 1)
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if Arc::strong_count(&barrier1_post) == 1 {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(10)).await
            }
        })
        .await
        .unwrap();

        // unblock other task
        barrier2_post.wait().await;
        assert_eq!(dedicated_task2.await.unwrap(), 22);
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if Arc::strong_count(&barrier2_post) == 1 {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(10)).await
            }
        })
        .await
        .unwrap();

        exec.join().await;
    }

    #[tokio::test]
    async fn test_io_runtime_multi_thread() {
        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
        runtime_builder.worker_threads(1);

        let dedicated = DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        );
        test_io_runtime_multi_thread_impl(dedicated).await;
    }

    #[tokio::test]
    async fn test_io_runtime_current_thread() {
        let runtime_builder = tokio::runtime::Builder::new_current_thread();

        let dedicated = DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        );
        test_io_runtime_multi_thread_impl(dedicated).await;
    }

    #[tokio::test]
    async fn test_that_testing_executor_prevents_io() {
        let exec = DedicatedExecutor::new_testing();

        let io_disabled = exec
            .spawn(async move {
                // the only way (I've found) to test if IO is enabled is to use it and observe if tokio panics
                TcpListener::bind("127.0.0.1:0")
                    .catch_unwind()
                    .await
                    .is_err()
            })
            .await
            .unwrap();

        assert!(io_disabled)
    }
}