// datafusion_dft/execution/executor/dedicated.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    fmt::Display,
20    sync::{Arc, OnceLock},
21    time::Duration,
22};
23
24use futures::{
25    future::{BoxFuture, Shared},
26    Future, FutureExt, TryFutureExt,
27};
28use log::{info, warn};
29use parking_lot::RwLock;
30use tokio::{
31    runtime::Handle,
32    sync::{oneshot::error::RecvError, Notify},
33    task::JoinSet,
34};
35
36use crate::config::ExecutionConfig;
37
38use super::io::register_io_runtime;
39
/// Grace period granted to the dedicated runtime to finish outstanding work
/// during shutdown (passed to `Runtime::shutdown_timeout` in `new_inner`).
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);
41
/// Errors occurring when polling [`DedicatedExecutor::spawn`].
#[derive(Debug)]
pub enum JobError {
    /// The executor's worker thread is gone, most likely because the
    /// executor was shut down before (or while) the job was running.
    WorkerGone,
    /// The job panicked; `msg` holds the panic payload rendered as a string
    /// (or a placeholder when the payload is not a string).
    Panic { msg: String },
}
49
50impl Display for JobError {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            JobError::WorkerGone => {
54                write!(f, "Worker thread gone, executor was likely shut down")
55            }
56            JobError::Panic { msg } => write!(f, "Panic: {}", msg),
57        }
58    }
59}
60
61/// Manages a separate tokio runtime (thread pool) for executing tasks.
62///
63/// A `DedicatedExecutor` runs futures (and any `tasks` that are
64/// `tokio::task::spawned` by them) on a separate tokio Executor
65///
66/// # Background
67///
68/// Tokio has the notion of the "current" runtime, which runs the current future
69/// and any tasks spawned by it. Typically, this is the runtime created by
70/// `tokio::main` and is used for the main application logic and I/O handling
71///
72/// For CPU bound work, such as DataFusion plan execution, it is important to
73/// run on a separate thread pool to avoid blocking the I/O handling for extended
74/// periods of time in order to avoid long poll latencies (which decreases the
75/// throughput of small requests under concurrent load).
76///
77/// # IO Scheduling
78///
79/// I/O, such as network calls, should not be performed on the runtime managed
80/// by [`DedicatedExecutor`]. As tokio is a cooperative scheduler, long-running
81/// CPU tasks will not be preempted and can therefore starve servicing of other
82/// tasks. This manifests in long poll-latencies, where a task is ready to run
83/// but isn't being scheduled to run. For CPU-bound work this isn't a problem as
84/// there is no external party waiting on a response, however, for I/O tasks,
85/// long poll latencies can prevent timely servicing of IO, which can have a
86/// significant detrimental effect.
87///
88/// # Details
89///
90/// The worker thread priority is set to low so that such tasks do
91/// not starve other more important tasks (such as answering health checks)
92///
93/// Follows the example from stack overflow and spawns a new
94/// thread to install a Tokio runtime "context"
95/// <https://stackoverflow.com/questions/62536566>
96///
97/// # Trouble Shooting:
98///
/// ## "No IO runtime registered. Call `register_io_runtime`/`register_current_runtime_for_io` in current thread!"
100///
101/// This means that IO was attempted on a tokio runtime that was not registered
102/// for IO. One solution is to run the task using [DedicatedExecutor::spawn].
103///
104/// ## "Cannot drop a runtime in a context where blocking is not allowed"`
105///
106/// If you try to use this structure from an async context you see something like
107/// thread 'plan::stringset::tests::test_builder_plan' panicked at 'Cannot
108/// drop a runtime in a context where blocking is not allowed. This
109/// happens when a runtime is dropped from within an asynchronous
110/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
#[derive(Clone)]
pub struct DedicatedExecutor {
    /// Shared runtime-handle/shutdown state; all clones of this executor
    /// point at the same `State`, so shutting down one shuts down all.
    state: Arc<RwLock<State>>,

