tor_rtmock/
task.rs

1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::mem;
13use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
14use std::pin::Pin;
15use std::sync::{Arc, Mutex, MutexGuard, Weak};
16use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
17
18use futures::future::Map;
19use futures::pin_mut;
20use futures::task::{FutureObj, Spawn, SpawnError};
21use futures::FutureExt as _;
22
23use assert_matches::assert_matches;
24use educe::Educe;
25use itertools::Either;
26use itertools::{chain, izip};
27use slotmap_careful::DenseSlotMap;
28use std::backtrace::Backtrace;
29use strum::EnumIter;
30
31// NB: when using traced_test, the trace! and error! output here is generally suppressed
32// in tests of other crates.  To see it, you can write something like this
33// (in the dev-dependencies of the crate whose tests you're running):
34//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
35use tracing::{error, trace};
36
37use oneshot_fused_workaround::{self as oneshot, Canceled, Receiver};
38use tor_error::error_report;
39use tor_rtcompat::{Blocking, ToplevelBlockOn};
40
41use Poll::*;
42use TaskState::*;
43
44/// Type-erased future, one for each of our (normal) tasks
45type TaskFuture = FutureObj<'static, ()>;
46
47/// Future for the argument to `block_on`, which is handled specially
48type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
49
50//---------- principal data structures ----------
51
52/// Executor for running tests with mocked environment
53///
54/// For test cases which don't actually wait for anything in the real world.
55///
56/// This is the executor.
57/// It implements [`Spawn`] and [`ToplevelBlockOn`]
58///
59/// It will usually be used as part of a `MockRuntime`.
60///
61/// To run futures, call [`ToplevelBlockOn::block_on`]
62///
63/// # Restricted environment
64///
65/// Tests run with this executor must not attempt to block
66/// on anything "outside":
67/// every future that anything awaits must (eventually) be woken directly
68/// *by some other task* in the same test case.
69///
70/// (By directly we mean that the [`Waker::wake`] call is made
71/// by that waking future, before that future itself awaits anything.)
72///
73/// # Panics
74///
75/// The executor will panic
76/// if the toplevel future (passed to `block_on`)
77/// doesn't complete (without externally blocking),
78/// but instead waits for something.
79///
80/// The executor will malfunction or panic if reentered.
81/// (Eg, if `block_on` is reentered.)
82#[derive(Clone, Default, Educe)]
83#[educe(Debug)]
84pub struct MockExecutor {
85    /// Mutable state
86    #[educe(Debug(ignore))]
87    shared: Arc<Shared>,
88}
89
90/// Shared state and ancillary information
91///
92/// This is always within an `Arc`.
93#[derive(Default)]
94struct Shared {
95    /// Shared state
96    data: Mutex<Data>,
97    /// Condition variable for thread scheduling
98    ///
99    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
100    /// is modified.
101    thread_condvar: std::sync::Condvar,
102}
103
104/// Task id, module to hide `Ti` alias
105mod task_id {
106    slotmap_careful::new_key_type! {
107        /// Task ID, usually called `TaskId`
108        ///
109        /// Short name in special `task_id` module so that [`Debug`] is nice
110        pub(super) struct Ti;
111    }
112}
113use task_id::Ti as TaskId;
114
115/// Executor's state
116///
117/// ### Task state machine
118///
119/// A task is created in `tasks`, `Awake`, so also in `awake`.
120///
121/// When we poll it, we take it out of `awake` and set it to `Asleep`,
122/// and then call `poll()`.
123/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
124/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
125///
126/// The task's future is of course also present here in this data structure.
127/// However, during poll we must release the lock,
128/// so we cannot borrow the future from `Data`.
129/// Instead, we move it out.  So `Task.fut` is an `Option`.
130///
131/// ### "Main" task - the argument to `block_on`
132///
133/// The signature of `BlockOn::block_on` accepts a non-`'static` future
134/// (and a non-`Send`/`Sync` one).
135///
136/// So we cannot store that future in `Data` because `Data` is `'static`.
137/// Instead, this main task future is passed as an argument down the call stack.
138/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
139#[derive(Educe, derive_more::Debug)]
140#[educe(Default)]
141struct Data {
142    /// Tasks
143    ///
144    /// Includes tasks spawned with `spawn`,
145    /// and also the future passed to `block_on`.
146    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
147    tasks: DenseSlotMap<TaskId, Task>,
148
149    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
150    ///
151    /// Tasks are pushed onto the *back* when woken,
152    /// so back is the most recently woken.
153    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
154    awake: VecDeque<TaskId>,
155
156    /// If a future from `progress_until_stalled` exists
157    progressing_until_stalled: Option<ProgressingUntilStalled>,
158
159    /// Scheduling policy
160    scheduling: SchedulingPolicy,
161
162    /// (Sub)thread we want to run now
163    ///
164    /// At any one time only one thread is meant to be running.
165    /// Other threads are blocked in condvar wait, waiting for this to change.
166    ///
167    /// **Modified only** within
168    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
169    /// which takes responsibility for preserving the following **invariants**:
170    ///
171    ///  1. no-one but the named thread is allowed to modify this field.
172    ///  2. after modifying this field, signal `thread_condvar`
173    #[educe(Default(expression = "ThreadDescriptor::Executor"))]
174    thread_to_run: ThreadDescriptor,
175}
176
177/// How we should schedule?
178#[derive(Debug, Clone, Default, EnumIter)]
179#[non_exhaustive]
180pub enum SchedulingPolicy {
181    /// Task *most* recently woken is run
182    ///
183    /// This is the default.
184    ///
185    /// It will expose starvation bugs if a task never sleeps.
186    /// (Which is a good thing in tests.)
187    #[default]
188    Stack,
189    /// Task *least* recently woken is run.
190    Queue,
191}
192
193/// Record of a single task
194///
195/// Tracks a spawned task, or the main task (the argument to `block_on`).
196///
197/// Stored in [`Data`]`.tasks`.
198struct Task {
199    /// For debugging output
200    desc: String,
201    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
202    ///
203    /// **Set to `Awake` only by [`Task::set_awake`]**,
204    /// preserving the invariant that
205    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
206    state: TaskState,
207    /// The actual future (or a placeholder for it)
208    ///
209    /// May be `None` because we've temporarily moved it out so we can poll it,
210    /// or if this is a Subthread task which is currently running sync code
211    /// (in which case we're blocked in the executor waiting to be
212    /// woken up by [`thread_context_switch`](Shared::thread_context_switch).
213    fut: Option<TaskFutureInfo>,
214    /// Is this task actually a [`Subthread`](MockExecutor::subthread_spawn)?
215    ///
216    /// Subthread tasks do not end when `fut` is `Ready` -
217    /// instead, `fut` is `Some` when the thread is within `subthread_block_on_future`.
218    /// The rest of the time this is `None`, but we don't run the executor,
219    /// because `Data.thread_to_run` is `ThreadDescriptor::Task(this_task)`.
220    is_subthread: Option<IsSubthread>,
221}
222
223/// A future as stored in our record of a [`Task`]
224enum TaskFutureInfo {
225    /// The [`Future`].  All is normal.
226    Normal(TaskFuture),
227    /// The future isn't here because this task is the main future for `block_on`
228    Main,
229}
230
231/// State of a task - do we think it needs to be polled?
232///
233/// Stored in [`Task`]`.state`.
234#[derive(Debug)]
235enum TaskState {
236    /// Awake - needs to be polled
237    ///
238    /// Established by [`waker.wake()`](Waker::wake)
239    Awake,
240    /// Asleep - does *not* need to be polled
241    ///
242    /// Established each time just before we call the future's [`poll`](Future::poll)
243    Asleep(Vec<SleepLocation>),
244}
245
246/// Actual implementor of `Wake` for use in a `Waker`
247///
248/// Futures (eg, channels from [`futures`]) will use this to wake a task
249/// when it should be polled.
250///
251/// This type must not be `Cloned` with the `Data` lock held.
252/// Consequently, a `Waker` mustn't either.
253struct ActualWaker {
254    /// Executor state
255    ///
256    /// The Waker mustn't to hold a strong reference to the executor,
257    /// since typically a task holds a future that holds a Waker,
258    /// and the executor holds the task - so that would be a cycle.
259    data: Weak<Shared>,
260
261    /// Which task this is
262    id: TaskId,
263}
264
265/// State used for an in-progress call to
266/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
267///
268/// If present in [`Data`], an (async) call to `progress_until_stalled`
269/// is in progress.
270///
271/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
272/// is a normal-ish future.
273/// It can be polled in the normal way.
274/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
275///
276/// The future is made ready, and woken (via `waker`),
277/// by bespoke code in the task executor loop.
278///
279/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
280/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
281#[derive(Debug)]
282struct ProgressingUntilStalled {
283    /// Have we, in fact, stalled?
284    ///
285    /// Made `Ready` by special code in the executor loop
286    finished: Poll<()>,
287
288    /// Waker
289    ///
290    /// Signalled by special code in the executor loop
291    waker: Option<Waker>,
292}
293
294/// Future from
295/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
296///
297/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
298///
299/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
300/// There can only be one at a time.
301#[derive(Educe)]
302#[educe(Debug)]
303struct ProgressUntilStalledFuture {
304    /// Executor's state; this future's state is in `.progressing_until_stalled`
305    #[educe(Debug(ignore))]
306    shared: Arc<Shared>,
307}
308
309/// Identifies a thread we know about - the executor thread, or a Subthread
310///
311/// Not related to `std::thread::ThreadId`.
312///
313/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
314///
315/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
316/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
317/// That's OK (and documented).
318#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
319enum ThreadDescriptor {
320    /// Foreign - neither the (running) executor, nor a Subthread
321    #[debug("FOREIGN")]
322    Foreign,
323    /// The executor.
324    #[debug("Exe")]
325    Executor,
326    /// This task, which is a Subthread.
327    #[debug("{_0:?}")]
328    Subthread(TaskId),
329}
330
331/// Marker indicating that this task is a Subthread, not an async task.
332///
333/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
334#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
335struct IsSubthread;
336
337thread_local! {
338    /// Identifies this thread.
339    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
340        Cell::new(ThreadDescriptor::Foreign)
341    };
342}
343
344//---------- creation ----------
345
346impl MockExecutor {
347    /// Make a `MockExecutor` with default parameters
348    pub fn new() -> Self {
349        Self::default()
350    }
351
352    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
353    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
354        Data {
355            scheduling,
356            ..Default::default()
357        }
358        .into()
359    }
360}
361
362impl From<Data> for MockExecutor {
363    fn from(data: Data) -> MockExecutor {
364        let shared = Shared {
365            data: Mutex::new(data),
366            thread_condvar: std::sync::Condvar::new(),
367        };
368        MockExecutor {
369            shared: Arc::new(shared),
370        }
371    }
372}
373
374//---------- spawning ----------
375
376impl MockExecutor {
377    /// Spawn a task and return something to identify it
378    ///
379    /// `desc` should `Display` as some kind of short string (ideally without spaces)
380    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
381    ///
382    /// The returned value is an opaque task identifier which is very cheap to clone
383    /// and which can be used by the caller in debug logging,
384    /// if it's desired to correlate with the debug output from `MockExecutor`.
385    /// Most callers will want to ignore it.
386    ///
387    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
388    pub fn spawn_identified(
389        &self,
390        desc: impl Display,
391        fut: impl Future<Output = ()> + Send + 'static,
392    ) -> impl Debug + Clone + Send + 'static {
393        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
394    }
395
396    /// Spawn a task and return its output for further usage
397    ///
398    /// `desc` should `Display` as some kind of short string (ideally without spaces)
399    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
400    pub fn spawn_join<T: Debug + Send + 'static>(
401        &self,
402        desc: impl Display,
403        fut: impl Future<Output = T> + Send + 'static,
404    ) -> impl Future<Output = T> {
405        let (tx, rx) = oneshot::channel();
406        self.spawn_identified(desc, async move {
407            let res = fut.await;
408            tx.send(res)
409                .expect("Failed to send future's output, did future panic?");
410        });
411        rx.map(|m| m.expect("Failed to receive future's output"))
412    }
413
414    /// Spawn a task and return its `TaskId`
415    ///
416    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
417    /// The future passed to `block_on` is not handled here.
418    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
419        let mut data = self.shared.lock();
420        data.insert_task(desc, TaskFutureInfo::Normal(fut), None)
421    }
422}
423
424impl Data {
425    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
426    fn insert_task(
427        &mut self,
428        desc: String,
429        fut: TaskFutureInfo,
430        is_subthread: Option<IsSubthread>,
431    ) -> TaskId {
432        let state = Awake;
433        let id = self.tasks.insert(Task {
434            state,
435            desc,
436            fut: Some(fut),
437            is_subthread,
438        });
439        self.awake.push_back(id);
440        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
441        id
442    }
443}
444
445impl Spawn for MockExecutor {
446    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
447        self.spawn_internal("spawn_obj".into(), future);
448        Ok(())
449    }
450}
451
452impl MockExecutor {
453    /// Implementation of `spawn_blocking` and `blocking_io`
454    fn spawn_thread_inner<F, T>(&self, f: F) -> <Self as Blocking>::ThreadHandle<T>
455    where
456        F: FnOnce() -> T + Send + 'static,
457        T: Send + 'static,
458    {
459        // For the mock executor, everything runs on the same thread.
460        // If we need something more complex in the future, we can change this.
461        let (tx, rx) = oneshot::channel();
462        self.spawn_identified("Blocking".to_string(), async move {
463            match tx.send(f()) {
464                Ok(()) => (),
465                Err(_) => panic!("Failed to send future's output, did future panic?"),
466            }
467        });
468        rx.map(Box::new(|m| m.expect("Failed to receive future's output")))
469    }
470}
471
472impl Blocking for MockExecutor {
473    type ThreadHandle<T: Send + 'static> =
474        Map<Receiver<T>, Box<dyn FnOnce(Result<T, Canceled>) -> T>>;
475
476    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
477    where
478        F: FnOnce() -> T + Send + 'static,
479        T: Send + 'static,
480    {
481        assert_matches!(
482            THREAD_DESCRIPTOR.get(),
483            ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
484 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
485        );
486        self.spawn_thread_inner(f)
487    }
488
489    fn reenter_block_on<F>(&self, future: F) -> F::Output
490    where
491        F: Future + Send + 'static,
492        F::Output: Send + 'static,
493    {
494        self.subthread_block_on_future(future)
495    }
496
497    fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
498    where
499        F: FnOnce() -> T + Send + 'static,
500        T: Send + 'static,
501    {
502        assert_eq!(
503            THREAD_DESCRIPTOR.get(),
504            ThreadDescriptor::Executor,
505            "MockExecutor::blocking_io only allowed from future being polled by this executor"
506        );
507        self.spawn_thread_inner(f)
508    }
509}
510
511//---------- block_on ----------
512
513impl ToplevelBlockOn for MockExecutor {
514    fn block_on<F>(&self, input_fut: F) -> F::Output
515    where
516        F: Future,
517    {
518        let mut value: Option<F::Output> = None;
519
520        // Box this just so that we can conveniently control precisely when it's dropped.
521        // (We could do this with Option and Pin::set but that seems clumsier.)
522        let mut input_fut = Box::pin(input_fut);
523
524        let run_store_fut = {
525            let value = &mut value;
526            let input_fut = &mut input_fut;
527            async {
528                trace!("MockExecutor block_on future...");
529                let t = input_fut.await;
530                trace!("MockExecutor block_on future returned...");
531                *value = Some(t);
532                trace!("MockExecutor block_on future exiting.");
533            }
534        };
535
536        {
537            pin_mut!(run_store_fut);
538
539            let main_id = self
540                .shared
541                .lock()
542                .insert_task("main".into(), TaskFutureInfo::Main, None);
543            trace!("MockExecutor {main_id:?} is task for block_on");
544            self.execute_to_completion(run_store_fut);
545        }
546
547        #[allow(clippy::let_and_return)] // clarity
548        let value = value.take().unwrap_or_else(|| {
549            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
550            // use the latter, so that the debug dump is prefixed by this message.
551            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
552            // write to tracing too, so the tracing log is clear about when we crashed
553            error!("all futures blocked, crashing...");
554
555            // Sequencing here is subtle.
556            //
557            // We should do the dump before dropping the input future, because the input
558            // future is likely to own things that, when dropped, wake up other tasks,
559            // rendering the dump inaccurate.
560            //
561            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
562            // which then reenters us.  More generally, we mustn't call user code
563            // with the lock held.
564            //
565            // And, we mustn't panic with the data lock held.
566            //
567            // If value was Some, then this closure is dropped without being called,
568            // which drops the future after it has yielded the value, which is correct.
569            {
570                let mut data = self.shared.lock();
571                data.debug_dump();
572            }
573            drop(input_fut);
574
575            panic!(
576                r"
577all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
578"
579            );
580        });
581
582        value
583    }
584}
585
586//---------- execution - core implementation ----------
587
588impl MockExecutor {
589    /// Keep polling tasks until nothing more can be done
590    ///
591    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
592    fn execute_to_completion(&self, mut main_fut: MainFuture) {
593        trace!("MockExecutor execute_to_completion...");
594        loop {
595            self.execute_until_first_stall(main_fut.as_mut());
596
597            // Handle `progressing_until_stalled`
598            let pus_waker = {
599                let mut data = self.shared.lock();
600                let pus = &mut data.progressing_until_stalled;
601                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
602                let Some(pus) = pus else {
603                    // No progressing_until_stalled, we're actually done.
604                    break;
605                };
606                assert_eq!(
607                    pus.finished, Pending,
608                    "ProgressingUntilStalled finished twice?!"
609                );
610                pus.finished = Ready(());
611
612                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
613                let waker = pus
614                    .waker
615                    .take()
616                    .expect("ProgressUntilStalledFuture not ever polled!");
617                drop(data);
618                let waker_copy = waker.clone();
619                let mut data = self.shared.lock();
620
621                let pus = &mut data.progressing_until_stalled;
622                if let Some(double) = mem::replace(
623                    &mut pus
624                        .as_mut()
625                        .expect("progressing_until_stalled updated under our feet!")
626                        .waker,
627                    Some(waker),
628                ) {
629                    panic!("double progressing_until_stalled.waker! {double:?}");
630                }
631
632                waker_copy
633            };
634            pus_waker.wake();
635        }
636        trace!("MockExecutor execute_to_completion done");
637    }
638
639    /// Keep polling tasks until `awake` is empty
640    ///
641    /// (Ignores `progressing_until_stalled` - so if one is active,
642    /// will return when all other tasks have blocked.)
643    ///
644    /// # Panics
645    ///
646    /// Might malfunction or panic if called reentrantly
647    fn execute_until_first_stall(&self, main_fut: MainFuture) {
648        trace!("MockExecutor execute_until_first_stall ...");
649
650        assert_eq!(
651            THREAD_DESCRIPTOR.get(),
652            ThreadDescriptor::Foreign,
653            "MockExecutor executor re-entered"
654        );
655        THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);
656
657        let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));
658
659        THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);
660
661        match r {
662            Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
663            Err(e) => {
664                trace!("MockExecutor executor, or async task, panicked!");
665                panic_any(e)
666            }
667        }
668    }
669
670    /// Keep polling tasks until `awake` is empty (inner, executor main loop)
671    ///
672    /// This is only called from [`MockExecutor::execute_until_first_stall`],
673    /// so it could also be called `execute_until_first_stall_inner`.
674    #[allow(clippy::cognitive_complexity)]
675    fn executor_main_loop(&self, mut main_fut: MainFuture) {
676        'outer: loop {
677            // Take a `Awake` task off `awake` and make it `Asleep`
678            let (id, mut fut) = 'inner: loop {
679                let mut data = self.shared.lock();
680                let Some(id) = data.schedule() else {
681                    break 'outer;
682                };
683                let Some(task) = data.tasks.get_mut(id) else {
684                    trace!("MockExecutor {id:?} vanished");
685                    continue;
686                };
687                task.state = Asleep(vec![]);
688                let fut = task.fut.take().expect("future missing from task!");
689                break 'inner (id, fut);
690            };
691
692            // Poll the selected task
693            let waker = ActualWaker {
694                data: Arc::downgrade(&self.shared),
695                id,
696            }
697            .new_waker();
698            trace!("MockExecutor {id:?} polling...");
699            let mut cx = Context::from_waker(&waker);
700            let r = match &mut fut {
701                TaskFutureInfo::Normal(fut) => fut.poll_unpin(&mut cx),
702                TaskFutureInfo::Main => main_fut.as_mut().poll(&mut cx),
703            };
704
705            // Deal with the returned `Poll`
706            let _fut_drop_late;
707            {
708                let mut data = self.shared.lock();
709                let task = data
710                    .tasks
711                    .get_mut(id)
712                    .expect("task vanished while we were polling it");
713
714                match (r, task.is_subthread) {
715                    (Pending, _) => {
716                        trace!("MockExecutor {id:?} -> Pending");
717                        if task.fut.is_some() {
718                            panic!("task reinserted while we polled it?!");
719                        }
720                        // The task might have been woken *by its own poll method*.
721                        // That's why we set it to `Asleep` *earlier* rather than here.
722                        // All we need to do is put the future back.
723                        task.fut = Some(fut);
724                    }
725                    (Ready(()), None) => {
726                        trace!("MockExecutor {id:?} -> Ready");
727                        // Oh, it finished!
728                        // It might be in `awake`, but that's allowed to contain stale tasks,
729                        // so we *don't* need to scan that list and remove it.
730                        data.tasks.remove(id);
731                        // It is important that we don't drop `fut` until we have released
732                        // the data lock, since it is an external type and might try to reenter
733                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
734                        // So, move `fut` to a variable with scope outside the block with `data`.
735                        _fut_drop_late = fut;
736                    }
737                    (Ready(()), Some(IsSubthread)) => {
738                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
739                        // Task was blocking on the future given to .subthread_block_on_future().
740                        // That future has completed and stored its result where the Subthread
741                        // can see it.  Now we need to wake up that thread, and let it run
742                        // until it blocks again.
743                        //
744                        // We leave `.fut` as `None`.
745                        // subthread_block_on_future is responsible for filling it in again.
746
747                        self.shared.thread_context_switch(
748                            data,
749                            ThreadDescriptor::Executor,
750                            ThreadDescriptor::Subthread(id),
751                        );
752
753                        // Now, if the Subthread still exists, that's because it's waiting
754                        // in subthread_block_on_future again, in which case `fut` is `Some`.
755                        // Or it might have ended, in which case it's not in `tasks` any more.
756                        // We can go back to scheduling futures.
757
758                        // `fut` contains the future passed to `subthread_block_on_future`,
759                        // ie it owns an external type.  See above.
760                        _fut_drop_late = fut;
761                    }
762                }
763            }
764        }
765    }
766}
767
768impl Data {
769    /// Return the next task to run
770    ///
771    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
772    /// The caller must restore the invariant!
773    fn schedule(&mut self) -> Option<TaskId> {
774        use SchedulingPolicy as SP;
775        match self.scheduling {
776            SP::Stack => self.awake.pop_back(),
777            SP::Queue => self.awake.pop_front(),
778        }
779    }
780}
781
782impl ActualWaker {
783    /// Obtain a strong reference to the executor's data
784    fn upgrade_data(&self) -> Option<Arc<Shared>> {
785        self.data.upgrade()
786    }
787
788    /// Wake the task corresponding to this `ActualWaker`
789    ///
790    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
791    fn wake(&self) {
792        let Some(data) = self.upgrade_data() else {
793            // The executor is gone!  Don't try to wake.
794            return;
795        };
796        let mut data = data.lock();
797        let data = &mut *data;
798        trace!("MockExecutor {:?} wake", &self.id);
799        let Some(task) = data.tasks.get_mut(self.id) else {
800            return;
801        };
802        task.set_awake(self.id, &mut data.awake);
803    }
804}
805
806//---------- "progress until stalled" functionality ----------
807
808impl MockExecutor {
809    /// Run tasks in the current executor until every other task is waiting
810    ///
811    /// # Panics
812    ///
813    /// Might malfunction or panic if more than one such call is running at once.
814    ///
815    /// (Ie, you must `.await` or drop the returned `Future`
816    /// before calling this method again.)
817    ///
818    /// Must be called and awaited within a future being run by `self`.
819    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
820        let mut data = self.shared.lock();
821        assert!(
822            data.progressing_until_stalled.is_none(),
823            "progress_until_stalled called more than once"
824        );
825        trace!("MockExecutor progress_until_stalled...");
826        data.progressing_until_stalled = Some(ProgressingUntilStalled {
827            finished: Pending,
828            waker: None,
829        });
830        ProgressUntilStalledFuture {
831            shared: self.shared.clone(),
832        }
833    }
834}
835
836impl Future for ProgressUntilStalledFuture {
837    type Output = ();
838
839    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
840        let waker = cx.waker().clone();
841        let mut data = self.shared.lock();
842        let pus = data.progressing_until_stalled.as_mut();
843        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
844        let pus = pus.expect("ProgressingUntilStalled missing");
845        pus.waker = Some(waker);
846        pus.finished
847    }
848}
849
850impl Drop for ProgressUntilStalledFuture {
851    fn drop(&mut self) {
852        self.shared.lock().progressing_until_stalled = None;
853    }
854}
855
856//---------- (sub)threads ----------
857
858impl MockExecutor {
859    /// Spawn a "Subthread", for processing in a sync context
860    ///
861    /// `call` will be run on a separate thread, called a "Subthread".
862    ///
863    /// But it will **not run simultaneously** with the executor,
864    /// nor with other Subthreads.
865    /// So Subthreads are somewhat like coroutines.
866    ///
867    /// `call` must be capable of making progress without waiting for any other Subthreads.
868    /// `call` may wait for async futures, using
869    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
870    ///
871    /// Subthreads may be used for cpubound activity,
872    /// or synchronous IO (such as large volumes of disk activity),
873    /// provided that the synchronous code will reliably make progress,
874    /// without waiting (directly or indirectly) for any async task or Subthread -
875    /// except via `subthread_block_on_future`.
876    ///
877    /// # Subthreads vs raw `std::thread` threads
878    ///
879    /// Programs using `MockExecutor` may use `std::thread` threads directly.
880    /// However, this is not recommended.  There are severe limitations:
881    ///
882    ///  * Only a Subthread can re-enter the async context from sync code:
883    ///    this must be done with
884    ///    using [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
885    ///    (Re-entering the executor with
886    ///    [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
887    ///    is not allowed.)
888    ///  * If async tasks want to suspend waiting for synchronous code,
889    ///    the synchronous code must run on a Subthread.
890    ///    This allows the `MockExecutor` to know when
891    ///    that synchronous code is still making progress.
892    ///    (This is needed for
893    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
894    ///    and the facilities which use it, such as
895    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
896    ///  * Subthreads never run in parallel -
897    ///    they only run as scheduled deterministically by the `MockExecutor`.
898    ///    So using Subthreads eliminates a source of test nonndeterminism.
899    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
900    ///
901    /// # Panics, abuse, and malfunctions
902    ///
903    /// If `call` panics and unwinds, `spawn_subthread` yields `Err`.
904    /// The application code should to do something about it if this happens,
905    /// typically, logging errors, tearing things down, or failing a test case.
906    ///
907    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
908    /// (So, typically, if the thread supposed to run the executor panics,
909    /// for example because a future or the executor itself panics,
910    /// all the subthreads will become stuck - effectively, they'll be leaked.)
911    ///
912    /// `spawn_subthread` panics if OS thread spawning fails.
913    /// (Like `std::thread::spawn()` does.)
914    ///
915    /// `MockExecutor`s will malfunction or panic if
916    /// any executor invocation method (eg `block_on`) is called on a Subthread.
917    pub fn subthread_spawn<T: Send + 'static>(
918        &self,
919        desc: impl Display,
920        call: impl FnOnce() -> T + Send + 'static,
921    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
922        let desc = desc.to_string();
923        let (output_tx, output_rx) = oneshot::channel();
924
925        // NB: we don't know which thread we're on!
926        // In principle we might be on another Subthread.
927        // So we can't context switch here.  That would be very confusing.
928        //
929        // Instead, we prepare the new Subthread as follows:
930        //   - There is a task in the executor
931        //   - The task is ready to be polled, whenever the executor decides to
932        //   - The thread starts running right away, but immediately waits until it is scheduled
933        // See `subthread_entrypoint`.
934
935        {
936            let mut data = self.shared.lock();
937            let fut = TaskFutureInfo::Normal(
938                Box::new(
939                    // When the executor decides that this new task is to be polled,
940                    // its future (this future) returns Ready immediately,
941                    // and the executor mainloop will context switch to the new thread.
942                    futures::future::ready(()),
943                )
944                .into(),
945            );
946            let id = data.insert_task(desc.clone(), fut, Some(IsSubthread));
947
948            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
949                .name(desc)
950                .spawn({
951                    let shared = self.shared.clone();
952                    move || shared.subthread_entrypoint(id, call, output_tx)
953                })
954                .expect("spawn failed");
955        }
956
957        output_rx.map(|r| {
958            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
959        })
960    }
961
962    /// Call an async `Future` from a Subthread
963    ///
964    /// Blocks the Subthread, and arranges to run async tasks,
965    /// including `fut`, until `fut` completes.
966    ///
967    /// `fut` is polled on the executor thread, not on the Subthread.
968    /// (We may change that in the future, allowing passing a non-`Send` future.)
969    ///
970    /// # Panics, abuse, and malfunctions
971    ///
972    /// `subthread_block_on_future` will malfunction or panic
973    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
974    /// (ie a thread made with [`spawn_subthread`](MockExecutor::subthread_spawn)).
975    ///
976    /// If `fut` itself panics, the executor will panic.
977    ///
978    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
979    /// See `spawn_subthread`.
980    pub fn subthread_block_on_future<T: Send + 'static>(
981        &self,
982        fut: impl Future<Output = T> + Send + 'static,
983    ) -> T {
984        let ret = Arc::new(Mutex::new(None));
985        let fut = {
986            let ret = ret.clone();
987            async move {
988                let t = fut.await;
989                *ret.lock().expect("poison") = Some(t);
990            }
991        };
992        let fut = TaskFutureInfo::Normal(Box::new(fut).into());
993
994        let id = match THREAD_DESCRIPTOR.get() {
995            ThreadDescriptor::Subthread(id) => id,
996            ThreadDescriptor::Executor => {
997                panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
998            }
999            ThreadDescriptor::Foreign => panic!(
1000    "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
1001            ),
1002        };
1003        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
1004
1005        {
1006            let mut data = self.shared.lock();
1007            let data_ = &mut *data;
1008            let task = data_.tasks.get_mut(id).expect("Subthread task vanished!");
1009            task.fut = Some(fut);
1010            task.set_awake(id, &mut data_.awake);
1011
1012            self.shared.thread_context_switch(
1013                data,
1014                ThreadDescriptor::Subthread(id),
1015                ThreadDescriptor::Executor,
1016            );
1017        }
1018
1019        let ret = ret.lock().expect("poison").take();
1020        ret.expect("fut completed but didn't store")
1021    }
1022}
1023
1024impl Shared {
1025    /// Main entrypoint function for a Subthread
1026    ///
1027    /// Entered on a new `std::thread` thread created by
1028    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
1029    ///
1030    /// When `call` completes, sends its returned value `T` to `output_tx`.
1031    fn subthread_entrypoint<T: Send + 'static>(
1032        self: Arc<Self>,
1033        id: TaskId,
1034        call: impl FnOnce() -> T + Send + 'static,
1035        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
1036    ) {
1037        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
1038        trace!("MockExecutor thread {id:?}, entrypoint");
1039
1040        // Wait for the executor to tell us to run.
1041        // This will be done the first time the task is polled.
1042        {
1043            let data = self.lock();
1044            self.thread_context_switch_waitfor_instruction_to_run(
1045                data,
1046                ThreadDescriptor::Subthread(id),
1047            );
1048        }
1049
1050        trace!("MockExecutor thread {id:?}, entering user code");
1051
1052        // Run the user's actual thread function.
1053        // This will typically reenter us via subthread_block_on_future.
1054        let ret = catch_unwind(AssertUnwindSafe(call));
1055
1056        trace!("MockExecutor thread {id:?}, completed user code");
1057
1058        // This makes SubthreadFuture ready.
1059        // It will be polled by the executor in due course, presumably.
1060
1061        output_tx.send(ret).unwrap_or_else(
1062            #[allow(clippy::unnecessary_lazy_evaluations)]
1063            |_| {}, // receiver dropped, maybe executor dropped or something?
1064        );
1065
1066        {
1067            let mut data = self.lock();
1068
1069            // Never poll this task again (so never schedule this thread)
1070            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");
1071
1072            // Tell the executor it is scheduled now.
1073            // We carry on exiting, in parallel (holding the data lock).
1074            self.thread_context_switch_send_instruction_to_run(
1075                &mut data,
1076                ThreadDescriptor::Subthread(id),
1077                ThreadDescriptor::Executor,
1078            );
1079        }
1080    }
1081
1082    /// Switch from (sub)thread `us` to (sub)thread `them`
1083    ///
1084    /// Returns when someone calls `thread_context_switch(.., us)`.
1085    fn thread_context_switch(
1086        &self,
1087        mut data: MutexGuard<Data>,
1088        us: ThreadDescriptor,
1089        them: ThreadDescriptor,
1090    ) {
1091        trace!("MockExecutor thread {us:?}, switching to {them:?}");
1092        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
1093        self.thread_context_switch_waitfor_instruction_to_run(data, us);
1094    }
1095
1096    /// Instruct the (sub)thread `them` to run
1097    ///
1098    /// Update `thread_to_run`, which will wake up `them`'s
1099    /// call to `thread_context_switch_waitfor_instruction_to_run`.
1100    ///
1101    /// Must be called from (sub)thread `us`.
1102    /// Part of `thread_context_switch`, not normally called directly.
1103    fn thread_context_switch_send_instruction_to_run(
1104        &self,
1105        data: &mut MutexGuard<Data>,
1106        us: ThreadDescriptor,
1107        them: ThreadDescriptor,
1108    ) {
1109        assert_eq!(data.thread_to_run, us);
1110        data.thread_to_run = them;
1111        self.thread_condvar.notify_all();
1112    }
1113
1114    /// Await an instruction for this thread, `us`, to run
1115    ///
1116    /// Waits for `thread_to_run` to be `us`,
1117    /// waiting for `thread_condvar` as necessary.
1118    ///
1119    /// Part of `thread_context_switch`, not normally called directly.
1120    fn thread_context_switch_waitfor_instruction_to_run(
1121        &self,
1122        data: MutexGuard<Data>,
1123        us: ThreadDescriptor,
1124    ) {
1125        #[allow(let_underscore_lock)]
1126        let _: MutexGuard<_> = self
1127            .thread_condvar
1128            .wait_while(data, |data| {
1129                let live = data.thread_to_run;
1130                let resume = live == us;
1131                if resume {
1132                    trace!("MockExecutor thread {us:?}, resuming");
1133                } else {
1134                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
1135                }
1136                // We're in `.wait_while`, not `.wait_until`.  Confusing.
1137                !resume
1138            })
1139            .expect("data lock poisoned");
1140    }
1141}
1142
1143//---------- ancillary and convenience functions ----------
1144
1145/// Trait to let us assert at compile time that something is nicely `Sync` etc.
1146#[allow(dead_code)] // yes, we don't *use* anything from this trait
1147trait EnsureSyncSend: Sync + Send + 'static {}
1148impl EnsureSyncSend for ActualWaker {}
1149impl EnsureSyncSend for MockExecutor {}
1150
1151impl MockExecutor {
1152    /// Return the number of tasks running in this executor
1153    ///
1154    /// One possible use is for a test case to check that task(s)
1155    /// that ought to have exited, have indeed done so.
1156    ///
1157    /// In the usual case, the answer will be at least 1,
1158    /// because it counts the future passed to
1159    /// [`block_on`](MockExecutor::block_on)
1160    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1161    pub fn n_tasks(&self) -> usize {
1162        self.shared.lock().tasks.len()
1163    }
1164}
1165
1166impl Shared {
1167    /// Lock and obtain the guard
1168    ///
1169    /// Convenience method which panics on poison
1170    fn lock(&self) -> MutexGuard<Data> {
1171        self.data.lock().expect("data lock poisoned")
1172    }
1173}
1174
1175impl Task {
1176    /// Set task `id` to `Awake` and arrange that it will be polled.
1177    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1178        match self.state {
1179            Awake => {}
1180            Asleep(_) => {
1181                self.state = Awake;
1182                data_awake.push_back(id);
1183            }
1184        }
1185    }
1186}
1187
1188//---------- ActualWaker as RawWaker ----------
1189
1190/// Using [`ActualWaker`] in a [`RawWaker`]
1191///
1192/// We need to make a
1193/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
1194/// which contains an
1195/// [`ActualWaker`] (our actual waker implementation, also safe).
1196///
1197/// `std` offers `Waker::from<Arc<impl Wake>>`.
1198/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
1199///
1200/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
1201/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
1202///
1203/// SAFETY
1204///
1205///  * The data pointer is `Box::<ActualWaker>::into_raw()`
1206///  * We share these when we clone
1207///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
1208///  * So we may make references `&ActualWaker`
1209impl ActualWaker {
1210    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
1211    fn new_waker(self) -> Waker {
1212        unsafe { Waker::from_raw(self.raw_new()) }
1213    }
1214
1215    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
1216    fn raw_new(self) -> RawWaker {
1217        let self_: Box<ActualWaker> = self.into();
1218        let self_: *mut ActualWaker = Box::into_raw(self_);
1219        let self_: *const () = self_ as _;
1220        RawWaker::new(self_, &RAW_WAKER_VTABLE)
1221    }
1222
1223    /// Implementation of [`RawWakerVTable`]'s `clone`
1224    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
1225        let self_: *const ActualWaker = self_ as _;
1226        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1227        let copy: ActualWaker = self_.clone();
1228        copy.raw_new()
1229    }
1230
1231    /// Implementation of [`RawWakerVTable`]'s `wake`
1232    unsafe fn raw_wake(self_: *const ()) {
1233        Self::raw_wake_by_ref(self_);
1234        Self::raw_drop(self_);
1235    }
1236
1237    /// Implementation of [`RawWakerVTable`]'s `wake_ref_by`
1238    unsafe fn raw_wake_by_ref(self_: *const ()) {
1239        let self_: *const ActualWaker = self_ as _;
1240        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1241        self_.wake();
1242    }
1243
1244    /// Implementation of [`RawWakerVTable`]'s `drop`
1245    unsafe fn raw_drop(self_: *const ()) {
1246        let self_: *mut ActualWaker = self_ as _;
1247        let self_: Box<ActualWaker> = Box::from_raw(self_);
1248        drop(self_);
1249    }
1250}
1251
1252/// vtable for `Box<ActualWaker>` as `RawWaker`
1253//
1254// This ought to be in the impl block above, but
1255//   "associated `static` items are not allowed"
1256static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
1257    ActualWaker::raw_clone,
1258    ActualWaker::raw_wake,
1259    ActualWaker::raw_wake_by_ref,
1260    ActualWaker::raw_drop,
1261);
1262
1263//---------- Sleep location tracking and dumping ----------
1264
1265/// We record "where a future went to sleep" as (just) a backtrace
1266///
1267/// This type alias allows us to mock `Backtrace` for miri.
1268/// (It also insulates from future choices about sleep location representation.0
1269#[cfg(not(miri))]
1270type SleepLocation = Backtrace;
1271
1272impl Data {
1273    /// Dump tasks and their sleep location backtraces
1274    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1275        for (id, task) in self.tasks.iter() {
1276            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1277            match &task.state {
1278                Awake => {
1279                    prefix(f)?;
1280                    writeln!(f, "awake")?;
1281                }
1282                Asleep(locs) => {
1283                    let n = locs.len();
1284                    for (i, loc) in locs.iter().enumerate() {
1285                        prefix(f)?;
1286                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1287                    }
1288                    if n == 0 {
1289                        prefix(f)?;
1290                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1291                    }
1292                }
1293            }
1294        }
1295        writeln!(
1296            f,
1297            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1298        )?;
1299        Ok(())
1300    }
1301}
1302
1303/// Track sleep locations via `<Waker as Clone>`.
1304///
1305/// See [`MockExecutor::debug_dump`] for the explanation.
1306impl Clone for ActualWaker {
1307    fn clone(&self) -> Self {
1308        let id = self.id;
1309
1310        if let Some(data) = self.upgrade_data() {
1311            // If the executor is gone, there is nothing to adjust
1312            let mut data = data.lock();
1313            if let Some(task) = data.tasks.get_mut(self.id) {
1314                match &mut task.state {
1315                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1316                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
1317                }
1318            } else {
1319                trace!("MockExecutor cloned waker for dead task {id:?}");
1320            }
1321        }
1322
1323        ActualWaker {
1324            data: self.data.clone(),
1325            id,
1326        }
1327    }
1328}
1329
1330//---------- API for full debug dump ----------
1331
1332/// Debugging dump of a `MockExecutor`'s state
1333///
1334/// Returned by [`MockExecutor::as_debug_dump`]
1335//
1336// Existence implies backtraces have been resolved
1337//
1338// We use `Either` so that we can also use this internally when we have &mut Data.
1339pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1340
1341impl MockExecutor {
1342    /// Dump the executor's state including backtraces of waiting tasks, to stderr
1343    ///
1344    /// This is considerably more extensive than simply
1345    /// `MockExecutor as Debug`.
1346    ///
1347    /// (This is a convenience method, which wraps
1348    /// [`MockExecutor::as_debug_dump()`].
1349    ///
1350    /// ### Backtrace salience (possible spurious traces)
1351    ///
1352    /// **Summary**
1353    ///
1354    /// The technique used to capture backtraces when futures sleep is not 100% exact.
1355    /// It will usually show all the actual sleeping sites,
1356    /// but it might also show other backtraces which were part of
1357    /// the implementation of some complex relevant future.
1358    ///
1359    /// **Details**
1360    ///
1361    /// When a future's implementation wants to sleep,
1362    /// it needs to record the [`Waker`] (from the [`Context`])
1363    /// so that the "other end" can call `.wake()` on it later,
1364    /// when the future should be woken.
1365    ///
1366    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1367    /// the future must clone the `Waker`,
1368    /// and it must do so in within the `poll()` call.
1369    ///
1370    /// A future which is waiting in a `select!` will typically
1371    /// show multiple traces, one for each branch.
1372    /// But,
1373    /// if a future sleeps on one thing, and then when polled again later,
1374    /// sleeps on something different, without waking up in between,
1375    /// both backtrace locations will be shown.
1376    /// And,
1377    /// a complicated future contraption *might* clone the `Waker` more times.
1378    /// So not every backtrace will necessarily be informative.
1379    ///
1380    /// ### Panics
1381    ///
1382    /// Panics on write errors.
1383    pub fn debug_dump(&self) {
1384        self.as_debug_dump().to_stderr();
1385    }
1386
1387    /// Dump the executor's state including backtraces of waiting tasks
1388    ///
1389    /// This is considerably more extensive than simply
1390    /// `MockExecutor as Debug`.
1391    ///
1392    /// Returns an object for formatting with [`Debug`].
1393    /// To simply print the dump to stderr (eg in a test),
1394    /// use [`.debug_dump()`](MockExecutor::debug_dump).
1395    ///
1396    /// **Backtrace salience (possible spurious traces)** -
1397    /// see [`.debug_dump()`](MockExecutor::debug_dump).
1398    pub fn as_debug_dump(&self) -> DebugDump {
1399        let data = self.shared.lock();
1400        DebugDump(Either::Right(data))
1401    }
1402}
1403
1404impl Data {
1405    /// Convenience function: dump including backtraces, to stderr
1406    fn debug_dump(&mut self) {
1407        DebugDump(Either::Left(self)).to_stderr();
1408    }
1409}
1410
1411impl DebugDump<'_> {
1412    /// Convenience function: dump tasks and backtraces to stderr
1413    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1414    fn to_stderr(self) {
1415        write!(io::stderr().lock(), "{:?}", self)
1416            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1417    }
1418}
1419
1420//---------- bespoke Debug impls ----------
1421
1422impl Debug for DebugDump<'_> {
1423    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1424        let self_: &Data = &self.0;
1425
1426        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1427        writeln!(f, "MockExecutor task dump:")?;
1428        self_.dump_backtraces(f)?;
1429
1430        Ok(())
1431    }
1432}
1433
1434// See `impl Debug for Data` for notes on the output
1435impl Debug for Task {
1436    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1437        let Task {
1438            desc,
1439            state,
1440            fut,
1441            is_subthread,
1442        } = self;
1443        write!(f, "{:?}", desc)?;
1444        write!(f, "=")?;
1445        match is_subthread {
1446            None => {}
1447            Some(IsSubthread) => write!(f, "T")?,
1448        }
1449        match fut {
1450            None => write!(f, "P")?,
1451            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1452            Some(TaskFutureInfo::Main) => write!(f, "m")?,
1453        }
1454        match state {
1455            Awake => write!(f, "W")?,
1456            Asleep(locs) => write!(f, "s{}", locs.len())?,
1457        };
1458        Ok(())
1459    }
1460}
1461
1462/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
1463///
1464/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
1465///
1466/// `FLAGS` are:
1467///
1468///  * `T`: this task is for a Subthread (from subthread_spawn).
1469///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
1470///  * `f`: this is a normal task with a future and its future is present in `Data`
1471///  * `m`: this is the main task from `block_on`
1472///
1473///  * `W`: the task is awake
1474///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
1475//
1476// We do it this way because the naive dump from derive is very expansive
1477// and makes it impossible to see the wood for the trees.
1478// This very compact representation it easier to find a task of interest in the output.
1479//
1480// This is implemented in `impl Debug for Task`.
1481//
1482//
1483// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
1484// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
1485// I think derive_more emits #[automatically_derived], so that even though we use this
1486// in our Debug impl, that construction is unused.
1487#[allow(dead_code)]
1488struct DebugTasks<'d, F>(&'d Data, F);
1489
1490// See `impl Debug for Data` for notes on the output
1491impl<F, I> Debug for DebugTasks<'_, F>
1492where
1493    F: Fn() -> I,
1494    I: Iterator<Item = TaskId>,
1495{
1496    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1497        let DebugTasks(data, ids) = self;
1498        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1499            write!(f, "{delim}{id:?}")?;
1500            match data.tasks.get(id) {
1501                None => write!(f, "-")?,
1502                Some(task) => write!(f, "={task:?}")?,
1503            }
1504        }
1505        Ok(())
1506    }
1507}
1508
1509/// Mock `Backtrace` for miri
1510///
1511/// See also the not-miri `type SleepLocation`, alias above.
1512#[cfg(miri)]
1513mod miri_sleep_location {
1514    #[derive(Debug, derive_more::Display)]
1515    #[display("<SleepLocation>")]
1516    pub(super) struct SleepLocation {}
1517
1518    impl SleepLocation {
1519        pub(super) fn force_capture() -> Self {
1520            SleepLocation {}
1521        }
1522    }
1523}
1524#[cfg(miri)]
1525use miri_sleep_location::SleepLocation;
1526
1527#[cfg(test)]
1528mod test {
1529    // @@ begin test lint list maintained by maint/add_warning @@
1530    #![allow(clippy::bool_assert_comparison)]
1531    #![allow(clippy::clone_on_copy)]
1532    #![allow(clippy::dbg_macro)]
1533    #![allow(clippy::mixed_attributes_style)]
1534    #![allow(clippy::print_stderr)]
1535    #![allow(clippy::print_stdout)]
1536    #![allow(clippy::single_char_pattern)]
1537    #![allow(clippy::unwrap_used)]
1538    #![allow(clippy::unchecked_duration_subtraction)]
1539    #![allow(clippy::useless_vec)]
1540    #![allow(clippy::needless_pass_by_value)]
1541    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1542    use super::*;
1543    use futures::channel::mpsc;
1544    use futures::{SinkExt as _, StreamExt as _};
1545    use strum::IntoEnumIterator;
1546    use tracing::info;
1547
1548    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1549    use tracing_test::traced_test;
1550
1551    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1552        // This duplicates the part of the logic in MockRuntime::test_with_various which
1553        // relates to MockExecutor, because we don't have a MockRuntime::builder.
1554        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1555        SchedulingPolicy::iter().map(|scheduling| {
1556            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1557            MockExecutor::with_scheduling(scheduling)
1558        })
1559    }
1560
1561    #[cfg_attr(not(miri), traced_test)]
1562    #[test]
1563    fn simple() {
1564        let runtime = MockExecutor::default();
1565        let val = runtime.block_on(async { 42 });
1566        assert_eq!(val, 42);
1567    }
1568
1569    #[cfg_attr(not(miri), traced_test)]
1570    #[test]
1571    fn stall() {
1572        let runtime = MockExecutor::default();
1573
1574        runtime.block_on({
1575            let runtime = runtime.clone();
1576            async move {
1577                const N: usize = 3;
1578                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
1579                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();
1580
1581                let mut rx_n = rxs.pop().unwrap();
1582
1583                for (i, mut rx) in rxs.into_iter().enumerate() {
1584                    runtime.spawn_identified(i, {
1585                        let mut txs = txs.clone();
1586                        async move {
1587                            loop {
1588                                eprintln!("task {i} rx...");
1589                                let v = rx.next().await.unwrap();
1590                                let nv = v + 1;
1591                                eprintln!("task {i} rx {v}, tx {nv}");
1592                                let v = nv;
1593                                txs[v].send(v).await.unwrap();
1594                            }
1595                        }
1596                    });
1597                }
1598
1599                dbg!();
1600                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1601
1602                dbg!();
1603                runtime.progress_until_stalled().await;
1604
1605                dbg!();
1606                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1607
1608                dbg!();
1609                txs[0].send(0).await.unwrap();
1610
1611                dbg!();
1612                runtime.progress_until_stalled().await;
1613
1614                dbg!();
1615                let r = rx_n.next().await;
1616                assert_eq!(r, Some(N - 1));
1617
1618                dbg!();
1619                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1620
1621                runtime.spawn_identified("tx", {
1622                    let txs = txs.clone();
1623                    async {
1624                        eprintln!("sending task...");
1625                        for (i, mut tx) in txs.into_iter().enumerate() {
1626                            eprintln!("sending 0 to {i}...");
1627                            tx.send(0).await.unwrap();
1628                        }
1629                        eprintln!("sending task done");
1630                    }
1631                });
1632
1633                runtime.debug_dump();
1634
1635                for i in 0..txs.len() {
1636                    eprintln!("main {i} wait stall...");
1637                    runtime.progress_until_stalled().await;
1638                    eprintln!("main {i} rx wait...");
1639                    let r = rx_n.next().await;
1640                    eprintln!("main {i} rx = {r:?}");
1641                    assert!(r == Some(0) || r == Some(N - 1));
1642                }
1643
1644                eprintln!("finishing...");
1645                runtime.progress_until_stalled().await;
1646                eprintln!("finished.");
1647            }
1648        });
1649    }
1650
1651    #[cfg_attr(not(miri), traced_test)]
1652    #[test]
1653    fn spawn_blocking() {
1654        let runtime = MockExecutor::default();
1655
1656        runtime.block_on({
1657            let runtime = runtime.clone();
1658            async move {
1659                let thr_1 = runtime.spawn_blocking(|| 42);
1660                let thr_2 = runtime.spawn_blocking(|| 99);
1661
1662                assert_eq!(thr_2.await, 99);
1663                assert_eq!(thr_1.await, 42);
1664            }
1665        });
1666    }
1667
1668    #[cfg_attr(not(miri), traced_test)]
1669    #[test]
1670    fn drop_reentrancy() {
1671        // Check that dropping a completed task future is done *outside* the data lock.
1672        // Involves a contrived future whose Drop impl reenters the executor.
1673        //
1674        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1675        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1676        // we do indeed get deadlock, so this test case is working.
1677
1678        struct ReentersOnDrop {
1679            runtime: MockExecutor,
1680        }
1681        impl Future for ReentersOnDrop {
1682            type Output = ();
1683            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1684                Poll::Ready(())
1685            }
1686        }
1687        impl Drop for ReentersOnDrop {
1688            fn drop(&mut self) {
1689                self.runtime
1690                    .spawn_identified("dummy", futures::future::ready(()));
1691            }
1692        }
1693
1694        for runtime in various_mock_executors() {
1695            runtime.block_on(async {
1696                runtime.spawn_identified("trapper", {
1697                    let runtime = runtime.clone();
1698                    ReentersOnDrop { runtime }
1699                });
1700            });
1701        }
1702    }
1703
1704    #[cfg_attr(not(miri), traced_test)]
1705    #[test]
1706    fn subthread() {
1707        for runtime in various_mock_executors() {
1708            runtime.block_on(async {
1709                let (tx, rx) = oneshot::channel();
1710                info!("spawning subthread");
1711                let thr = runtime.subthread_spawn("thr1", {
1712                    let runtime = runtime.clone();
1713                    move || {
1714                        info!("subthread_block_on_future...");
1715                        let i = runtime.subthread_block_on_future(rx).unwrap();
1716                        info!("subthread_block_on_future => {i}");
1717                        i + 1
1718                    }
1719                });
1720                info!("main task sending");
1721                tx.send(12).unwrap();
1722                info!("main task sent");
1723                let r = thr.await.unwrap();
1724                info!("main task thr => {r}");
1725                assert_eq!(r, 13);
1726            });
1727        }
1728    }
1729}