tor_rtmock/
task.rs

1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
13use std::pin::{pin, Pin};
14use std::sync::{Arc, Mutex, MutexGuard, Weak};
15use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16
17use futures::future::Map;
18use futures::pin_mut;
19use futures::task::{FutureObj, Spawn, SpawnError};
20use futures::FutureExt as _;
21
22use assert_matches::assert_matches;
23use educe::Educe;
24use itertools::Either::{self, *};
25use itertools::{chain, izip};
26use slotmap_careful::DenseSlotMap;
27use std::backtrace::Backtrace;
28use strum::EnumIter;
29
30// NB: when using traced_test, the trace! and error! output here is generally suppressed
31// in tests of other crates.  To see it, you can write something like this
32// (in the dev-dependencies of the crate whose tests you're running):
33//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
34use tracing::{error, trace};
35
36use oneshot_fused_workaround::{self as oneshot, Canceled, Receiver};
37use tor_error::error_report;
38use tor_rtcompat::{Blocking, ToplevelBlockOn};
39
40use Poll::*;
41use TaskState::*;
42
/// Type-erased future, one for each of our (normal) tasks
///
/// Always outputs `()`.  Stored in [`TaskFutureInfo::Normal`].
type TaskFuture = FutureObj<'static, ()>;

/// Future for the argument to `block_on`, which is handled specially
///
/// This is only a pinned *borrow* of the future, with lifetime `'m`:
/// it is passed down the call stack rather than being stored in [`Data`].
type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
48
49//---------- principal data structures ----------
50
/// Executor for running tests with mocked environment
///
/// For test cases which don't actually wait for anything in the real world.
///
/// This type is the executor itself.
/// It implements [`Spawn`], [`Blocking`] and [`ToplevelBlockOn`].
///
/// It will usually be used as part of a `MockRuntime`.
///
/// To run futures, call [`ToplevelBlockOn::block_on`]
///
/// Cloning is cheap: every clone refers to the same underlying executor state.
///
/// # Restricted environment
///
/// Tests run with this executor must not attempt to block
/// on anything "outside":
/// every future that anything awaits must (eventually) be woken directly
/// *by some other task* in the same test case.
///
/// (By directly we mean that the [`Waker::wake`] call is made
/// by that waking future, before that future itself awaits anything.)
///
/// # Panics
///
/// The executor will panic
/// if the toplevel future (passed to `block_on`)
/// doesn't complete (without externally blocking),
/// but instead waits for something.
///
/// The executor will malfunction or panic if reentered.
/// (Eg, if `block_on` is reentered.)
#[derive(Clone, Default, Educe)]
#[educe(Debug)]
pub struct MockExecutor {
    /// Mutable state
    ///
    /// Shared between all clones of this `MockExecutor`.
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
88
/// Shared state and ancillary information
///
/// This is always within an `Arc`.
///
/// Strong references are held by [`MockExecutor`] and [`ProgressUntilStalledFuture`];
/// [`ActualWaker`] holds only a `Weak` reference.
#[derive(Default)]
struct Shared {
    /// Shared state
    ///
    /// All of the executor's mutable state lives behind this one mutex.
    data: Mutex<Data>,
    /// Condition variable for thread scheduling
    ///
    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
    /// is modified.
    thread_condvar: std::sync::Condvar,
}
102
/// Task id, in its own module to hide the short `Ti` name
mod task_id {
    slotmap_careful::new_key_type! {
        /// Task ID, usually called `TaskId`
        ///
        /// Used as the key into [`Data`](super::Data)`.tasks`.
        ///
        /// Short name in special `task_id` module so that [`Debug`] is nice
        pub(super) struct Ti;
    }
}
// Everywhere else in this file the key type is referred to by this longer, clearer name.
use task_id::Ti as TaskId;
113
/// Executor's state
///
/// ### Task state machine
///
/// A task is created in `tasks`, `Awake`, so also in `awake`.
///
/// When we poll it, we take it out of `awake` and set it to `Asleep`,
/// and then call `poll()`.
/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
///
/// The task's future is of course also present here in this data structure.
/// However, during poll we must release the lock,
/// so we cannot borrow the future from `Data`.
/// Instead, we move it out.  So `Task.fut` is an `Option`.
///
/// ### "Main" task - the argument to `block_on`
///
/// The signature of `ToplevelBlockOn::block_on` accepts a non-`'static` future
/// (and a non-`Send`/`Sync` one).
///
/// So we cannot store that future in `Data` because `Data` is `'static`.
/// Instead, this main task future is passed as an argument down the call stack.
/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
#[derive(Educe, derive_more::Debug)]
#[educe(Default)]
struct Data {
    /// Tasks
    ///
    /// Includes tasks spawned with `spawn`,
    /// and also the future passed to `block_on`.
    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
    tasks: DenseSlotMap<TaskId, Task>,

    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
    ///
    /// Tasks are pushed onto the *back* when woken,
    /// so back is the most recently woken.
    ///
    /// Which end the next task to run is taken from depends on `scheduling`.
    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
    awake: VecDeque<TaskId>,

    /// If a future from `progress_until_stalled` exists
    ///
    /// `Some` from the call to `progress_until_stalled`
    /// until the returned future is dropped.
    progressing_until_stalled: Option<ProgressingUntilStalled>,

    /// Scheduling policy
    ///
    /// Decides which entry of `awake` is run next.
    scheduling: SchedulingPolicy,

    /// (Sub)thread we want to run now
    ///
    /// At any one time only one thread is meant to be running.
    /// Other threads are blocked in condvar wait, waiting for this to change.
    ///
    /// Initially `ThreadDescriptor::Executor`.
    ///
    /// **Modified only** within
    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
    /// which takes responsibility for preserving the following **invariants**:
    ///
    ///  1. no-one but the named thread is allowed to modify this field.
    ///  2. after modifying this field, signal `thread_condvar`
    #[educe(Default(expression = "ThreadDescriptor::Executor"))]
    thread_to_run: ThreadDescriptor,
}
175
/// Scheduling policy: which of the awake tasks is run next
///
/// Select one with [`MockExecutor::with_scheduling`].
#[derive(Debug, Clone, Default, EnumIter)]
#[non_exhaustive]
pub enum SchedulingPolicy {
    /// Task *most* recently woken is run
    ///
    /// This is the default.
    ///
    /// It will expose starvation bugs if a task never sleeps.
    /// (Which is a good thing in tests.)
    #[default]
    Stack,
    /// Task *least* recently woken is run.
    Queue,
}
191
/// Record of a single task
///
/// Tracks a spawned task, or the main task (the argument to `block_on`).
///
/// Stored in [`Data`]`.tasks`.
struct Task {
    /// For debugging output
    desc: String,
    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
    ///
    /// **Set to `Awake` only by [`Task::set_awake`]**,
    /// preserving the invariant that
    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
    state: TaskState,
    /// The actual future (or a placeholder for it)
    ///
    /// May be `None` briefly in the executor main loop, because we've
    /// temporarily moved it out so we can poll it,
    /// or if this is a Subthread task which is currently running sync code
    /// (in which case we're blocked in the executor waiting to be
    /// woken up by [`thread_context_switch`](Shared::thread_context_switch)).
    ///
    /// Note that the `None` can be observed outside the main loop, because
    /// the main loop unlocks while it polls, so other (non-main-loop) code
    /// might see it.
    fut: Option<TaskFutureInfo>,
}
219
/// A future as stored in our record of a [`Task`]
///
/// The executor main loop matches on this to decide how to "poll" the task.
#[derive(Educe)]
#[educe(Debug)]
enum TaskFutureInfo {
    /// The [`Future`].  All is normal.
    Normal(#[educe(Debug(ignore))] TaskFuture),
    /// The future isn't here because this task is the main future for `block_on`
    ///
    /// The actual future is passed down the call stack as a [`MainFuture`].
    Main,
    /// This task is actually a [`Subthread`](MockExecutor::subthread_spawn)
    ///
    /// Instead of polling it, we'll switch to it with
    /// [`thread_context_switch`](Shared::thread_context_switch).
    Subthread,
}
234
/// State of a task - do we think it needs to be polled?
///
/// Stored in [`Task`]`.state`.
#[derive(Debug)]
enum TaskState {
    /// Awake - needs to be polled
    ///
    /// This is the state of a newly created task;
    /// afterwards it is established by [`waker.wake()`](Waker::wake)
    Awake,
    /// Asleep - does *not* need to be polled
    ///
    /// Established each time just before we call the future's [`poll`](Future::poll)
    ///
    /// The executor main loop starts each sleep with an empty `Vec`.
    Asleep(Vec<SleepLocation>),
}
249
/// Actual implementor of `Wake` for use in a `Waker`
///
/// Futures (eg, channels from [`futures`]) will use this to wake a task
/// when it should be polled.
///
/// This type must not be `Cloned` with the `Data` lock held.
/// Consequently, a `Waker` mustn't either.
struct ActualWaker {
    /// Executor state
    ///
    /// The Waker mustn't hold a strong reference to the executor,
    /// since typically a task holds a future that holds a Waker,
    /// and the executor holds the task - so that would be a cycle.
    ///
    /// If the executor has gone away, waking is a no-op.
    data: Weak<Shared>,