    /// Used for testing.
    ///
    /// This will ignore explicit shutdown requests.
    testing: bool,
}
120
/// [`DedicatedExecutor`] for testing purposes.
///
/// Lazily initialized once per process and returned (cloned) by every call to
/// [`DedicatedExecutor::new_testing`], so its internal state is shared between tests.
static TESTING_EXECUTOR: OnceLock<DedicatedExecutor> = OnceLock::new();
123
impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated tokio
    /// executor that is separate from the threadpool created via
    /// `[tokio::main]` or similar.
    ///
    /// See the documentation on [`DedicatedExecutor`] for more details.
    ///
    /// If [`DedicatedExecutor::new`] is called from an existing tokio runtime,
    /// it will assume that the existing runtime should be used for I/O, and is
    /// thus set, via [`register_io_runtime`] by all threads spawned by the
    /// executor. This will allow scheduling IO outside the context of
    /// [`DedicatedExecutor`] using [`spawn_io`].
    pub fn new(
        name: &str,
        config: ExecutionConfig,
        runtime_builder: tokio::runtime::Builder,
    ) -> Self {
        Self::new_inner(name, config, runtime_builder, false)
    }

    /// Shared constructor behind [`new`](Self::new) and
    /// [`new_testing`](Self::new_testing); `testing = true` makes explicit
    /// shutdown requests no-ops (see [`shutdown`](Self::shutdown)).
    fn new_inner(
        name: &str,
        config: ExecutionConfig,
        runtime_builder: tokio::runtime::Builder,
        testing: bool,
    ) -> Self {
        let name = name.to_owned();

        let notify_shutdown = Arc::new(Notify::new());
        let notify_shutdown_captured = Arc::clone(&notify_shutdown);

        let (tx_shutdown, rx_shutdown) = tokio::sync::oneshot::channel();
        let (tx_handle, rx_handle) = std::sync::mpsc::channel();

        // If we are called from inside an existing tokio runtime, use it as the IO runtime.
        let io_handle = tokio::runtime::Handle::try_current().ok();
        let thread = std::thread::Builder::new()
            .name(format!("{name} driver"))
            .spawn(move || {
                // also register the IO runtime for the current thread, since it might be used as well (esp. for the
                // current thread RT)
                register_io_runtime(io_handle.clone());

                info!(
                    "Creating DedicatedExecutor with {} threads",
                    config.dedicated_executor_threads
                );

                let mut runtime_builder = runtime_builder;
                let runtime = runtime_builder
                    .worker_threads(config.dedicated_executor_threads)
                    // every worker thread of the dedicated runtime gets the IO runtime registered too
                    .on_thread_start(move || register_io_runtime(io_handle.clone()))
                    .build()
                    .expect("Creating tokio runtime");

                runtime.block_on(async move {
                    // Enable the "notified" receiver BEFORE sending the runtime handle back to the constructor thread
                    // (i.e. the one that runs `new`) to avoid the potential (but unlikely) race that the shutdown is
                    // started right after the constructor finishes and the new runtime calls
                    // `notify_shutdown_captured.notified().await`.
                    //
                    // Tokio provides an API for that by calling `enable` on the `notified` future (this requires
                    // pinning though).
                    let shutdown = notify_shutdown_captured.notified();
                    let mut shutdown = std::pin::pin!(shutdown);
                    shutdown.as_mut().enable();

                    // a send error means the constructor thread is gone; nothing left to do
                    if tx_handle.send(Handle::current()).is_err() {
                        return;
                    }
                    shutdown.await;
                });

                runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);

                // send shutdown "done" signal
                tx_shutdown.send(()).ok();
            })
            .expect("executor setup");

        let handle = rx_handle.recv().expect("driver started");

        let state = State {
            handle: Some(handle),
            start_shutdown: notify_shutdown,
            completed_shutdown: rx_shutdown.map_err(Arc::new).boxed().shared(),
            thread: Some(thread),
        };

        Self {
            state: Arc::new(RwLock::new(state)),
            testing,
        }
    }

    /// Create new executor for testing purposes.
    ///
    /// Internal state may be shared with other tests.
    pub fn new_testing() -> Self {
        TESTING_EXECUTOR
            .get_or_init(|| {
                let mut runtime_builder = tokio::runtime::Builder::new_current_thread();

                // only enable `time` but NOT the IO integration since IO shouldn't run on the DataFusion runtime
                // See:
                // - https://github.com/influxdata/influxdb_iox/issues/10803
                // - https://github.com/influxdata/influxdb_iox/pull/11030
                runtime_builder.enable_time();

                Self::new_inner("testing", ExecutionConfig::default(), runtime_builder, true)
            })
            .clone()
    }

    /// Runs the specified [`Future`] (and any tasks it spawns) on the thread
    /// pool managed by this `DedicatedExecutor`.
    ///
    /// # Notes
    ///
    /// UNLIKE [`tokio::task::spawn`], the returned future is **cancelled** when
    /// it is dropped. Thus, you need to ensure the returned future lives until it
    /// completes (call `await`) or you wish to cancel it.
    ///
    /// Currently all tasks are added to the tokio executor immediately and
    /// compete for the threadpool's resources.
    pub fn spawn<T>(&self, task: T) -> impl Future<Output = Result<T::Output, JobError>>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        // clone the handle under the read lock, then release it before awaiting
        let handle = {
            let state = self.state.read();
            state.handle.clone()
        };

        // a `None` handle means the executor was already shut down
        let Some(handle) = handle else {
            return futures::future::err(JobError::WorkerGone).boxed();
        };

        // use JoinSet to implement "cancel on drop"
        let mut join_set = JoinSet::new();
        join_set.spawn_on(task, &handle);
        async move {
            join_set
                .join_next()
                .await
                .expect("just spawned task")
                .map_err(|e| match e.try_into_panic() {
                    Ok(e) => {
                        // render the panic payload as a string when possible
                        let s = if let Some(s) = e.downcast_ref::<String>() {
                            s.clone()
                        } else if let Some(s) = e.downcast_ref::<&str>() {
                            s.to_string()
                        } else {
                            "unknown internal error".to_string()
                        };

                        JobError::Panic { msg: s }
                    }
                    // not a panic => the runtime was shut down / task was aborted
                    Err(_) => JobError::WorkerGone,
                })
        }
        .boxed()
    }

    /// signals shutdown of this executor and any Clones
    pub fn shutdown(&self) {
        if self.testing {
            return;
        }

        // hang up the channel which will cause the dedicated thread
        // to quit
        let mut state = self.state.write();
        state.handle = None;
        state.start_shutdown.notify_one();
    }

    /// Stops all subsequent task executions, and waits for the worker
    /// thread to complete. Note this will shutdown all clones of this
    /// `DedicatedExecutor` as well.
    ///
    /// Only the first call to `join` will actually wait for the
    /// executing thread to complete. All other calls to join will
    /// complete immediately.
    ///
    /// # Panic / Drop
    /// [`DedicatedExecutor`] implements shutdown on [`Drop`]. You should just use this behavior and NOT call
    /// [`join`](Self::join) manually during [`Drop`] or panics because this might lead to another panic, see
    /// <https://github.com/rust-lang/futures-rs/issues/2575>.
    pub async fn join(&self) {
        if self.testing {
            return;
        }

        self.shutdown();

        // clone the shared completion future while the lock is held
        let handle = {
            let state = self.state.read();
            state.completed_shutdown.clone()
        };

        // wait for completion while not holding the mutex to avoid
        // deadlocks
        handle.await.expect("Thread died?")
    }
}
331
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate tokio Executor.
///
/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
/// [`completed_shutdown`](Self::completed_shutdown) (for which it owns the sender side).
struct State {
    /// Runtime handle.
    ///
    /// This is `None` when the executor is shutting down.
    handle: Option<Handle>,

    /// If notified, the executor tokio runtime will begin to shutdown.
    ///
    /// We could implement this by checking `handle.is_none()` in regular intervals but requires regular wake-ups and
    /// locking of the state. Just using a proper async signal is nicer.
    start_shutdown: Arc<Notify>,

    /// Receiver side indicating that shutdown is complete.
    completed_shutdown: Shared<BoxFuture<'static, Result<(), Arc<RecvError>>>>,

    /// The inner thread that can be used to join during drop.
    thread: Option<std::thread::JoinHandle<()>>,
}
356
// IMPORTANT: Implement `Drop` for `State`, NOT for `DedicatedExecutor`, because the executor can be cloned and clones
// share their inner state.
impl Drop for State {
    fn drop(&mut self) {
        // a still-present handle means nobody called `shutdown()`/`join()`; trigger shutdown now
        if self.handle.is_some() {
            warn!("DedicatedExecutor dropped without calling shutdown()");
            self.handle = None;
            self.start_shutdown.notify_one();
        }

        // do NOT poll the shared future if we are panicking due to https://github.com/rust-lang/futures-rs/issues/2575
        if !std::thread::panicking() && self.completed_shutdown.clone().now_or_never().is_none() {
            warn!("DedicatedExecutor dropped without waiting for worker termination",);
        }

        // join thread but don't care about the results
        self.thread.take().expect("not dropped yet").join().ok();
    }
}
376
#[cfg(test)]
mod tests {
    use crate::execution::executor::io::spawn_io;