    /// Which task this is
    id: TaskId,
}
268
/// State used for an in-progress call to
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// If present in [`Data`], an (async) call to `progress_until_stalled`
/// is in progress.
///
/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`],
/// is a normal-ish future.
/// It can be polled in the normal way.
/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
///
/// The future is made ready, and woken (via `waker`),
/// by bespoke code in the task executor loop.
///
/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
#[derive(Debug)]
struct ProgressingUntilStalled {
    /// Have we, in fact, stalled?
    ///
    /// Starts out `Pending`.
    /// Made `Ready` by special code in the executor loop
    finished: Poll<()>,

    /// Waker
    ///
    /// `None` until the future is first polled; replaced on every poll.
    ///
    /// Signalled by special code in the executor loop
    waker: Option<Waker>,
}
297
/// Future from
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
///
/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
/// There can only be one at a time.
///
/// Dropping this future resets `Data.progressing_until_stalled` to `None`.
#[derive(Educe)]
#[educe(Debug)]
struct ProgressUntilStalledFuture {
    /// Executor's state; this future's state is in `.progressing_until_stalled`
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
312
/// Identifies a thread we know about - the executor thread, or a Subthread
///
/// Not related to `std::thread::ThreadId`.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
///
/// Each thread records its own descriptor in the [`THREAD_DESCRIPTOR`] thread-local.
/// That being a thread-local and not scoped by which `MockExecutor` we're talking about
/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
/// That's OK (and documented).
#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
enum ThreadDescriptor {
    /// Foreign - neither the (running) executor, nor a Subthread
    ///
    /// This is the initial value of [`THREAD_DESCRIPTOR`] on every thread.
    #[debug("FOREIGN")]
    Foreign,
    /// The executor.
    #[debug("Exe")]
    Executor,
    /// This task, which is a Subthread.
    #[debug("{_0:?}")]
    Subthread(TaskId),
}
334
/// Marker indicating that this task is a Subthread, not an async task.
///
/// The executor main loop produces this, instead of a `Poll` result,
/// when the task it selected is a `TaskFutureInfo::Subthread`.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct IsSubthread;
340
/// Marker: [`Shared::subthread_yield`] should set our task awake
/// before switching to the executor
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct SetAwake;
344
thread_local! {
    /// Identifies this thread.
    ///
    /// Starts out as `Foreign`.
    /// The thread running the executor sets it to `Executor` for the duration of
    /// the executor main loop, and back to `Foreign` afterwards.
    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
        Cell::new(ThreadDescriptor::Foreign)
    };
}
351
352//---------- creation ----------
353
354impl MockExecutor {
355    /// Make a `MockExecutor` with default parameters
356    pub fn new() -> Self {
357        Self::default()
358    }
359
360    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
361    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
362        Data {
363            scheduling,
364            ..Default::default()
365        }
366        .into()
367    }
368}
369
370impl From<Data> for MockExecutor {
371    fn from(data: Data) -> MockExecutor {
372        let shared = Shared {
373            data: Mutex::new(data),
374            thread_condvar: std::sync::Condvar::new(),
375        };
376        MockExecutor {
377            shared: Arc::new(shared),
378        }
379    }
380}
381
382//---------- spawning ----------
383
384impl MockExecutor {
385    /// Spawn a task and return something to identify it
386    ///
387    /// `desc` should `Display` as some kind of short string (ideally without spaces)
388    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
389    ///
390    /// The returned value is an opaque task identifier which is very cheap to clone
391    /// and which can be used by the caller in debug logging,
392    /// if it's desired to correlate with the debug output from `MockExecutor`.
393    /// Most callers will want to ignore it.
394    ///
395    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
396    pub fn spawn_identified(
397        &self,
398        desc: impl Display,
399        fut: impl Future<Output = ()> + Send + 'static,
400    ) -> impl Debug + Clone + Send + 'static {
401        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
402    }
403
404    /// Spawn a task and return its output for further usage
405    ///
406    /// `desc` should `Display` as some kind of short string (ideally without spaces)
407    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
408    pub fn spawn_join<T: Debug + Send + 'static>(
409        &self,
410        desc: impl Display,
411        fut: impl Future<Output = T> + Send + 'static,
412    ) -> impl Future<Output = T> {
413        let (tx, rx) = oneshot::channel();
414        self.spawn_identified(desc, async move {
415            let res = fut.await;
416            tx.send(res)
417                .expect("Failed to send future's output, did future panic?");
418        });
419        rx.map(|m| m.expect("Failed to receive future's output"))
420    }
421
422    /// Spawn a task and return its `TaskId`
423    ///
424    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
425    /// The future passed to `block_on` is not handled here.
426    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
427        let mut data = self.shared.lock();
428        data.insert_task(desc, TaskFutureInfo::Normal(fut))
429    }
430}
431
432impl Data {
433    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
434    fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
435        let state = Awake;
436        let id = self.tasks.insert(Task {
437            state,
438            desc,
439            fut: Some(fut),
440        });
441        self.awake.push_back(id);
442        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
443        id
444    }
445}
446
447impl Spawn for MockExecutor {
448    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
449        self.spawn_internal("spawn_obj".into(), future);
450        Ok(())
451    }
452}
453
454impl MockExecutor {
455    /// Implementation of `spawn_blocking` and `blocking_io`
456    fn spawn_thread_inner<F, T>(&self, f: F) -> <Self as Blocking>::ThreadHandle<T>
457    where
458        F: FnOnce() -> T + Send + 'static,
459        T: Send + 'static,
460    {
461        // For the mock executor, everything runs on the same thread.
462        // If we need something more complex in the future, we can change this.
463        let (tx, rx) = oneshot::channel();
464        self.spawn_identified("Blocking".to_string(), async move {
465            match tx.send(f()) {
466                Ok(()) => (),
467                Err(_) => panic!("Failed to send future's output, did future panic?"),
468            }
469        });
470        rx.map(Box::new(|m| m.expect("Failed to receive future's output")))
471    }
472}
473
474impl Blocking for MockExecutor {
475    type ThreadHandle<T: Send + 'static> =
476        Map<Receiver<T>, Box<dyn FnOnce(Result<T, Canceled>) -> T>>;
477
478    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
479    where
480        F: FnOnce() -> T + Send + 'static,
481        T: Send + 'static,
482    {
483        assert_matches!(
484            THREAD_DESCRIPTOR.get(),
485            ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
486 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
487        );
488        self.spawn_thread_inner(f)
489    }
490
491    fn reenter_block_on<F>(&self, future: F) -> F::Output
492    where
493        F: Future,
494        F::Output: Send + 'static,
495    {
496        self.subthread_block_on_future(future)
497    }
498
499    fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
500    where
501        F: FnOnce() -> T + Send + 'static,
502        T: Send + 'static,
503    {
504        assert_eq!(
505            THREAD_DESCRIPTOR.get(),
506            ThreadDescriptor::Executor,
507            "MockExecutor::blocking_io only allowed from future being polled by this executor"
508        );
509        self.spawn_thread_inner(f)
510    }
511}
512
513//---------- block_on ----------
514
impl ToplevelBlockOn for MockExecutor {
    /// Run `input_fut`, and all spawned tasks, on this executor; return `input_fut`'s output
    ///
    /// `input_fut` is registered as the task described as `main`.
    /// Execution continues until no task can make further progress.
    ///
    /// # Panics
    ///
    /// Panics, after dumping the executor's state for debugging,
    /// if `input_fut` has not produced its output by the time everything has stalled.
    fn block_on<F>(&self, input_fut: F) -> F::Output
    where
        F: Future,
    {
        // Where the wrapper future (below) stores the output of `input_fut`.
        let mut value: Option<F::Output> = None;

        // Box this just so that we can conveniently control precisely when it's dropped.
        // (We could do this with Option and Pin::set but that seems clumsier.)
        let mut input_fut = Box::pin(input_fut);

        // Wrapper with `Output = ()`, as `MainFuture` requires:
        // it awaits `input_fut` and stashes the result in `value`.
        let run_store_fut = {
            let value = &mut value;
            let input_fut = &mut input_fut;
            async {
                trace!("MockExecutor block_on future...");
                let t = input_fut.await;
                trace!("MockExecutor block_on future returned...");
                *value = Some(t);
                trace!("MockExecutor block_on future exiting.");
            }
        };

        // Inner scope, so that the wrapper (which borrows `value` and `input_fut`)
        // is gone before we look at `value` below.
        {
            pin_mut!(run_store_fut);

            // Only a placeholder goes into `Data`; the future itself is passed as an argument.
            let main_id = self
                .shared
                .lock()
                .insert_task("main".into(), TaskFutureInfo::Main);
            trace!("MockExecutor {main_id:?} is task for block_on");
            self.execute_to_completion(run_store_fut);
        }

        #[allow(clippy::let_and_return)] // clarity
        let value = value.take().unwrap_or_else(|| {
            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
            // use the latter, so that the debug dump is prefixed by this message.
            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
            // write to tracing too, so the tracing log is clear about when we crashed
            error!("all futures blocked, crashing...");

            // Sequencing here is subtle.
            //
            // We should do the dump before dropping the input future, because the input
            // future is likely to own things that, when dropped, wake up other tasks,
            // rendering the dump inaccurate.
            //
            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
            // which then reenters us.  More generally, we mustn't call user code
            // with the lock held.
            //
            // And, we mustn't panic with the data lock held.
            //
            // If value was Some, then this closure is dropped without being called,
            // which drops the future after it has yielded the value, which is correct.
            {
                let mut data = self.shared.lock();
                data.debug_dump();
            }
            drop(input_fut);

            panic!(
                r"
all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
"
            );
        });

        value
    }
}
587
588//---------- execution - core implementation ----------
589
590impl MockExecutor {
    /// Keep polling tasks until nothing more can be done
    ///
    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
    ///
    /// Each iteration runs tasks until they have all stalled.
    /// If a `progress_until_stalled` call is outstanding at that point,
    /// it is marked as finished and its waker is woken,
    /// and we go round again.
    ///
    /// # Panics
    ///
    /// Panics if an outstanding `ProgressUntilStalledFuture` has never been polled,
    /// or has already been marked as finished.
    fn execute_to_completion(&self, mut main_fut: MainFuture) {
        trace!("MockExecutor execute_to_completion...");
        loop {
            self.execute_until_first_stall(main_fut.as_mut());

            // Handle `progressing_until_stalled`
            let pus_waker = {
                let mut data = self.shared.lock();
                let pus = &mut data.progressing_until_stalled;
                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
                let Some(pus) = pus else {
                    // No progressing_until_stalled, we're actually done.
                    break;
                };
                assert_eq!(
                    pus.finished, Pending,
                    "ProgressingUntilStalled finished twice?!"
                );
                pus.finished = Ready(());

                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
                let waker = pus
                    .waker
                    .take()
                    .expect("ProgressUntilStalledFuture not ever polled!");
                drop(data);
                let waker_copy = waker.clone();
                let mut data = self.shared.lock();

                // Put the original waker back where we found it.
                // The slot should still be empty, since we emptied it just above.
                let pus = &mut data.progressing_until_stalled;
                if let Some(double) = pus
                    .as_mut()
                    .expect("progressing_until_stalled updated under our feet!")
                    .waker
                    .replace(waker)
                {
                    panic!("double progressing_until_stalled.waker! {double:?}");
                }

                waker_copy
            };
            // Wake only now, after the block above has released the data lock.
            pus_waker.wake();
        }
        trace!("MockExecutor execute_to_completion done");
    }
639
640    /// Keep polling tasks until `awake` is empty
641    ///
642    /// (Ignores `progressing_until_stalled` - so if one is active,
643    /// will return when all other tasks have blocked.)
644    ///
645    /// # Panics
646    ///
647    /// Might malfunction or panic if called reentrantly
648    fn execute_until_first_stall(&self, main_fut: MainFuture) {
649        trace!("MockExecutor execute_until_first_stall ...");
650
651        assert_eq!(
652            THREAD_DESCRIPTOR.get(),
653            ThreadDescriptor::Foreign,
654            "MockExecutor executor re-entered"
655        );
656        THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);
657
658        let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));
659
660        THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);
661
662        match r {
663            Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
664            Err(e) => {
665                trace!("MockExecutor executor, or async task, panicked!");
666                panic_any(e)
667            }
668        }
669    }
670
    /// Keep polling tasks until `awake` is empty (inner, executor main loop)
    ///
    /// This is only called from [`MockExecutor::execute_until_first_stall`],
    /// so it could also be called `execute_until_first_stall_inner`.
    ///
    /// Each iteration:
    ///
    ///  1. With the lock held: selects a task, marks it `Asleep`, and moves its future out.
    ///  2. With the lock released: polls it
    ///     (or, for a Subthread, merely notes that it is one).
    ///  3. With the lock held again: puts the future back, or removes the finished task,
    ///     or switches to the Subthread.
    ///
    /// # Panics
    ///
    /// Panics if the executor's data structures are found to be inconsistent.
    /// Panics from the tasks' futures propagate through here too.
    #[allow(clippy::cognitive_complexity)]
    fn executor_main_loop(&self, mut main_fut: MainFuture) {
        'outer: loop {
            // Take a `Awake` task off `awake` and make it `Asleep`
            let (id, mut fut) = 'inner: loop {
                let mut data = self.shared.lock();
                let Some(id) = data.schedule() else {
                    // Nothing is awake: we have stalled.
                    break 'outer;
                };
                let Some(task) = data.tasks.get_mut(id) else {
                    // `awake` is allowed to contain stale ids; skip this one and try again.
                    trace!("MockExecutor {id:?} vanished");
                    continue;
                };
                task.state = Asleep(vec![]);
                let fut = task.fut.take().expect("future missing from task!");
                break 'inner (id, fut);
            };

            // Poll the selected task
            //
            // The data lock is not held here, so the task is free to wake, spawn, etc.
            trace!("MockExecutor {id:?} polling...");
            let waker = ActualWaker::make_waker(&self.shared, id);
            let mut cx = Context::from_waker(&waker);
            let r: Either<Poll<()>, IsSubthread> = match &mut fut {
                TaskFutureInfo::Normal(fut) => Left(fut.poll_unpin(&mut cx)),
                TaskFutureInfo::Main => Left(main_fut.as_mut().poll(&mut cx)),
                TaskFutureInfo::Subthread => Right(IsSubthread),
            };

            // Deal with the returned `Poll`
            //
            // `_fut_drop_late` is declared before the lock is taken,
            // so anything stored in it is dropped only after the lock has been released.
            let _fut_drop_late;
            {
                let mut data = self.shared.lock();
                let task = data
                    .tasks
                    .get_mut(id)
                    .expect("task vanished while we were polling it");

                match r {
                    Left(Pending) => {
                        trace!("MockExecutor {id:?} -> Pending");
                        if task.fut.is_some() {
                            panic!("task reinserted while we polled it?!");
                        }
                        // The task might have been woken *by its own poll method*.
                        // That's why we set it to `Asleep` *earlier* rather than here.
                        // All we need to do is put the future back.
                        task.fut = Some(fut);
                    }
                    Left(Ready(())) => {
                        trace!("MockExecutor {id:?} -> Ready");
                        // Oh, it finished!
                        // It might be in `awake`, but that's allowed to contain stale tasks,
                        // so we *don't* need to scan that list and remove it.
                        data.tasks.remove(id);
                        // It is important that we don't drop `fut` until we have released
                        // the data lock, since it is an external type and might try to reenter
                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
                        // So, move `fut` to a variable with scope outside the block with `data`.
                        _fut_drop_late = fut;
                    }
                    Right(IsSubthread) => {
                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
                        // Task is a subthread, which has called thread_context_switch
                        // to switch to us.  We "poll" it by switching back.

                        // Put back `TFI::Subthread`, which was moved out temporarily, above.
                        task.fut = Some(fut);

                        // This hands over our lock guard.
                        self.shared.thread_context_switch(
                            data,
                            ThreadDescriptor::Executor,
                            ThreadDescriptor::Subthread(id),
                        );

                        // Now, if the Subthread still exists, that's because it's switched
                        // back to us, and is waiting in subthread_block_on_future again.
                        // Or it might have ended, in which case it's not in `tasks` any more.
                        // In any case we can go back to scheduling futures.
                    }
                }
            }
        }
    }
758}
759
760impl Data {
761    /// Return the next task to run
762    ///
763    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
764    /// The caller must restore the invariant!
765    fn schedule(&mut self) -> Option<TaskId> {
766        use SchedulingPolicy as SP;
767        match self.scheduling {
768            SP::Stack => self.awake.pop_back(),
769            SP::Queue => self.awake.pop_front(),
770        }
771    }
772}
773
774impl ActualWaker {
775    /// Obtain a strong reference to the executor's data
776    fn upgrade_data(&self) -> Option<Arc<Shared>> {
777        self.data.upgrade()
778    }
779
780    /// Wake the task corresponding to this `ActualWaker`
781    ///
782    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
783    fn wake(&self) {
784        let Some(data) = self.upgrade_data() else {
785            // The executor is gone!  Don't try to wake.
786            return;
787        };
788        let mut data = data.lock();
789        let data = &mut *data;
790        trace!("MockExecutor {:?} wake", &self.id);
791        let Some(task) = data.tasks.get_mut(self.id) else {
792            return;
793        };
794        task.set_awake(self.id, &mut data.awake);
795    }
796
797    /// Create and return a `Waker` for task `id`
798    fn make_waker(shared: &Arc<Shared>, id: TaskId) -> Waker {
799        ActualWaker {
800            data: Arc::downgrade(shared),
801            id,
802        }
803        .new_waker()
804    }
805}
806
807//---------- "progress until stalled" functionality ----------
808
809impl MockExecutor {
810    /// Run tasks in the current executor until every other task is waiting
811    ///
812    /// # Panics
813    ///
814    /// Might malfunction or panic if more than one such call is running at once.
815    ///
816    /// (Ie, you must `.await` or drop the returned `Future`
817    /// before calling this method again.)
818    ///
819    /// Must be called and awaited within a future being run by `self`.
820    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
821        let mut data = self.shared.lock();
822        assert!(
823            data.progressing_until_stalled.is_none(),
824            "progress_until_stalled called more than once"
825        );
826        trace!("MockExecutor progress_until_stalled...");
827        data.progressing_until_stalled = Some(ProgressingUntilStalled {
828            finished: Pending,
829            waker: None,
830        });
831        ProgressUntilStalledFuture {
832            shared: self.shared.clone(),
833        }
834    }
835}
836
837impl Future for ProgressUntilStalledFuture {
838    type Output = ();
839
840    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
841        let waker = cx.waker().clone();
842        let mut data = self.shared.lock();
843        let pus = data.progressing_until_stalled.as_mut();
844        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
845        let pus = pus.expect("ProgressingUntilStalled missing");
846        pus.waker = Some(waker);
847        pus.finished
848    }
849}
850
851impl Drop for ProgressUntilStalledFuture {
852    fn drop(&mut self) {
853        self.shared.lock().progressing_until_stalled = None;
854    }
855}
856
857//---------- (sub)threads ----------
858
impl MockExecutor {
    /// Spawn a "Subthread", for processing in a sync context
    ///
    /// `call` will be run on a separate thread, called a "Subthread".
    ///
    /// But it will **not run simultaneously** with the executor,
    /// nor with other Subthreads.
    /// So Subthreads are somewhat like coroutines.
    ///
    /// `call` must be capable of making progress without waiting for any other Subthreads.
    /// `call` may wait for async futures, using
    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
    ///
    /// Subthreads may be used for cpubound activity,
    /// or synchronous IO (such as large volumes of disk activity),
    /// provided that the synchronous code will reliably make progress,
    /// without waiting (directly or indirectly) for any async task or Subthread -
    /// except via `subthread_block_on_future`.
    ///
    /// The returned future yields `Ok` with the return value of `call`,
    /// or `Err` with the panic payload if `call` panicked.
    ///
    /// # Subthreads vs raw `std::thread` threads
    ///
    /// Programs using `MockExecutor` may use `std::thread` threads directly.
    /// However, this is not recommended.  There are severe limitations:
    ///
    ///  * Only a Subthread can re-enter the async context from sync code:
    ///    this must be done using
    ///    [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
    ///    (Re-entering the executor with
    ///    [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
    ///    is not allowed.)
    ///  * If async tasks want to suspend waiting for synchronous code,
    ///    the synchronous code must run on a Subthread.
    ///    This allows the `MockExecutor` to know when
    ///    that synchronous code is still making progress.
    ///    (This is needed for
    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
    ///    and the facilities which use it, such as
    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
    ///  * Subthreads never run in parallel -
    ///    they only run as scheduled deterministically by the `MockExecutor`.
    ///    So using Subthreads eliminates a source of test nondeterminism.
    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
    ///
    /// # Panics, abuse, and malfunctions
    ///
    /// If `call` panics and unwinds, the future returned by `subthread_spawn` yields `Err`.
    /// The application code should do something about it if this happens,
    /// typically, logging errors, tearing things down, or failing a test case.
    ///
    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
    /// (So, typically, if the thread supposed to run the executor panics,
    /// for example because a future or the executor itself panics,
    /// all the subthreads will become stuck - effectively, they'll be leaked.)
    ///
    /// `subthread_spawn` panics if OS thread spawning fails.
    /// (Like `std::thread::spawn()` does.)
    ///
    /// `MockExecutor`s will malfunction or panic if
    /// any executor invocation method (eg `block_on`) is called on a Subthread.
    pub fn subthread_spawn<T: Send + 'static>(
        &self,
        desc: impl Display,
        call: impl FnOnce() -> T + Send + 'static,
    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
        let desc = desc.to_string();
        // The Subthread's result (or panic payload) comes back to the caller via this channel.
        let (output_tx, output_rx) = oneshot::channel();

        // NB: we don't know which thread we're on!
        // In principle we might be on another Subthread.
        // So we can't context switch here.  That would be very confusing.
        //
        // Instead, we prepare the new Subthread as follows:
        //   - There is a task in the executor
        //   - The task is ready to be polled, whenever the executor decides to
        //   - The thread starts running right away, but immediately waits until it is scheduled
        // See `subthread_entrypoint`.

        {
            // The data lock is held across the OS thread spawn.
            // `subthread_entrypoint` starts by taking the same lock,
            // so the new thread cannot get past that point until this scope ends.
            let mut data = self.shared.lock();
            let id = data.insert_task(desc.clone(), TaskFutureInfo::Subthread);

            // The `JoinHandle` is discarded: the thread is never joined.
            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
                .name(desc)
                .spawn({
                    let shared = self.shared.clone();
                    move || shared.subthread_entrypoint(id, call, output_tx)
                })
                .expect("spawn failed");
        }

        // `Canceled` would mean the sender was dropped without sending;
        // that is treated as a bug, hence the panic.
        output_rx.map(|r| {
            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
        })
    }

    /// Call an async `Future` from a Subthread
    ///
    /// Blocks the Subthread, and arranges to run async tasks,
    /// including `fut`, until `fut` completes.
    ///
    /// `fut` is polled on the executor thread, not on the Subthread.
    /// (We may change that in the future, allowing passing a non-`Send` future.)
    ///
    /// # Panics, abuse, and malfunctions
    ///
    /// `subthread_block_on_future` will malfunction or panic
    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
    /// (ie a thread made with [`subthread_spawn`](MockExecutor::subthread_spawn)).
    ///
    /// If `fut` itself panics, the executor will panic.
    ///
    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
    /// See `subthread_spawn`.
    #[allow(clippy::cognitive_complexity)] // Splitting this up would be worse
    pub fn subthread_block_on_future<T: Send + 'static>(&self, fut: impl Future<Output = T>) -> T {
        // Find out which task we are; anything but a Subthread is a caller error.
        let id = match THREAD_DESCRIPTOR.get() {
            ThreadDescriptor::Subthread(id) => id,
            ThreadDescriptor::Executor => {
                panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
            }
            ThreadDescriptor::Foreign => panic!(
    "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
            ),
        };
        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
        let mut fut = pin!(fut);

        // We yield once before the first poll, and once after Ready, to shake up the
        // execution order a bit, depending on the scheduling policy.
        let yield_ = |set_awake| self.shared.subthread_yield(id, set_awake);
        yield_(Some(SetAwake));

        let ret = loop {
            // Poll the provided future
            //
            // NOTE(review): this `poll` call is made directly from this function,
            // ie on whichever thread called `subthread_block_on_future`
            // (presumably the Subthread, given the `THREAD_DESCRIPTOR` check above),
            // whereas the doc comment says `fut` is polled on the executor thread.
            // Confirm which is intended.
            trace!("MockExecutor thread {id:?}, s.t._block_on_future polling...");
            // The waker is for this Subthread's own task `id`:
            // waking it makes the executor schedule this thread again.
            let waker = ActualWaker::make_waker(&self.shared, id);
            let mut cx = Context::from_waker(&waker);
            let r: Poll<T> = fut.as_mut().poll(&mut cx);

            if let Ready(r) = r {
                trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Ready");
                break r;
            }

            // Pending.  Switch back to the executor thread.
            // When the future becomes ready, the Waker will be woken, waking the task,
            // so that the executor will "poll" us again.
            trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Pending");

            // No `SetAwake` here: we stay asleep until the waker above is woken.
            yield_(None);
        };

        yield_(Some(SetAwake));

        trace!("MockExecutor thread {id:?}, subthread_block_on_future complete.");

        ret
    }
}
1018
impl Shared {
    /// Main entrypoint function for a Subthread
    ///
    /// Entered on a new `std::thread` thread created by
    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
    ///
    /// Waits until this thread is told to run, then runs `call`
    /// (catching any unwinding panic).
    ///
    /// When `call` completes, sends its returned value `T` to `output_tx`
    /// (or the panic payload, as `Err`), removes task `id` from the task table,
    /// and hands control back to the executor thread.
    fn subthread_entrypoint<T: Send + 'static>(
        self: Arc<Self>,
        id: TaskId,
        call: impl FnOnce() -> T + Send + 'static,
        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
    ) {
        // Record which task this OS thread belongs to;
        // `subthread_block_on_future` reads this back to find its task id.
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
        trace!("MockExecutor thread {id:?}, entrypoint");

        // We start out Awake, but we wait for the executor to tell us to run.
        // This will be done the first time the task is "polled".
        {
            let data = self.lock();
            self.thread_context_switch_waitfor_instruction_to_run(
                data,
                ThreadDescriptor::Subthread(id),
            );
        }

        trace!("MockExecutor thread {id:?}, entering user code");

        // Run the user's actual thread function.
        // This will typically reenter us via subthread_block_on_future.
        let ret = catch_unwind(AssertUnwindSafe(call));

        trace!("MockExecutor thread {id:?}, completed user code");

        // This makes the return value from subthread_spawn ready.
        // It will be polled by the executor in due course, presumably.

        output_tx.send(ret).unwrap_or_else(
            #[allow(clippy::unnecessary_lazy_evaluations)]
            |_| {}, // receiver dropped, maybe executor dropped or something?
        );

        {
            let mut data = self.lock();

            // Never poll this task again (so never schedule this thread)
            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");

            // Tell the executor it is scheduled now.
            // We carry on exiting, in parallel (holding the data lock).
            //
            // Note that this only *sends* the instruction; unlike `thread_context_switch`,
            // we do not wait to be rescheduled, since this thread is finished.
            self.thread_context_switch_send_instruction_to_run(
                &mut data,
                ThreadDescriptor::Subthread(id),
                ThreadDescriptor::Executor,
            );
        }
    }

    /// Yield back to the executor from a subthread
    ///
    /// Checks that things are in order
    /// (in particular, that this task is in the data structure as a subthread)
    /// and switches to the executor thread.
    /// Returns once this thread has been instructed to run again.
    ///
    /// The caller must arrange that the task gets woken.
    ///
    /// With [`SetAwake`], sets our task awake, so that we'll be polled
    /// again as soon as we get to the top of the executor's queue.
    /// Otherwise, we'll be reentered after someone wakes a [`Waker`] for the task.
    ///
    /// # Panics
    ///
    /// Panics if task `us` is not present, or is not recorded as a Subthread.
    fn subthread_yield(&self, us: TaskId, set_awake: Option<SetAwake>) {
        let mut data = self.lock();
        {
            // Reborrow, so that `tasks` and `awake` can be borrowed separately below.
            let data = &mut *data;
            let task = data.tasks.get_mut(us).expect("Subthread task vanished!");
            match &task.fut {
                Some(TaskFutureInfo::Subthread) => {}
                other => panic!("subthread_block_on_future but TFI {other:?}"),
            };
            if let Some(SetAwake) = set_awake {
                task.set_awake(us, &mut data.awake);
            }
        }
        // Hands over the lock guard; this call blocks until we are scheduled again.
        self.thread_context_switch(
            data,
            ThreadDescriptor::Subthread(us),
            ThreadDescriptor::Executor,
        );
    }

    /// Switch from (sub)thread `us` to (sub)thread `them`
    ///
    /// Returns when someone calls `thread_context_switch(.., us)`.
    ///
    /// Consumes the lock guard `data`; the lock is no longer held on return.
    fn thread_context_switch(
        &self,
        mut data: MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        trace!("MockExecutor thread {us:?}, switching to {them:?}");
        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
        self.thread_context_switch_waitfor_instruction_to_run(data, us);
    }

    /// Instruct the (sub)thread `them` to run
    ///
    /// Update `thread_to_run`, which will wake up `them`'s
    /// call to `thread_context_switch_waitfor_instruction_to_run`.
    ///
    /// Must be called from (sub)thread `us`.
    /// Part of `thread_context_switch`, not normally called directly.
    ///
    /// # Panics
    ///
    /// Panics if `thread_to_run` is not currently `us`.
    fn thread_context_switch_send_instruction_to_run(
        &self,
        data: &mut MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        // Only the thread which is currently supposed to be running may hand over control.
        assert_eq!(data.thread_to_run, us);
        data.thread_to_run = them;
        // All waiters are woken; each rechecks `thread_to_run` in its `wait_while` predicate.
        self.thread_condvar.notify_all();
    }

    /// Await an instruction for this thread, `us`, to run
    ///
    /// Waits for `thread_to_run` to be `us`,
    /// waiting for `thread_condvar` as necessary.
    ///
    /// Consumes the lock guard `data`; the lock is released again before returning.
    ///
    /// Part of `thread_context_switch`, not normally called directly.
    fn thread_context_switch_waitfor_instruction_to_run(
        &self,
        data: MutexGuard<Data>,
        us: ThreadDescriptor,
    ) {
        // The guard returned by `wait_while` is dropped right away, on purpose.
        #[allow(let_underscore_lock)]
        let _: MutexGuard<_> = self
            .thread_condvar
            .wait_while(data, |data| {
                let live = data.thread_to_run;
                let resume = live == us;
                if resume {
                    trace!("MockExecutor thread {us:?}, resuming");
                } else {
                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
                }
                // We're in `.wait_while`, not `.wait_until`.  Confusing.
                // (Ie, returning `true` means "keep waiting".)
                !resume
            })
            .expect("data lock poisoned");
    }
}
1168
1169//---------- ancillary and convenience functions ----------
1170
1171/// Trait to let us assert at compile time that something is nicely `Sync` etc.
1172#[allow(dead_code)] // yes, we don't *use* anything from this trait
1173trait EnsureSyncSend: Sync + Send + 'static {}
1174impl EnsureSyncSend for ActualWaker {}
1175impl EnsureSyncSend for MockExecutor {}
1176
1177impl MockExecutor {
1178    /// Return the number of tasks running in this executor
1179    ///
1180    /// One possible use is for a test case to check that task(s)
1181    /// that ought to have exited, have indeed done so.
1182    ///
1183    /// In the usual case, the answer will be at least 1,
1184    /// because it counts the future passed to
1185    /// [`block_on`](MockExecutor::block_on)
1186    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1187    pub fn n_tasks(&self) -> usize {
1188        self.shared.lock().tasks.len()
1189    }
1190}
1191
1192impl Shared {
1193    /// Lock and obtain the guard
1194    ///
1195    /// Convenience method which panics on poison
1196    fn lock(&self) -> MutexGuard<Data> {
1197        self.data.lock().expect("data lock poisoned")
1198    }
1199}
1200
1201impl Task {
1202    /// Set task `id` to `Awake` and arrange that it will be polled.
1203    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1204        match self.state {
1205            Awake => {}
1206            Asleep(_) => {
1207                self.state = Awake;
1208                data_awake.push_back(id);
1209            }
1210        }
1211    }
1212}
1213
1214//---------- ActualWaker as RawWaker ----------
1215
/// Using [`ActualWaker`] in a [`RawWaker`]
///
/// We need to make a
/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
/// which contains an
/// [`ActualWaker`] (our actual waker implementation, also safe).
///
/// `std` offers `Waker::from<Arc<impl Wake>>`.
/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
///
/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
///
/// SAFETY
///
///  * The data pointer is `Box::<ActualWaker>::into_raw()`
///  * We share these when we clone
///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
///  * So we may make references `&ActualWaker`
impl ActualWaker {
    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
    fn new_waker(self) -> Waker {
        // SAFETY: `raw_new` pairs a data pointer obtained from `Box::<ActualWaker>::into_raw()`
        // with `RAW_WAKER_VTABLE`, whose functions all treat the data pointer as exactly that.
        unsafe { Waker::from_raw(self.raw_new()) }
    }

    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
    ///
    /// Moves `self` into a fresh `Box` and leaks it into the `RawWaker`'s data pointer;
    /// the allocation is reclaimed by `raw_drop`.
    fn raw_new(self) -> RawWaker {
        let self_: Box<ActualWaker> = self.into();
        let self_: *mut ActualWaker = Box::into_raw(self_);
        let self_: *const () = self_ as _;
        RawWaker::new(self_, &RAW_WAKER_VTABLE)
    }

    /// Implementation of [`RawWakerVTable`]'s `clone`
    ///
    /// # Safety
    ///
    /// `self_` must be a data pointer made by `raw_new` which has not yet been
    /// passed to `raw_drop` or `raw_wake`.
    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
        let self_: *const ActualWaker = self_ as _;
        // `unwrap_unchecked`: the pointer came from `Box::into_raw`, so it is not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        // This is our bespoke `Clone` impl; the copy gets its own, separate `Box`.
        let copy: ActualWaker = self_.clone();
        copy.raw_new()
    }

    /// Implementation of [`RawWakerVTable`]'s `wake`
    ///
    /// Wakes, and then frees the `Box` (this is the consuming variant).
    ///
    /// # Safety
    ///
    /// As for `raw_clone`.  `self_` must not be used again afterwards.
    unsafe fn raw_wake(self_: *const ()) {
        Self::raw_wake_by_ref(self_);
        Self::raw_drop(self_);
    }

    /// Implementation of [`RawWakerVTable`]'s `wake_by_ref`
    ///
    /// # Safety
    ///
    /// As for `raw_clone`.
    unsafe fn raw_wake_by_ref(self_: *const ()) {
        let self_: *const ActualWaker = self_ as _;
        // `unwrap_unchecked`: the pointer came from `Box::into_raw`, so it is not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        self_.wake();
    }

    /// Implementation of [`RawWakerVTable`]'s `drop`
    ///
    /// # Safety
    ///
    /// As for `raw_clone`.  `self_` must not be used again afterwards.
    unsafe fn raw_drop(self_: *const ()) {
        let self_: *mut ActualWaker = self_ as _;
        // Reconstitute the `Box` made in `raw_new`, so that it gets freed.
        let self_: Box<ActualWaker> = Box::from_raw(self_);
        drop(self_);
    }
}
1277
/// vtable for `Box<ActualWaker>` as `RawWaker`
//
// This ought to be in the impl block above, but
//   "associated `static` items are not allowed"
//
// The argument order is dictated by `RawWakerVTable::new`:
// clone, wake, wake_by_ref, drop.
static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    ActualWaker::raw_clone,
    ActualWaker::raw_wake,
    ActualWaker::raw_wake_by_ref,
    ActualWaker::raw_drop,
);
1288
1289//---------- Sleep location tracking and dumping ----------
1290
/// We record "where a future went to sleep" as (just) a backtrace
///
/// This type alias allows us to mock `Backtrace` for miri.
/// (It also insulates from future choices about sleep location representation.)
#[cfg(not(miri))]
type SleepLocation = Backtrace;
1297
1298impl Data {
1299    /// Dump tasks and their sleep location backtraces
1300    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1301        for (id, task) in self.tasks.iter() {
1302            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1303            match &task.state {
1304                Awake => {
1305                    prefix(f)?;
1306                    writeln!(f, "awake")?;
1307                }
1308                Asleep(locs) => {
1309                    let n = locs.len();
1310                    for (i, loc) in locs.iter().enumerate() {
1311                        prefix(f)?;
1312                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1313                    }
1314                    if n == 0 {
1315                        prefix(f)?;
1316                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1317                    }
1318                }
1319            }
1320        }
1321        writeln!(
1322            f,
1323            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1324        )?;
1325        Ok(())
1326    }
1327}
1328
1329/// Track sleep locations via `<Waker as Clone>`.
1330///
1331/// See [`MockExecutor::debug_dump`] for the explanation.
1332impl Clone for ActualWaker {
1333    fn clone(&self) -> Self {
1334        let id = self.id;
1335
1336        if let Some(data) = self.upgrade_data() {
1337            // If the executor is gone, there is nothing to adjust
1338            let mut data = data.lock();
1339            if let Some(task) = data.tasks.get_mut(self.id) {
1340                match &mut task.state {
1341                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1342                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
1343                }
1344            } else {
1345                trace!("MockExecutor cloned waker for dead task {id:?}");
1346            }
1347        }
1348
1349        ActualWaker {
1350            data: self.data.clone(),
1351            id,
1352        }
1353    }
1354}
1355
1356//---------- API for full debug dump ----------
1357
/// Debugging dump of a `MockExecutor`'s state
///
/// Returned by [`MockExecutor::as_debug_dump`]
///
/// Formatting this with [`Debug`] produces the dump.
//
// Existence implies backtraces have been resolved
//
// We use `Either` so that we can also use this internally when we have &mut Data.
//
// `Left` borrows a `Data` the caller already has access to (see `Data::debug_dump`);
// `Right` holds the lock guard itself (see `MockExecutor::as_debug_dump`).
pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1366
1367impl MockExecutor {
1368    /// Dump the executor's state including backtraces of waiting tasks, to stderr
1369    ///
1370    /// This is considerably more extensive than simply
1371    /// `MockExecutor as Debug`.
1372    ///
1373    /// (This is a convenience method, which wraps
1374    /// [`MockExecutor::as_debug_dump()`].
1375    ///
1376    /// ### Backtrace salience (possible spurious traces)
1377    ///
1378    /// **Summary**
1379    ///
1380    /// The technique used to capture backtraces when futures sleep is not 100% exact.
1381    /// It will usually show all the actual sleeping sites,
1382    /// but it might also show other backtraces which were part of
1383    /// the implementation of some complex relevant future.
1384    ///
1385    /// **Details**
1386    ///
1387    /// When a future's implementation wants to sleep,
1388    /// it needs to record the [`Waker`] (from the [`Context`])
1389    /// so that the "other end" can call `.wake()` on it later,
1390    /// when the future should be woken.
1391    ///
1392    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1393    /// the future must clone the `Waker`,
1394    /// and it must do so in within the `poll()` call.
1395    ///
1396    /// A future which is waiting in a `select!` will typically
1397    /// show multiple traces, one for each branch.
1398    /// But,
1399    /// if a future sleeps on one thing, and then when polled again later,
1400    /// sleeps on something different, without waking up in between,
1401    /// both backtrace locations will be shown.
1402    /// And,
1403    /// a complicated future contraption *might* clone the `Waker` more times.
1404    /// So not every backtrace will necessarily be informative.
1405    ///
1406    /// ### Panics
1407    ///
1408    /// Panics on write errors.
1409    pub fn debug_dump(&self) {
1410        self.as_debug_dump().to_stderr();
1411    }
1412
1413    /// Dump the executor's state including backtraces of waiting tasks
1414    ///
1415    /// This is considerably more extensive than simply
1416    /// `MockExecutor as Debug`.
1417    ///
1418    /// Returns an object for formatting with [`Debug`].
1419    /// To simply print the dump to stderr (eg in a test),
1420    /// use [`.debug_dump()`](MockExecutor::debug_dump).
1421    ///
1422    /// **Backtrace salience (possible spurious traces)** -
1423    /// see [`.debug_dump()`](MockExecutor::debug_dump).
1424    pub fn as_debug_dump(&self) -> DebugDump {
1425        let data = self.shared.lock();
1426        DebugDump(Right(data))
1427    }
1428}
1429
1430impl Data {
1431    /// Convenience function: dump including backtraces, to stderr
1432    fn debug_dump(&mut self) {
1433        DebugDump(Left(self)).to_stderr();
1434    }
1435}
1436
1437impl DebugDump<'_> {
1438    /// Convenience function: dump tasks and backtraces to stderr
1439    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1440    fn to_stderr(self) {
1441        write!(io::stderr().lock(), "{:?}", self)
1442            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1443    }
1444}
1445
1446//---------- bespoke Debug impls ----------
1447
1448impl Debug for DebugDump<'_> {
1449    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1450        let self_: &Data = &self.0;
1451
1452        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1453        writeln!(f, "MockExecutor task dump:")?;
1454        self_.dump_backtraces(f)?;
1455
1456        Ok(())
1457    }
1458}
1459
1460// See `impl Debug for Data` for notes on the output
1461impl Debug for Task {
1462    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1463        let Task { desc, state, fut } = self;
1464        write!(f, "{:?}", desc)?;
1465        write!(f, "=")?;
1466        match fut {
1467            None => write!(f, "P")?,
1468            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1469            Some(TaskFutureInfo::Main) => write!(f, "m")?,
1470            Some(TaskFutureInfo::Subthread) => write!(f, "T")?,
1471        }
1472        match state {
1473            Awake => write!(f, "W")?,
1474            Asleep(locs) => write!(f, "s{}", locs.len())?,
1475        };
1476        Ok(())
1477    }
1478}
1479
/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
///
/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
///
/// `FLAGS` are:
///
///  * `T`: this task is for a Subthread (from subthread_spawn).
///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
///  * `f`: this is a normal task with a future and its future is present in `Data`
///  * `m`: this is the main task from `block_on`
///
///  * `W`: the task is awake
///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
//
// NOTE(review): going by the `Debug` impls for `DebugTasks` and `Task`,
// each entry is written as `{id:?}={desc:?}=FLAGS` (or `{id:?}-` if the id is not in `Data`).
// The format string in the doc comment above may be slightly out of date - confirm.
//
// We do it this way because the naive dump from derive is very expansive
// and makes it impossible to see the wood for the trees.
// This very compact representation makes it easier to find a task of interest in the output.
//
// This is implemented in `impl Debug for Task`.
//
//
// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
// I think derive_more emits #[automatically_derived], so that even though we use this
// in our Debug impl, that construction is unused.
#[allow(dead_code)]
struct DebugTasks<'d, F>(&'d Data, F);
1507
1508// See `impl Debug for Data` for notes on the output
1509impl<F, I> Debug for DebugTasks<'_, F>
1510where
1511    F: Fn() -> I,
1512    I: Iterator<Item = TaskId>,
1513{
1514    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1515        let DebugTasks(data, ids) = self;
1516        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1517            write!(f, "{delim}{id:?}")?;
1518            match data.tasks.get(id) {
1519                None => write!(f, "-")?,
1520                Some(task) => write!(f, "={task:?}")?,
1521            }
1522        }
1523        Ok(())
1524    }
1525}
1526
/// Stand-in for `Backtrace`, used when running under miri
///
/// See also the not-miri `type SleepLocation`, alias above.
#[cfg(miri)]
mod miri_sleep_location {
    /// Placeholder sleep location; carries no information and displays as a fixed string
    #[derive(Debug, derive_more::Display)]
    #[display("<SleepLocation>")]
    pub(super) struct SleepLocation {}

    impl SleepLocation {
        /// Counterpart of `Backtrace::force_capture`; captures nothing
        pub(super) fn force_capture() -> Self {
            Self {}
        }
    }
}
1542#[cfg(miri)]
1543use miri_sleep_location::SleepLocation;
1544
1545#[cfg(test)]
1546mod test {
1547    // @@ begin test lint list maintained by maint/add_warning @@
1548    #![allow(clippy::bool_assert_comparison)]
1549    #![allow(clippy::clone_on_copy)]
1550    #![allow(clippy::dbg_macro)]
1551    #![allow(clippy::mixed_attributes_style)]
1552    #![allow(clippy::print_stderr)]
1553    #![allow(clippy::print_stdout)]
1554    #![allow(clippy::single_char_pattern)]
1555    #![allow(clippy::unwrap_used)]
1556    #![allow(clippy::unchecked_duration_subtraction)]
1557    #![allow(clippy::useless_vec)]
1558    #![allow(clippy::needless_pass_by_value)]
1559    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1560    use super::*;
1561    use futures::channel::mpsc;
1562    use futures::{SinkExt as _, StreamExt as _};
1563    use strum::IntoEnumIterator;
1564    use tracing::info;
1565
1566    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1567    use tracing_test::traced_test;
1568
1569    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1570        // This duplicates the part of the logic in MockRuntime::test_with_various which
1571        // relates to MockExecutor, because we don't have a MockRuntime::builder.
1572        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1573        SchedulingPolicy::iter().map(|scheduling| {
1574            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1575            MockExecutor::with_scheduling(scheduling)
1576        })
1577    }
1578
1579    #[cfg_attr(not(miri), traced_test)]
1580    #[test]
1581    fn simple() {
1582        let runtime = MockExecutor::default();
1583        let val = runtime.block_on(async { 42 });
1584        assert_eq!(val, 42);
1585    }
1586
    /// Exercise `progress_until_stalled` with a chain of forwarding tasks
    ///
    /// There are `N` channels.  The main future keeps the receiver of the last one;
    /// each other receiver gets a task which, on receiving `v`, sends `v + 1` on channel `v + 1`.
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn stall() {
        let runtime = MockExecutor::default();

        runtime.block_on({
            let runtime = runtime.clone();
            async move {
                const N: usize = 3;
                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();

                // The main future reads from the last channel itself.
                let mut rx_n = rxs.pop().unwrap();

                // One forwarding task for each of the remaining receivers.
                for (i, mut rx) in rxs.into_iter().enumerate() {
                    runtime.spawn_identified(i, {
                        let mut txs = txs.clone();
                        async move {
                            loop {
                                eprintln!("task {i} rx...");
                                let v = rx.next().await.unwrap();
                                let nv = v + 1;
                                eprintln!("task {i} rx {v}, tx {nv}");
                                let v = nv;
                                // The destination is chosen by the value, not by our index `i`.
                                txs[v].send(v).await.unwrap();
                            }
                        }
                    });
                }

                // Nothing has been sent yet, so nothing can have arrived.
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                dbg!();
                runtime.progress_until_stalled().await;

                // Running the tasks until they stall doesn't conjure up any message either.
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Start one value at the head of the chain...
                dbg!();
                txs[0].send(0).await.unwrap();

                dbg!();
                runtime.progress_until_stalled().await;

                // ... and expect it to have been forwarded all the way to the last channel.
                dbg!();
                let r = rx_n.next().await;
                assert_eq!(r, Some(N - 1));

                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Now have a separate task send a 0 into every channel.
                runtime.spawn_identified("tx", {
                    let txs = txs.clone();
                    async {
                        eprintln!("sending task...");
                        for (i, mut tx) in txs.into_iter().enumerate() {
                            eprintln!("sending 0 to {i}...");
                            tx.send(0).await.unwrap();
                        }
                        eprintln!("sending task done");
                    }
                });

                runtime.debug_dump();

                // One value should reach us per channel: each is either a 0 sent straight
                // into the last channel, or an `N - 1` which came through the forwarders.
                for i in 0..txs.len() {
                    eprintln!("main {i} wait stall...");
                    runtime.progress_until_stalled().await;
                    eprintln!("main {i} rx wait...");
                    let r = rx_n.next().await;
                    eprintln!("main {i} rx = {r:?}");
                    assert!(r == Some(0) || r == Some(N - 1));
                }

                eprintln!("finishing...");
                runtime.progress_until_stalled().await;
                eprintln!("finished.");
            }
        });
    }
1668
1669    #[cfg_attr(not(miri), traced_test)]
1670    #[test]
1671    fn spawn_blocking() {
1672        let runtime = MockExecutor::default();
1673
1674        runtime.block_on({
1675            let runtime = runtime.clone();
1676            async move {
1677                let thr_1 = runtime.spawn_blocking(|| 42);
1678                let thr_2 = runtime.spawn_blocking(|| 99);
1679
1680                assert_eq!(thr_2.await, 99);
1681                assert_eq!(thr_1.await, 42);
1682            }
1683        });
1684    }
1685
1686    #[cfg_attr(not(miri), traced_test)]
1687    #[test]
1688    fn drop_reentrancy() {
1689        // Check that dropping a completed task future is done *outside* the data lock.
1690        // Involves a contrived future whose Drop impl reenters the executor.
1691        //
1692        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1693        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1694        // we do indeed get deadlock, so this test case is working.
1695
1696        struct ReentersOnDrop {
1697            runtime: MockExecutor,
1698        }
1699        impl Future for ReentersOnDrop {
1700            type Output = ();
1701            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1702                Poll::Ready(())
1703            }
1704        }
1705        impl Drop for ReentersOnDrop {
1706            fn drop(&mut self) {
1707                self.runtime
1708                    .spawn_identified("dummy", futures::future::ready(()));
1709            }
1710        }
1711
1712        for runtime in various_mock_executors() {
1713            runtime.block_on(async {
1714                runtime.spawn_identified("trapper", {
1715                    let runtime = runtime.clone();
1716                    ReentersOnDrop { runtime }
1717                });
1718            });
1719        }
1720    }
1721
1722    #[cfg_attr(not(miri), traced_test)]
1723    #[test]
1724    fn subthread_oneshot() {
1725        for runtime in various_mock_executors() {
1726            runtime.block_on(async {
1727                let (tx, rx) = oneshot::channel();
1728                info!("spawning subthread");
1729                let thr = runtime.subthread_spawn("thr1", {
1730                    let runtime = runtime.clone();
1731                    move || {
1732                        info!("subthread_block_on_future...");
1733                        let i = runtime.subthread_block_on_future(rx).unwrap();
1734                        info!("subthread_block_on_future => {i}");
1735                        i + 1
1736                    }
1737                });
1738                info!("main task sending");
1739                tx.send(12).unwrap();
1740                info!("main task sent");
1741                let r = thr.await.unwrap();
1742                info!("main task thr => {r}");
1743                assert_eq!(r, 13);
1744            });
1745        }
1746    }
1747
1748    #[cfg_attr(not(miri), traced_test)]
1749    #[test]
1750    #[allow(clippy::cognitive_complexity)] // It's is not that complicated, really.
1751    fn subthread_pingpong() {
1752        for runtime in various_mock_executors() {
1753            runtime.block_on(async {
1754                let (mut i_tx, mut i_rx) = mpsc::channel(1);
1755                let (mut o_tx, mut o_rx) = mpsc::channel(1);
1756                info!("spawning subthread");
1757                let thr = runtime.subthread_spawn("thr", {
1758                    let runtime = runtime.clone();
1759                    move || {
1760                        while let Some(i) = {
1761                            info!("thread receiving ...");
1762                            runtime.subthread_block_on_future(i_rx.next())
1763                        } {
1764                            let o = i + 12;
1765                            info!("thread received {i}, sending {o}");
1766                            runtime.subthread_block_on_future(o_tx.send(o)).unwrap();
1767                            info!("thread sent {o}");
1768                        }
1769                        info!("thread exiting");
1770                        42
1771                    }
1772                });
1773                for i in 0..2 {
1774                    info!("main task sending {i}");
1775                    i_tx.send(i).await.unwrap();
1776                    info!("main task sent {i}");
1777                    let o = o_rx.next().await.unwrap();
1778                    info!("main task recv => {o}");
1779                    assert_eq!(o, i + 12);
1780                }
1781                info!("main task dropping sender");
1782                drop(i_tx);
1783                info!("main task awaiting thread");
1784                let r = thr.await.unwrap();
1785                info!("main task complete");
1786                assert_eq!(r, 42);
1787            });
1788        }
1789    }
1790}