    use super::*;
    use std::{
        panic::panic_any,
        sync::{Arc, Barrier},
        time::Duration,
    };
    use tokio::{net::TcpListener, sync::Barrier as AsyncBarrier};

    /// Wait for the barrier and then return `result`
    async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
        barrier.wait();
        result
    }

    /// Wait for the barrier and then return `result`
    async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize {
        barrier.wait().await;
        result
    }

    /// Single-threaded test executor.
    fn exec() -> DedicatedExecutor {
        exec_with_threads(1)
    }

    /// Two-threaded test executor.
    fn exec2() -> DedicatedExecutor {
        exec_with_threads(2)
    }

    /// Build a multi-thread test executor with `threads` worker threads.
    fn exec_with_threads(threads: usize) -> DedicatedExecutor {
        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
        runtime_builder.worker_threads(threads);
        runtime_builder.enable_all();

        DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        )
    }

    /// Assert that `spawn_io` work runs on the outer (IO) runtime, not on the dedicated one.
    async fn test_io_runtime_multi_thread_impl(dedicated: DedicatedExecutor) {
        let io_runtime_id = std::thread::current().id();
        dedicated
            .spawn(async move {
                let dedicated_id = std::thread::current().id();
                let spawned = spawn_io(async move { std::thread::current().id() }).await;

                assert_ne!(dedicated_id, spawned);
                assert_eq!(io_runtime_id, spawned);
            })
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn basic() {
        let barrier = Arc::new(Barrier::new(2));

        let exec = exec();
        let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));

        // Note the dedicated task will never complete if it runs on
        // the main tokio thread (as this test is not using the
        // 'multithreaded' version of the executor and the call to
        // barrier.wait actually blocks the tokio thread)
        barrier.wait();

        // should be able to get the result
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn basic_clone() {
        let barrier = Arc::new(Barrier::new(2));
        let exec = exec();
        // Run task on clone should work fine
        let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
        barrier.wait();
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn drop_empty_exec() {
        exec();
    }

    #[tokio::test]
    async fn drop_clone() {
        let barrier = Arc::new(Barrier::new(2));
        let exec = exec();

        drop(exec.clone());

        let task = exec.spawn(do_work(42, Arc::clone(&barrier)));
        barrier.wait();
        assert_eq!(task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    #[should_panic(expected = "foo")]
    async fn just_panic() {
        struct S(DedicatedExecutor);

        impl Drop for S {
            fn drop(&mut self) {
                self.0.join().now_or_never();
            }
        }

        let exec = exec();
        let _s = S(exec);

        // this must not lead to a double-panic and SIGILL
        panic!("foo")
    }

    #[tokio::test]
    async fn multi_task() {
        let barrier = Arc::new(Barrier::new(3));

        // make an executor with two threads
        let exec = exec2();
        let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier)));
        let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier)));

        // block main thread until completion of other two tasks
        barrier.wait();

        // should be able to get the result
        assert_eq!(dedicated_task1.await.unwrap(), 11);
        assert_eq!(dedicated_task2.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn tokio_spawn() {
        let exec = exec2();

        // spawn a task that spawns to other tasks and ensure they run on the dedicated
        // executor
        let dedicated_task = exec.spawn(async move {
            // spawn separate tasks
            let t1 = tokio::task::spawn(async { 25usize });
            t1.await.unwrap()
        });

        // Validate the inner task ran to completion (aka it did not panic)
        assert_eq!(dedicated_task.await.unwrap(), 25);

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_str() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                panic!("At the disco, on the dedicated task scheduler");
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Panic: At the disco, on the dedicated task scheduler",
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_string() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                panic!("{} {}", 1, 2);
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(err.to_string(), "Panic: 1 2",);

        exec.join().await;
    }

    #[tokio::test]
    async fn panic_on_executor_other() {
        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            if true {
                panic_any(1)
            } else {
                42
            }
        });

        // should not be able to get the result
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(err.to_string(), "Panic: unknown internal error",);

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_shutdown_while_task_running() {
        let barrier_1 = Arc::new(Barrier::new(2));
        let captured_1 = Arc::clone(&barrier_1);
        let barrier_2 = Arc::new(Barrier::new(2));
        let captured_2 = Arc::clone(&barrier_2);

        let exec = exec();
        let dedicated_task = exec.spawn(async move {
            captured_1.wait();
            do_work(42, captured_2).await
        });
        barrier_1.wait();

        exec.shutdown();
        // block main thread until completion of the outstanding task
        barrier_2.wait();

        // task should complete successfully
        assert_eq!(dedicated_task.await.unwrap(), 42);

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_submit_task_after_shutdown() {
        let exec = exec();

        // Simulate trying to submit tasks once executor has shutdown
        exec.shutdown();
        let dedicated_task = exec.spawn(async { 11 });

        // task should complete, but return an error
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Worker thread gone, executor was likely shut down"
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_submit_task_after_clone_shutdown() {
        let exec = exec();

        // shutdown the clone (but not the exec)
        exec.clone().join().await;

        // Simulate trying to submit tasks once executor has shutdown
        let dedicated_task = exec.spawn(async { 11 });

        // task should complete, but return an error
        let err = dedicated_task.await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Worker thread gone, executor was likely shut down"
        );

        exec.join().await;
    }

    #[tokio::test]
    async fn executor_join() {
        let exec = exec();
        // test it doesn't hang
        exec.join().await;
    }

    #[tokio::test]
    async fn executor_join2() {
        let exec = exec();
        // test it doesn't hang
        exec.join().await;
        exec.join().await;
    }

    #[tokio::test]
    #[allow(clippy::redundant_clone)]
    async fn executor_clone_join() {
        let exec = exec();
        // test it doesn't hang
        exec.clone().join().await;
        exec.clone().join().await;
        exec.join().await;
    }

    #[tokio::test]
    async fn drop_receiver() {
        // create empty executor
        let exec = exec();

        // create first blocked task
        let barrier1_pre = Arc::new(AsyncBarrier::new(2));
        let barrier1_pre_captured = Arc::clone(&barrier1_pre);
        let barrier1_post = Arc::new(AsyncBarrier::new(2));
        let barrier1_post_captured = Arc::clone(&barrier1_post);
        let dedicated_task1 = exec.spawn(async move {
            barrier1_pre_captured.wait().await;
            do_work_async(11, barrier1_post_captured).await
        });
        barrier1_pre.wait().await;

        // create second blocked task
        let barrier2_pre = Arc::new(AsyncBarrier::new(2));
        let barrier2_pre_captured = Arc::clone(&barrier2_pre);
        let barrier2_post = Arc::new(AsyncBarrier::new(2));
        let barrier2_post_captured = Arc::clone(&barrier2_post);
        let dedicated_task2 = exec.spawn(async move {
            barrier2_pre_captured.wait().await;
            do_work_async(22, barrier2_post_captured).await
        });
        barrier2_pre.wait().await;

        // cancel task
        drop(dedicated_task1);

        // cancellation might take a short while; poll until the task's barrier clone is dropped
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if Arc::strong_count(&barrier1_post) == 1 {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(10)).await
            }
        })
        .await
        .unwrap();

        // unblock other task
        barrier2_post.wait().await;
        assert_eq!(dedicated_task2.await.unwrap(), 22);
        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if Arc::strong_count(&barrier2_post) == 1 {
                    return;
                }
                tokio::time::sleep(Duration::from_millis(10)).await
            }
        })
        .await
        .unwrap();

        exec.join().await;
    }

    #[tokio::test]
    async fn test_io_runtime_multi_thread() {
        let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
        runtime_builder.worker_threads(1);

        let dedicated = DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        );
        test_io_runtime_multi_thread_impl(dedicated).await;
    }

    #[tokio::test]
    async fn test_io_runtime_current_thread() {
        let runtime_builder = tokio::runtime::Builder::new_current_thread();

        let dedicated = DedicatedExecutor::new(
            "Test DedicatedExecutor",
            ExecutionConfig::default(),
            runtime_builder,
        );
        test_io_runtime_multi_thread_impl(dedicated).await;
    }

    #[tokio::test]
    async fn test_that_testing_executor_prevents_io() {
        let exec = DedicatedExecutor::new_testing();

        let io_disabled = exec
            .spawn(async move {
                // the only way (I've found) to test if IO is enabled is to use it and observe whether tokio panics
                TcpListener::bind("127.0.0.1:0")
                    .catch_unwind()
                    .await
                    .is_err()
            })
            .await
            .unwrap();

        assert!(io_disabled)
    }
}