tor_rtmock/
task.rs

1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::mem;
13use std::panic::{catch_unwind, AssertUnwindSafe};
14use std::pin::Pin;
15use std::sync::{Arc, Mutex, MutexGuard, Weak};
16use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
17
18use futures::future::Map;
19use futures::pin_mut;
20use futures::task::{FutureObj, Spawn, SpawnError};
21use futures::FutureExt as _;
22
23use educe::Educe;
24use itertools::Either;
25use itertools::{chain, izip};
26use slotmap_careful::DenseSlotMap;
27use std::backtrace::Backtrace;
28use strum::EnumIter;
29
30// NB: when using traced_test, the trace! and error! output here is generally suppressed
31// in tests of other crates.  To see it, you can write something like this
32// (in the dev-dependencies of the crate whose tests you're running):
33//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
34use tracing::{error, trace};
35
36use oneshot_fused_workaround::{self as oneshot, Canceled, Receiver};
37use tor_error::error_report;
38use tor_rtcompat::{BlockOn, SpawnBlocking};
39
40use Poll::*;
41use TaskState::*;
42
43/// Type-erased future, one for each of our (normal) tasks
44type TaskFuture = FutureObj<'static, ()>;
45
46/// Future for the argument to `block_on`, which is handled specially
47type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
48
49//---------- principal data structures ----------
50
51/// Executor for running tests with mocked environment
52///
53/// For test cases which don't actually wait for anything in the real world.
54///
55/// This is the executor.
56/// It implements [`Spawn`] and [`BlockOn`]
57///
58/// It will usually be used as part of a `MockRuntime`.
59///
60/// To run futures, call [`BlockOn::block_on`],
61///
62/// # Restricted environment
63///
64/// Tests run with this executor must not attempt to block
65/// on anything "outside":
66/// every future that anything awaits must (eventually) be woken directly
67/// *by some other task* in the same test case.
68///
69/// (By directly we mean that the [`Waker::wake`] call is made
70/// by that waking future, before that future itself awaits anything.)
71///
72/// # Panics
73///
74/// The executor will panic
75/// if the toplevel future (passed to `block_on`)
76/// doesn't complete (without externally blocking),
77/// but instead waits for something.
78///
79/// The executor will malfunction or panic if reentered.
80/// (Eg, if `block_on` is reentered.)
81#[derive(Clone, Default, Educe)]
82#[educe(Debug)]
83pub struct MockExecutor {
84    /// Mutable state
85    #[educe(Debug(ignore))]
86    shared: Arc<Shared>,
87}
88
89/// Shared state and ancillary information
90///
91/// This is always within an `Arc`.
92#[derive(Default)]
93struct Shared {
94    /// Shared state
95    data: Mutex<Data>,
96    /// Condition variable for thread scheduling
97    ///
98    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
99    /// is modified.
100    thread_condvar: std::sync::Condvar,
101}
102
103/// Task id, module to hide `Ti` alias
104mod task_id {
105    slotmap_careful::new_key_type! {
106        /// Task ID, usually called `TaskId`
107        ///
108        /// Short name in special `task_id` module so that [`Debug`] is nice
109        pub(super) struct Ti;
110    }
111}
112use task_id::Ti as TaskId;
113
114/// Executor's state
115///
116/// ### Task state machine
117///
118/// A task is created in `tasks`, `Awake`, so also in `awake`.
119///
120/// When we poll it, we take it out of `awake` and set it to `Asleep`,
121/// and then call `poll()`.
122/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
123/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
124///
125/// The task's future is of course also present here in this data structure.
126/// However, during poll we must release the lock,
127/// so we cannot borrow the future from `Data`.
128/// Instead, we move it out.  So `Task.fut` is an `Option`.
129///
130/// ### "Main" task - the argument to `block_on`
131///
132/// The signature of `BlockOn::block_on` accepts a non-`'static` future
133/// (and a non-`Send`/`Sync` one).
134///
135/// So we cannot store that future in `Data` because `Data` is `'static`.
136/// Instead, this main task future is passed as an argument down the call stack.
137/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
138#[derive(Default, derive_more::Debug)]
139struct Data {
140    /// Tasks
141    ///
142    /// Includes tasks spawned with `spawn`,
143    /// and also the future passed to `block_on`.
144    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
145    tasks: DenseSlotMap<TaskId, Task>,
146
147    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
148    ///
149    /// Tasks are pushed onto the *back* when woken,
150    /// so back is the most recently woken.
151    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
152    awake: VecDeque<TaskId>,
153
154    /// If a future from `progress_until_stalled` exists
155    progressing_until_stalled: Option<ProgressingUntilStalled>,
156
157    /// Scheduling policy
158    scheduling: SchedulingPolicy,
159
160    /// (Sub)thread we want to run now
161    ///
162    /// At any one time only one thread is meant to be running.
163    /// Other threads are blocked in condvar wait, waiting for this to change.
164    ///
165    /// **Modified only** within
166    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
167    /// which takes responsibility for preserving the following **invariants**:
168    ///
169    ///  1. no-one but the named thread is allowed to modify this field.
170    ///  2. after modifying this field, signal `thread_condvar`
171    thread_to_run: ThreadDescriptor,
172}
173
174/// How we should schedule?
175#[derive(Debug, Clone, Default, EnumIter)]
176#[non_exhaustive]
177pub enum SchedulingPolicy {
178    /// Task *most* recently woken is run
179    ///
180    /// This is the default.
181    ///
182    /// It will expose starvation bugs if a task never sleeps.
183    /// (Which is a good thing in tests.)
184    #[default]
185    Stack,
186    /// Task *least* recently woken is run.
187    Queue,
188}
189
190/// Record of a single task
191///
192/// Tracks a spawned task, or the main task (the argument to `block_on`).
193///
194/// Stored in [`Data`]`.tasks`.
195struct Task {
196    /// For debugging output
197    desc: String,
198    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
199    ///
200    /// **Set to `Awake` only by [`Task::set_awake`]**,
201    /// preserving the invariant that
202    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
203    state: TaskState,
204    /// The actual future (or a placeholder for it)
205    ///
206    /// May be `None` because we've temporarily moved it out so we can poll it,
207    /// or if this is a Subthread task which is currently running sync code
208    /// (in which case we're blocked in the executor waiting to be
209    /// woken up by [`thread_context_switch`](Shared::thread_context_switch).
210    fut: Option<TaskFutureInfo>,
211    /// Is this task actually a [`Subthread`](MockExecutor::subthread_spawn)?
212    ///
213    /// Subthread tasks do not end when `fut` is `Ready` -
214    /// instead, `fut` is `Some` when the thread is within `subthread_block_on_future`.
215    /// The rest of the time this is `None`, but we don't run the executor,
216    /// because `Data.thread_to_run` is `ThreadDescriptor::Task(this_task)`.
217    is_subthread: Option<IsSubthread>,
218}
219
220/// A future as stored in our record of a [`Task`]
221enum TaskFutureInfo {
222    /// The [`Future`].  All is normal.
223    Normal(TaskFuture),
224    /// The future isn't here because this task is the main future for `block_on`
225    Main,
226}
227
228/// State of a task - do we think it needs to be polled?
229///
230/// Stored in [`Task`]`.state`.
231#[derive(Debug)]
232enum TaskState {
233    /// Awake - needs to be polled
234    ///
235    /// Established by [`waker.wake()`](Waker::wake)
236    Awake,
237    /// Asleep - does *not* need to be polled
238    ///
239    /// Established each time just before we call the future's [`poll`](Future::poll)
240    Asleep(Vec<SleepLocation>),
241}
242
243/// Actual implementor of `Wake` for use in a `Waker`
244///
245/// Futures (eg, channels from [`futures`]) will use this to wake a task
246/// when it should be polled.
247///
248/// This type must not be `Cloned` with the `Data` lock held.
249/// Consequently, a `Waker` mustn't either.
250struct ActualWaker {
251    /// Executor state
252    ///
253    /// The Waker mustn't to hold a strong reference to the executor,
254    /// since typically a task holds a future that holds a Waker,
255    /// and the executor holds the task - so that would be a cycle.
256    data: Weak<Shared>,
257
258    /// Which task this is
259    id: TaskId,
260}
261
262/// State used for an in-progress call to
263/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
264///
265/// If present in [`Data`], an (async) call to `progress_until_stalled`
266/// is in progress.
267///
268/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
269/// is a normal-ish future.
270/// It can be polled in the normal way.
271/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
272///
273/// The future is made ready, and woken (via `waker`),
274/// by bespoke code in the task executor loop.
275///
276/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
277/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
278#[derive(Debug)]
279struct ProgressingUntilStalled {
280    /// Have we, in fact, stalled?
281    ///
282    /// Made `Ready` by special code in the executor loop
283    finished: Poll<()>,
284
285    /// Waker
286    ///
287    /// Signalled by special code in the executor loop
288    waker: Option<Waker>,
289}
290
291/// Future from
292/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
293///
294/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
295///
296/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
297/// There can only be one at a time.
298#[derive(Educe)]
299#[educe(Debug)]
300struct ProgressUntilStalledFuture {
301    /// Executor's state; this future's state is in `.progressing_until_stalled`
302    #[educe(Debug(ignore))]
303    shared: Arc<Shared>,
304}
305
306/// Identifies a thread we know about - the executor thread, or a Subthread
307///
308/// Not related to `std::thread::ThreadId`.
309///
310/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
311///
312/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
313/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
314/// That's OK (and documented).
315#[derive(Default, Copy, Clone, Eq, PartialEq, derive_more::Debug)]
316enum ThreadDescriptor {
317    /// The executor.
318    #[debug("Exe")]
319    #[default]
320    Executor,
321    /// This task, which is a Subthread.
322    #[debug("{_0:?}")]
323    Subthread(TaskId),
324}
325
326/// Marker indicating that this task is a Subthread, not an async task.
327///
328/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
329#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
330struct IsSubthread;
331
332thread_local! {
333    /// Identifies this thread.
334    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
335        Cell::new(ThreadDescriptor::Executor)
336    };
337}
338
339//---------- creation ----------
340
341impl MockExecutor {
342    /// Make a `MockExecutor` with default parameters
343    pub fn new() -> Self {
344        Self::default()
345    }
346
347    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
348    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
349        Data {
350            scheduling,
351            ..Default::default()
352        }
353        .into()
354    }
355}
356
357impl From<Data> for MockExecutor {
358    fn from(data: Data) -> MockExecutor {
359        let shared = Shared {
360            data: Mutex::new(data),
361            thread_condvar: std::sync::Condvar::new(),
362        };
363        MockExecutor {
364            shared: Arc::new(shared),
365        }
366    }
367}
368
369//---------- spawning ----------
370
371impl MockExecutor {
372    /// Spawn a task and return something to identify it
373    ///
374    /// `desc` should `Display` as some kind of short string (ideally without spaces)
375    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
376    ///
377    /// The returned value is an opaque task identifier which is very cheap to clone
378    /// and which can be used by the caller in debug logging,
379    /// if it's desired to correlate with the debug output from `MockExecutor`.
380    /// Most callers will want to ignore it.
381    ///
382    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
383    pub fn spawn_identified(
384        &self,
385        desc: impl Display,
386        fut: impl Future<Output = ()> + Send + 'static,
387    ) -> impl Debug + Clone + Send + 'static {
388        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
389    }
390
391    /// Spawn a task and return its output for further usage
392    ///
393    /// `desc` should `Display` as some kind of short string (ideally without spaces)
394    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
395    pub fn spawn_join<T: Debug + Send + 'static>(
396        &self,
397        desc: impl Display,
398        fut: impl Future<Output = T> + Send + 'static,
399    ) -> impl Future<Output = T> {
400        let (tx, rx) = oneshot::channel();
401        self.spawn_identified(desc, async move {
402            let res = fut.await;
403            tx.send(res)
404                .expect("Failed to send future's output, did future panic?");
405        });
406        rx.map(|m| m.expect("Failed to receive future's output"))
407    }
408
409    /// Spawn a task and return its `TaskId`
410    ///
411    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
412    /// The future passed to `block_on` is not handled here.
413    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
414        let mut data = self.shared.lock();
415        data.insert_task(desc, TaskFutureInfo::Normal(fut), None)
416    }
417}
418
419impl Data {
420    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
421    fn insert_task(
422        &mut self,
423        desc: String,
424        fut: TaskFutureInfo,
425        is_subthread: Option<IsSubthread>,
426    ) -> TaskId {
427        let state = Awake;
428        let id = self.tasks.insert(Task {
429            state,
430            desc,
431            fut: Some(fut),
432            is_subthread,
433        });
434        self.awake.push_back(id);
435        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
436        id
437    }
438}
439
440impl Spawn for MockExecutor {
441    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
442        self.spawn_internal("spawn_obj".into(), future);
443        Ok(())
444    }
445}
446
447impl SpawnBlocking for MockExecutor {
448    type Handle<T: Send + 'static> = Map<Receiver<T>, Box<dyn FnOnce(Result<T, Canceled>) -> T>>;
449
450    fn spawn_blocking<F, T>(&self, f: F) -> Self::Handle<T>
451    where
452        F: FnOnce() -> T + Send + 'static,
453        T: Send + 'static,
454    {
455        // For the mock executor, everything goes on the same threadpool.
456        // If we need something more complex in the future, we can change this.
457        let (tx, rx) = oneshot::channel();
458        self.spawn_identified("spawn_blocking".to_string(), async move {
459            match tx.send(f()) {
460                Ok(()) => (),
461                Err(_) => panic!("Failed to send future's output, did future panic?"),
462            }
463        });
464        rx.map(Box::new(|m| m.expect("Failed to receive future's output")))
465    }
466}
467
468//---------- block_on ----------
469
470impl BlockOn for MockExecutor {
471    fn block_on<F>(&self, input_fut: F) -> F::Output
472    where
473        F: Future,
474    {
475        let mut value: Option<F::Output> = None;
476
477        // Box this just so that we can conveniently control precisely when it's dropped.
478        // (We could do this with Option and Pin::set but that seems clumsier.)
479        let mut input_fut = Box::pin(input_fut);
480
481        let run_store_fut = {
482            let value = &mut value;
483            let input_fut = &mut input_fut;
484            async {
485                trace!("MockExecutor block_on future...");
486                let t = input_fut.await;
487                trace!("MockExecutor block_on future returned...");
488                *value = Some(t);
489                trace!("MockExecutor block_on future exiting.");
490            }
491        };
492
493        {
494            pin_mut!(run_store_fut);
495
496            let main_id = self
497                .shared
498                .lock()
499                .insert_task("main".into(), TaskFutureInfo::Main, None);
500            trace!("MockExecutor {main_id:?} is task for block_on");
501            self.execute_to_completion(run_store_fut);
502        }
503
504        #[allow(clippy::let_and_return)] // clarity
505        let value = value.take().unwrap_or_else(|| {
506            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
507            // use the latter, so that the debug dump is prefixed by this message.
508            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
509            // write to tracing too, so the tracing log is clear about when we crashed
510            error!("all futures blocked, crashing...");
511
512            // Sequencing here is subtle.
513            //
514            // We should do the dump before dropping the input future, because the input
515            // future is likely to own things that, when dropped, wake up other tasks,
516            // rendering the dump inaccurate.
517            //
518            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
519            // which then reenters us.  More generally, we mustn't call user code
520            // with the lock held.
521            //
522            // And, we mustn't panic with the data lock held.
523            //
524            // If value was Some, then this closure is dropped without being called,
525            // which drops the future after it has yielded the value, which is correct.
526            {
527                let mut data = self.shared.lock();
528                data.debug_dump();
529            }
530            drop(input_fut);
531
532            panic!(
533                r"
534all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
535"
536            );
537        });
538
539        value
540    }
541}
542
543//---------- execution - core implementation ----------
544
545impl MockExecutor {
546    /// Keep polling tasks until nothing more can be done
547    ///
548    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
549    fn execute_to_completion(&self, mut main_fut: MainFuture) {
550        trace!("MockExecutor execute_to_completion...");
551        loop {
552            self.execute_until_first_stall(main_fut.as_mut());
553
554            // Handle `progressing_until_stalled`
555            let pus_waker = {
556                let mut data = self.shared.lock();
557                let pus = &mut data.progressing_until_stalled;
558                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
559                let Some(pus) = pus else {
560                    // No progressing_until_stalled, we're actually done.
561                    break;
562                };
563                assert_eq!(
564                    pus.finished, Pending,
565                    "ProgressingUntilStalled finished twice?!"
566                );
567                pus.finished = Ready(());
568
569                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
570                let waker = pus
571                    .waker
572                    .take()
573                    .expect("ProgressUntilStalledFuture not ever polled!");
574                drop(data);
575                let waker_copy = waker.clone();
576                let mut data = self.shared.lock();
577
578                let pus = &mut data.progressing_until_stalled;
579                if let Some(double) = mem::replace(
580                    &mut pus
581                        .as_mut()
582                        .expect("progressing_until_stalled updated under our feet!")
583                        .waker,
584                    Some(waker),
585                ) {
586                    panic!("double progressing_until_stalled.waker! {double:?}");
587                }
588
589                waker_copy
590            };
591            pus_waker.wake();
592        }
593        trace!("MockExecutor execute_to_completion done");
594    }
595
596    /// Keep polling tasks until `awake` is empty
597    ///
598    /// (Ignores `progressing_until_stalled` - so if one is active,
599    /// will return when all other tasks have blocked.)
600    ///
601    /// # Panics
602    ///
603    /// Might malfunction or panic if called reentrantly
604    #[allow(clippy::cognitive_complexity)]
605    fn execute_until_first_stall(&self, mut main_fut: MainFuture) {
606        trace!("MockExecutor execute_until_first_stall ...");
607        'outer: loop {
608            // Take a `Awake` task off `awake` and make it `Asleep`
609            let (id, mut fut) = 'inner: loop {
610                let mut data = self.shared.lock();
611                let Some(id) = data.schedule() else {
612                    break 'outer;
613                };
614                let Some(task) = data.tasks.get_mut(id) else {
615                    trace!("MockExecutor {id:?} vanished");
616                    continue;
617                };
618                task.state = Asleep(vec![]);
619                let fut = task.fut.take().expect("future missing from task!");
620                break 'inner (id, fut);
621            };
622
623            // Poll the selected task
624            let waker = ActualWaker {
625                data: Arc::downgrade(&self.shared),
626                id,
627            }
628            .new_waker();
629            trace!("MockExecutor {id:?} polling...");
630            let mut cx = Context::from_waker(&waker);
631            let r = match &mut fut {
632                TaskFutureInfo::Normal(fut) => fut.poll_unpin(&mut cx),
633                TaskFutureInfo::Main => main_fut.as_mut().poll(&mut cx),
634            };
635
636            // Deal with the returned `Poll`
637            let _fut_drop_late;
638            {
639                let mut data = self.shared.lock();
640                let task = data
641                    .tasks
642                    .get_mut(id)
643                    .expect("task vanished while we were polling it");
644
645                match (r, task.is_subthread) {
646                    (Pending, _) => {
647                        trace!("MockExecutor {id:?} -> Pending");
648                        if task.fut.is_some() {
649                            panic!("task reinserted while we polled it?!");
650                        }
651                        // The task might have been woken *by its own poll method*.
652                        // That's why we set it to `Asleep` *earlier* rather than here.
653                        // All we need to do is put the future back.
654                        task.fut = Some(fut);
655                    }
656                    (Ready(()), None) => {
657                        trace!("MockExecutor {id:?} -> Ready");
658                        // Oh, it finished!
659                        // It might be in `awake`, but that's allowed to contain stale tasks,
660                        // so we *don't* need to scan that list and remove it.
661                        data.tasks.remove(id);
662                        // It is important that we don't drop `fut` until we have released
663                        // the data lock, since it is an external type and might try to reenter
664                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
665                        // So, move `fut` to a variable with scope outside the block with `data`.
666                        _fut_drop_late = fut;
667                    }
668                    (Ready(()), Some(IsSubthread)) => {
669                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
670                        // Task was blocking on the future given to .subthread_block_on_future().
671                        // That future has completed and stored its result where the Subthread
672                        // can see it.  Now we need to wake up that thread, and let it run
673                        // until it blocks again.
674                        //
675                        // We leave `.fut` as `None`.
676                        // subthread_block_on_future is responsible for filling it in again.
677
678                        self.shared.thread_context_switch(
679                            data,
680                            ThreadDescriptor::Executor,
681                            ThreadDescriptor::Subthread(id),
682                        );
683
684                        // Now, if the Subthread still exists, that's because it's waiting
685                        // in subthread_block_on_future again, in which case `fut` is `Some`.
686                        // Or it might have ended, in which case it's not in `tasks` any more.
687                        // We can go back to scheduling futures.
688
689                        // `fut` contains the future passed to `subthread_block_on_future`,
690                        // ie it owns an external type.  See above.
691                        _fut_drop_late = fut;
692                    }
693                }
694            }
695        }
696        trace!("MockExecutor execute_until_first_stall done.");
697    }
698}
699
700impl Data {
701    /// Return the next task to run
702    ///
703    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
704    /// The caller must restore the invariant!
705    fn schedule(&mut self) -> Option<TaskId> {
706        use SchedulingPolicy as SP;
707        match self.scheduling {
708            SP::Stack => self.awake.pop_back(),
709            SP::Queue => self.awake.pop_front(),
710        }
711    }
712}
713
714impl ActualWaker {
715    /// Obtain a strong reference to the executor's data
716    fn upgrade_data(&self) -> Option<Arc<Shared>> {
717        self.data.upgrade()
718    }
719
720    /// Wake the task corresponding to this `ActualWaker`
721    ///
722    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
723    fn wake(&self) {
724        let Some(data) = self.upgrade_data() else {
725            // The executor is gone!  Don't try to wake.
726            return;
727        };
728        let mut data = data.lock();
729        let data = &mut *data;
730        trace!("MockExecutor {:?} wake", &self.id);
731        let Some(task) = data.tasks.get_mut(self.id) else {
732            return;
733        };
734        task.set_awake(self.id, &mut data.awake);
735    }
736}
737
738//---------- "progress until stalled" functionality ----------
739
740impl MockExecutor {
741    /// Run tasks in the current executor until every other task is waiting
742    ///
743    /// # Panics
744    ///
745    /// Might malfunction or panic if more than one such call is running at once.
746    ///
747    /// (Ie, you must `.await` or drop the returned `Future`
748    /// before calling this method again.)
749    ///
750    /// Must be called and awaited within a future being run by `self`.
751    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
752        let mut data = self.shared.lock();
753        assert!(
754            data.progressing_until_stalled.is_none(),
755            "progress_until_stalled called more than once"
756        );
757        trace!("MockExecutor progress_until_stalled...");
758        data.progressing_until_stalled = Some(ProgressingUntilStalled {
759            finished: Pending,
760            waker: None,
761        });
762        ProgressUntilStalledFuture {
763            shared: self.shared.clone(),
764        }
765    }
766}
767
768impl Future for ProgressUntilStalledFuture {
769    type Output = ();
770
771    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
772        let waker = cx.waker().clone();
773        let mut data = self.shared.lock();
774        let pus = data.progressing_until_stalled.as_mut();
775        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
776        let pus = pus.expect("ProgressingUntilStalled missing");
777        pus.waker = Some(waker);
778        pus.finished
779    }
780}
781
782impl Drop for ProgressUntilStalledFuture {
783    fn drop(&mut self) {
784        self.shared.lock().progressing_until_stalled = None;
785    }
786}
787
788//---------- (sub)threads ----------
789
790impl MockExecutor {
791    /// Spawn a "Subthread", for processing in a sync context
792    ///
793    /// `call` will be run on a separate thread, called a "Subthread".
794    ///
795    /// But it will **not run simultaneously** with the executor,
796    /// nor with other Subthreads.
797    /// So Subthreads are somewhat like coroutines.
798    ///
799    /// `call` must be capable of making progress without waiting for any other Subthreads.
800    /// `call` may wait for async futures, using
801    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
802    ///
803    /// Subthreads may be used for cpubound activity,
804    /// or synchronous IO (such as large volumes of disk activity),
805    /// provided that the synchronous code will reliably make progress,
806    /// without waiting (directly or indirectly) for any async task or Subthread -
807    /// except via `subthread_block_on_future`.
808    ///
809    /// # Subthreads vs raw `std::thread` threads
810    ///
811    /// Programs using `MockExecutor` may use `std::thread` threads directly.
812    /// However, this is not recommended.  There are severe limitations:
813    ///
814    ///  * Only a Subthread can re-enter the async context from sync code:
815    ///    this must be done with
816    ///    using [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
817    ///    (Re-entering the executor with [`block_on`](BlockOn::block_on) is not allowed.)
818    ///  * If async tasks want to suspend waiting for synchronous code,
819    ///    the synchronous code must run on a Subthread.
820    ///    This allows the `MockExecutor` to know when
821    ///    that synchronous code is still making progress.
822    ///    (This is needed for
823    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
824    ///    and the facilities which use it, such as
825    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
826    ///  * Subthreads never run in parallel -
827    ///    they only run as scheduled deterministically by the `MockExecutor`.
828    ///    So using Subthreads eliminates a source of test nonndeterminism.
829    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
830    ///
831    /// **TODO [\#1835](https://gitlab.torproject.org/tpo/core/arti/-/issues/1835)**:
832    /// Currently the traits in `tor_rtcompat` do not support proper usage very well.
833    ///
834    /// # Panics, abuse, and malfunctions
835    ///
836    /// If `call` panics and unwinds, `spawn_subthread` yields `Err`.
837    /// The application code should to do something about it if this happens,
838    /// typically, logging errors, tearing things down, or failing a test case.
839    ///
840    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
841    /// (So, typically, if the thread supposed to run the executor panics,
842    /// for example because a future or the executor itself panics,
843    /// all the subthreads will become stuck - effectively, they'll be leaked.)
844    ///
845    /// `spawn_subthread` panics if OS thread spawning fails.
846    /// (Like `std::thread::spawn()` does.)
847    ///
848    /// `MockExecutor`s will malfunction or panic if
849    /// any executor invocation method (eg `block_on`) is called on a Subthread.
850    pub fn subthread_spawn<T: Send + 'static>(
851        &self,
852        desc: impl Display,
853        call: impl FnOnce() -> T + Send + 'static,
854    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
855        let desc = desc.to_string();
856        let (output_tx, output_rx) = oneshot::channel();
857
858        // NB: we don't know which thread we're on!
859        // In principle we might be on another Subthread.
860        // So we can't context switch here.  That would be very confusing.
861        //
862        // Instead, we prepare the new Subthread as follows:
863        //   - There is a task in the executor
864        //   - The task is ready to be polled, whenever the executor decides to
865        //   - The thread starts running right away, but immediately waits until it is scheduled
866        // See `subthread_entrypoint`.
867
868        {
869            let mut data = self.shared.lock();
870            let fut = TaskFutureInfo::Normal(
871                Box::new(
872                    // When the executor decides that this new task is to be polled,
873                    // its future (this future) returns Ready immediately,
874                    // and the executor mainloop will context switch to the new thread.
875                    futures::future::ready(()),
876                )
877                .into(),
878            );
879            let id = data.insert_task(desc.clone(), fut, Some(IsSubthread));
880
881            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
882                .name(desc)
883                .spawn({
884                    let shared = self.shared.clone();
885                    move || shared.subthread_entrypoint(id, call, output_tx)
886                })
887                .expect("spawn failed");
888        }
889
890        output_rx.map(|r| {
891            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
892        })
893    }
894
895    /// Call an async `Future` from a Subthread
896    ///
897    /// Blocks the Subthread, and arranges to run async tasks,
898    /// including `fut`, until `fut` completes.
899    ///
900    /// `fut` is polled on the executor thread, not on the Subthread.
901    ///
902    /// # Panics, abuse, and malfunctions
903    ///
904    /// `subthread_block_on_future` will malfunction or panic
905    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
906    /// (ie a thread made with [`spawn_subthread`](MockExecutor::subthread_spawn)).
907    ///
908    /// If `fut` itself panics, the executor will panic.
909    ///
910    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
911    /// See `spawn_subthread`.
912    pub fn subthread_block_on_future<T: Send + 'static>(
913        &self,
914        fut: impl Future<Output = T> + Send + 'static,
915    ) -> T {
916        let ret = Arc::new(Mutex::new(None));
917        let fut = {
918            let ret = ret.clone();
919            async move {
920                let t = fut.await;
921                *ret.lock().expect("poison") = Some(t);
922            }
923        };
924        let fut = TaskFutureInfo::Normal(Box::new(fut).into());
925
926        let id = match THREAD_DESCRIPTOR.get() {
927            ThreadDescriptor::Subthread(id) => id,
928            ThreadDescriptor::Executor => panic!(
929                "subthread_block_on_future called on thread not spawned with spawn_subthread"
930            ),
931        };
932        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
933
934        {
935            let mut data = self.shared.lock();
936            let data_ = &mut *data;
937            let task = data_.tasks.get_mut(id).expect("Subthread task vanished!");
938            task.fut = Some(fut);
939            task.set_awake(id, &mut data_.awake);
940
941            self.shared.thread_context_switch(
942                data,
943                ThreadDescriptor::Subthread(id),
944                ThreadDescriptor::Executor,
945            );
946        }
947
948        let ret = ret.lock().expect("poison").take();
949        ret.expect("fut completed but didn't store")
950    }
951}
952
953impl Shared {
954    /// Main entrypoint function for a Subthread
955    ///
956    /// Entered on a new `std::thread` thread created by
957    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
958    ///
959    /// When `call` completes, sends its returned value `T` to `output_tx`.
960    fn subthread_entrypoint<T: Send + 'static>(
961        self: Arc<Self>,
962        id: TaskId,
963        call: impl FnOnce() -> T + Send + 'static,
964        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
965    ) {
966        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
967        trace!("MockExecutor thread {id:?}, entrypoint");
968
969        // Wait for the executor to tell us to run.
970        // This will be done the first time the task is polled.
971        {
972            let data = self.lock();
973            self.thread_context_switch_waitfor_instruction_to_run(
974                data,
975                ThreadDescriptor::Subthread(id),
976            );
977        }
978
979        trace!("MockExecutor thread {id:?}, entering user code");
980
981        // Run the user's actual thread function.
982        // This will typically reenter us via subthread_block_on_future.
983        let ret = catch_unwind(AssertUnwindSafe(call));
984
985        trace!("MockExecutor thread {id:?}, completed user code");
986
987        // This makes SubthreadFuture ready.
988        // It will be polled by the executor in due course, presumably.
989
990        output_tx.send(ret).unwrap_or_else(
991            #[allow(clippy::unnecessary_lazy_evaluations)]
992            |_| {}, // receiver dropped, maybe executor dropped or something?
993        );
994
995        {
996            let mut data = self.lock();
997
998            // Never poll this task again (so never schedule this thread)
999            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");
1000
1001            // Tell the executor it is scheduled now.
1002            // We carry on exiting, in parallel (holding the data lock).
1003            self.thread_context_switch_send_instruction_to_run(
1004                &mut data,
1005                ThreadDescriptor::Subthread(id),
1006                ThreadDescriptor::Executor,
1007            );
1008        }
1009    }
1010
1011    /// Switch from (sub)thread `us` to (sub)thread `them`
1012    ///
1013    /// Returns when someone calls `thread_context_switch(.., us)`.
1014    fn thread_context_switch(
1015        &self,
1016        mut data: MutexGuard<Data>,
1017        us: ThreadDescriptor,
1018        them: ThreadDescriptor,
1019    ) {
1020        trace!("MockExecutor thread {us:?}, switching to {them:?}");
1021        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
1022        self.thread_context_switch_waitfor_instruction_to_run(data, us);
1023    }
1024
1025    /// Instruct the (sub)thread `them` to run
1026    ///
1027    /// Update `thread_to_run`, which will wake up `them`'s
1028    /// call to `thread_context_switch_waitfor_instruction_to_run`.
1029    ///
1030    /// Must be called from (sub)thread `us`.
1031    /// Part of `thread_context_switch`, not normally called directly.
1032    fn thread_context_switch_send_instruction_to_run(
1033        &self,
1034        data: &mut MutexGuard<Data>,
1035        us: ThreadDescriptor,
1036        them: ThreadDescriptor,
1037    ) {
1038        assert_eq!(data.thread_to_run, us);
1039        data.thread_to_run = them;
1040        self.thread_condvar.notify_all();
1041    }
1042
1043    /// Await an instruction for this thread, `us`, to run
1044    ///
1045    /// Waits for `thread_to_run` to be `us`,
1046    /// waiting for `thread_condvar` as necessary.
1047    ///
1048    /// Part of `thread_context_switch`, not normally called directly.
1049    fn thread_context_switch_waitfor_instruction_to_run(
1050        &self,
1051        data: MutexGuard<Data>,
1052        us: ThreadDescriptor,
1053    ) {
1054        #[allow(let_underscore_lock)]
1055        let _: MutexGuard<_> = self
1056            .thread_condvar
1057            .wait_while(data, |data| {
1058                let live = data.thread_to_run;
1059                let resume = live == us;
1060                if resume {
1061                    trace!("MockExecutor thread {us:?}, resuming");
1062                } else {
1063                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
1064                }
1065                // We're in `.wait_while`, not `.wait_until`.  Confusing.
1066                !resume
1067            })
1068            .expect("data lock poisoned");
1069    }
1070}
1071
1072//---------- ancillary and convenience functions ----------
1073
1074/// Trait to let us assert at compile time that something is nicely `Sync` etc.
1075#[allow(dead_code)] // yes, we don't *use* anything from this trait
1076trait EnsureSyncSend: Sync + Send + 'static {}
1077impl EnsureSyncSend for ActualWaker {}
1078impl EnsureSyncSend for MockExecutor {}
1079
1080impl MockExecutor {
1081    /// Return the number of tasks running in this executor
1082    ///
1083    /// One possible use is for a test case to check that task(s)
1084    /// that ought to have exited, have indeed done so.
1085    ///
1086    /// In the usual case, the answer will be at least 1,
1087    /// because it counts the future passed to
1088    /// [`block_on`](MockExecutor::block_on)
1089    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1090    pub fn n_tasks(&self) -> usize {
1091        self.shared.lock().tasks.len()
1092    }
1093}
1094
1095impl Shared {
1096    /// Lock and obtain the guard
1097    ///
1098    /// Convenience method which panics on poison
1099    fn lock(&self) -> MutexGuard<Data> {
1100        self.data.lock().expect("data lock poisoned")
1101    }
1102}
1103
1104impl Task {
1105    /// Set task `id` to `Awake` and arrange that it will be polled.
1106    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1107        match self.state {
1108            Awake => {}
1109            Asleep(_) => {
1110                self.state = Awake;
1111                data_awake.push_back(id);
1112            }
1113        }
1114    }
1115}
1116
1117//---------- ActualWaker as RawWaker ----------
1118
1119/// Using [`ActualWaker`] in a [`RawWaker`]
1120///
1121/// We need to make a
1122/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
1123/// which contains an
1124/// [`ActualWaker`] (our actual waker implementation, also safe).
1125///
1126/// `std` offers `Waker::from<Arc<impl Wake>>`.
1127/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
1128///
1129/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
1130/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
1131///
1132/// SAFETY
1133///
1134///  * The data pointer is `Box::<ActualWaker>::into_raw()`
1135///  * We share these when we clone
1136///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
1137///  * So we may make references `&ActualWaker`
1138impl ActualWaker {
1139    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
1140    fn new_waker(self) -> Waker {
1141        unsafe { Waker::from_raw(self.raw_new()) }
1142    }
1143
1144    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
1145    fn raw_new(self) -> RawWaker {
1146        let self_: Box<ActualWaker> = self.into();
1147        let self_: *mut ActualWaker = Box::into_raw(self_);
1148        let self_: *const () = self_ as _;
1149        RawWaker::new(self_, &RAW_WAKER_VTABLE)
1150    }
1151
1152    /// Implementation of [`RawWakerVTable`]'s `clone`
1153    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
1154        let self_: *const ActualWaker = self_ as _;
1155        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1156        let copy: ActualWaker = self_.clone();
1157        copy.raw_new()
1158    }
1159
1160    /// Implementation of [`RawWakerVTable`]'s `wake`
1161    unsafe fn raw_wake(self_: *const ()) {
1162        Self::raw_wake_by_ref(self_);
1163        Self::raw_drop(self_);
1164    }
1165
1166    /// Implementation of [`RawWakerVTable`]'s `wake_ref_by`
1167    unsafe fn raw_wake_by_ref(self_: *const ()) {
1168        let self_: *const ActualWaker = self_ as _;
1169        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1170        self_.wake();
1171    }
1172
1173    /// Implementation of [`RawWakerVTable`]'s `drop`
1174    unsafe fn raw_drop(self_: *const ()) {
1175        let self_: *mut ActualWaker = self_ as _;
1176        let self_: Box<ActualWaker> = Box::from_raw(self_);
1177        drop(self_);
1178    }
1179}
1180
1181/// vtable for `Box<ActualWaker>` as `RawWaker`
1182//
1183// This ought to be in the impl block above, but
1184//   "associated `static` items are not allowed"
1185static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
1186    ActualWaker::raw_clone,
1187    ActualWaker::raw_wake,
1188    ActualWaker::raw_wake_by_ref,
1189    ActualWaker::raw_drop,
1190);
1191
1192//---------- Sleep location tracking and dumping ----------
1193
1194/// We record "where a future went to sleep" as (just) a backtrace
1195///
1196/// This type alias allows us to mock `Backtrace` for miri.
1197/// (It also insulates from future choices about sleep location representation.0
1198#[cfg(not(miri))]
1199type SleepLocation = Backtrace;
1200
1201impl Data {
1202    /// Dump tasks and their sleep location backtraces
1203    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1204        for (id, task) in self.tasks.iter() {
1205            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1206            match &task.state {
1207                Awake => {
1208                    prefix(f)?;
1209                    writeln!(f, "awake")?;
1210                }
1211                Asleep(locs) => {
1212                    let n = locs.len();
1213                    for (i, loc) in locs.iter().enumerate() {
1214                        prefix(f)?;
1215                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1216                    }
1217                    if n == 0 {
1218                        prefix(f)?;
1219                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1220                    }
1221                }
1222            }
1223        }
1224        writeln!(
1225            f,
1226            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1227        )?;
1228        Ok(())
1229    }
1230}
1231
1232/// Track sleep locations via `<Waker as Clone>`.
1233///
1234/// See [`MockExecutor::debug_dump`] for the explanation.
1235impl Clone for ActualWaker {
1236    fn clone(&self) -> Self {
1237        let id = self.id;
1238
1239        if let Some(data) = self.upgrade_data() {
1240            // If the executor is gone, there is nothing to adjust
1241            let mut data = data.lock();
1242            if let Some(task) = data.tasks.get_mut(self.id) {
1243                match &mut task.state {
1244                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1245                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
1246                }
1247            } else {
1248                trace!("MockExecutor cloned waker for dead task {id:?}");
1249            }
1250        }
1251
1252        ActualWaker {
1253            data: self.data.clone(),
1254            id,
1255        }
1256    }
1257}
1258
1259//---------- API for full debug dump ----------
1260
1261/// Debugging dump of a `MockExecutor`'s state
1262///
1263/// Returned by [`MockExecutor::as_debug_dump`]
1264//
1265// Existence implies backtraces have been resolved
1266//
1267// We use `Either` so that we can also use this internally when we have &mut Data.
1268pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1269
1270impl MockExecutor {
1271    /// Dump the executor's state including backtraces of waiting tasks, to stderr
1272    ///
1273    /// This is considerably more extensive than simply
1274    /// `MockExecutor as Debug`.
1275    ///
1276    /// (This is a convenience method, which wraps
1277    /// [`MockExecutor::as_debug_dump()`].
1278    ///
1279    /// ### Backtrace salience (possible spurious traces)
1280    ///
1281    /// **Summary**
1282    ///
1283    /// The technique used to capture backtraces when futures sleep is not 100% exact.
1284    /// It will usually show all the actual sleeping sites,
1285    /// but it might also show other backtraces which were part of
1286    /// the implementation of some complex relevant future.
1287    ///
1288    /// **Details**
1289    ///
1290    /// When a future's implementation wants to sleep,
1291    /// it needs to record the [`Waker`] (from the [`Context`])
1292    /// so that the "other end" can call `.wake()` on it later,
1293    /// when the future should be woken.
1294    ///
1295    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1296    /// the future must clone the `Waker`,
1297    /// and it must do so in within the `poll()` call.
1298    ///
1299    /// A future which is waiting in a `select!` will typically
1300    /// show multiple traces, one for each branch.
1301    /// But,
1302    /// if a future sleeps on one thing, and then when polled again later,
1303    /// sleeps on something different, without waking up in between,
1304    /// both backtrace locations will be shown.
1305    /// And,
1306    /// a complicated future contraption *might* clone the `Waker` more times.
1307    /// So not every backtrace will necessarily be informative.
1308    ///
1309    /// ### Panics
1310    ///
1311    /// Panics on write errors.
1312    pub fn debug_dump(&self) {
1313        self.as_debug_dump().to_stderr();
1314    }
1315
1316    /// Dump the executor's state including backtraces of waiting tasks
1317    ///
1318    /// This is considerably more extensive than simply
1319    /// `MockExecutor as Debug`.
1320    ///
1321    /// Returns an object for formatting with [`Debug`].
1322    /// To simply print the dump to stderr (eg in a test),
1323    /// use [`.debug_dump()`](MockExecutor::debug_dump).
1324    ///
1325    /// **Backtrace salience (possible spurious traces)** -
1326    /// see [`.debug_dump()`](MockExecutor::debug_dump).
1327    pub fn as_debug_dump(&self) -> DebugDump {
1328        let data = self.shared.lock();
1329        DebugDump(Either::Right(data))
1330    }
1331}
1332
1333impl Data {
1334    /// Convenience function: dump including backtraces, to stderr
1335    fn debug_dump(&mut self) {
1336        DebugDump(Either::Left(self)).to_stderr();
1337    }
1338}
1339
1340impl DebugDump<'_> {
1341    /// Convenience function: dump tasks and backtraces to stderr
1342    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1343    fn to_stderr(self) {
1344        write!(io::stderr().lock(), "{:?}", self)
1345            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1346    }
1347}
1348
1349//---------- bespoke Debug impls ----------
1350
1351impl Debug for DebugDump<'_> {
1352    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1353        let self_: &Data = &self.0;
1354
1355        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1356        writeln!(f, "MockExecutor task dump:")?;
1357        self_.dump_backtraces(f)?;
1358
1359        Ok(())
1360    }
1361}
1362
1363// See `impl Debug for Data` for notes on the output
1364impl Debug for Task {
1365    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1366        let Task {
1367            desc,
1368            state,
1369            fut,
1370            is_subthread,
1371        } = self;
1372        write!(f, "{:?}", desc)?;
1373        write!(f, "=")?;
1374        match is_subthread {
1375            None => {}
1376            Some(IsSubthread) => write!(f, "T")?,
1377        }
1378        match fut {
1379            None => write!(f, "P")?,
1380            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1381            Some(TaskFutureInfo::Main) => write!(f, "m")?,
1382        }
1383        match state {
1384            Awake => write!(f, "W")?,
1385            Asleep(locs) => write!(f, "s{}", locs.len())?,
1386        };
1387        Ok(())
1388    }
1389}
1390
1391/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
1392///
1393/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
1394///
1395/// `FLAGS` are:
1396///
1397///  * `T`: this task is for a Subthread (from subthread_spawn).
1398///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
1399///  * `f`: this is a normal task with a future and its future is present in `Data`
1400///  * `m`: this is the main task from `block_on`
1401///
1402///  * `W`: the task is awake
1403///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
1404//
1405// We do it this way because the naive dump from derive is very expansive
1406// and makes it impossible to see the wood for the trees.
1407// This very compact representation it easier to find a task of interest in the output.
1408//
1409// This is implemented in `impl Debug for Task`.
1410//
1411//
1412// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
1413// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
1414// I think derive_more emits #[automatically_derived], so that even though we use this
1415// in our Debug impl, that construction is unused.
1416#[allow(dead_code)]
1417struct DebugTasks<'d, F>(&'d Data, F);
1418
1419// See `impl Debug for Data` for notes on the output
1420impl<F, I> Debug for DebugTasks<'_, F>
1421where
1422    F: Fn() -> I,
1423    I: Iterator<Item = TaskId>,
1424{
1425    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1426        let DebugTasks(data, ids) = self;
1427        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1428            write!(f, "{delim}{id:?}")?;
1429            match data.tasks.get(id) {
1430                None => write!(f, "-")?,
1431                Some(task) => write!(f, "={task:?}")?,
1432            }
1433        }
1434        Ok(())
1435    }
1436}
1437
1438/// Mock `Backtrace` for miri
1439///
1440/// See also the not-miri `type SleepLocation`, alias above.
1441#[cfg(miri)]
1442mod miri_sleep_location {
1443    #[derive(Debug, derive_more::Display)]
1444    #[display("<SleepLocation>")]
1445    pub(super) struct SleepLocation {}
1446
1447    impl SleepLocation {
1448        pub(super) fn force_capture() -> Self {
1449            SleepLocation {}
1450        }
1451    }
1452}
1453#[cfg(miri)]
1454use miri_sleep_location::SleepLocation;
1455
1456#[cfg(test)]
1457mod test {
1458    // @@ begin test lint list maintained by maint/add_warning @@
1459    #![allow(clippy::bool_assert_comparison)]
1460    #![allow(clippy::clone_on_copy)]
1461    #![allow(clippy::dbg_macro)]
1462    #![allow(clippy::mixed_attributes_style)]
1463    #![allow(clippy::print_stderr)]
1464    #![allow(clippy::print_stdout)]
1465    #![allow(clippy::single_char_pattern)]
1466    #![allow(clippy::unwrap_used)]
1467    #![allow(clippy::unchecked_duration_subtraction)]
1468    #![allow(clippy::useless_vec)]
1469    #![allow(clippy::needless_pass_by_value)]
1470    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1471    use super::*;
1472    use futures::channel::mpsc;
1473    use futures::{SinkExt as _, StreamExt as _};
1474    use strum::IntoEnumIterator;
1475    use tracing::info;
1476
1477    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1478    use tracing_test::traced_test;
1479
1480    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1481        // This duplicates the part of the logic in MockRuntime::test_with_various which
1482        // relates to MockExecutor, because we don't have a MockRuntime::builder.
1483        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1484        SchedulingPolicy::iter().map(|scheduling| {
1485            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1486            MockExecutor::with_scheduling(scheduling)
1487        })
1488    }
1489
1490    #[cfg_attr(not(miri), traced_test)]
1491    #[test]
1492    fn simple() {
1493        let runtime = MockExecutor::default();
1494        let val = runtime.block_on(async { 42 });
1495        assert_eq!(val, 42);
1496    }
1497
1498    #[cfg_attr(not(miri), traced_test)]
1499    #[test]
1500    fn stall() {
1501        let runtime = MockExecutor::default();
1502
1503        runtime.block_on({
1504            let runtime = runtime.clone();
1505            async move {
1506                const N: usize = 3;
1507                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
1508                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();
1509
1510                let mut rx_n = rxs.pop().unwrap();
1511
1512                for (i, mut rx) in rxs.into_iter().enumerate() {
1513                    runtime.spawn_identified(i, {
1514                        let mut txs = txs.clone();
1515                        async move {
1516                            loop {
1517                                eprintln!("task {i} rx...");
1518                                let v = rx.next().await.unwrap();
1519                                let nv = v + 1;
1520                                eprintln!("task {i} rx {v}, tx {nv}");
1521                                let v = nv;
1522                                txs[v].send(v).await.unwrap();
1523                            }
1524                        }
1525                    });
1526                }
1527
1528                dbg!();
1529                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1530
1531                dbg!();
1532                runtime.progress_until_stalled().await;
1533
1534                dbg!();
1535                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1536
1537                dbg!();
1538                txs[0].send(0).await.unwrap();
1539
1540                dbg!();
1541                runtime.progress_until_stalled().await;
1542
1543                dbg!();
1544                let r = rx_n.next().await;
1545                assert_eq!(r, Some(N - 1));
1546
1547                dbg!();
1548                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1549
1550                runtime.spawn_identified("tx", {
1551                    let txs = txs.clone();
1552                    async {
1553                        eprintln!("sending task...");
1554                        for (i, mut tx) in txs.into_iter().enumerate() {
1555                            eprintln!("sending 0 to {i}...");
1556                            tx.send(0).await.unwrap();
1557                        }
1558                        eprintln!("sending task done");
1559                    }
1560                });
1561
1562                runtime.debug_dump();
1563
1564                for i in 0..txs.len() {
1565                    eprintln!("main {i} wait stall...");
1566                    runtime.progress_until_stalled().await;
1567                    eprintln!("main {i} rx wait...");
1568                    let r = rx_n.next().await;
1569                    eprintln!("main {i} rx = {r:?}");
1570                    assert!(r == Some(0) || r == Some(N - 1));
1571                }
1572
1573                eprintln!("finishing...");
1574                runtime.progress_until_stalled().await;
1575                eprintln!("finished.");
1576            }
1577        });
1578    }
1579
1580    #[cfg_attr(not(miri), traced_test)]
1581    #[test]
1582    fn spawn_blocking() {
1583        let runtime = MockExecutor::default();
1584
1585        runtime.block_on({
1586            let runtime = runtime.clone();
1587            async move {
1588                let task_1 = runtime.spawn_blocking(|| 42);
1589                let task_2 = runtime.spawn_blocking(|| 99);
1590
1591                assert_eq!(task_2.await, 99);
1592                assert_eq!(task_1.await, 42);
1593            }
1594        });
1595    }
1596
1597    #[cfg_attr(not(miri), traced_test)]
1598    #[test]
1599    fn drop_reentrancy() {
1600        // Check that dropping a completed task future is done *outside* the data lock.
1601        // Involves a contrived future whose Drop impl reenters the executor.
1602        //
1603        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1604        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1605        // we do indeed get deadlock, so this test case is working.
1606
1607        struct ReentersOnDrop {
1608            runtime: MockExecutor,
1609        }
1610        impl Future for ReentersOnDrop {
1611            type Output = ();
1612            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1613                Poll::Ready(())
1614            }
1615        }
1616        impl Drop for ReentersOnDrop {
1617            fn drop(&mut self) {
1618                self.runtime
1619                    .spawn_identified("dummy", futures::future::ready(()));
1620            }
1621        }
1622
1623        for runtime in various_mock_executors() {
1624            runtime.block_on(async {
1625                runtime.spawn_identified("trapper", {
1626                    let runtime = runtime.clone();
1627                    ReentersOnDrop { runtime }
1628                });
1629            });
1630        }
1631    }
1632
1633    #[cfg_attr(not(miri), traced_test)]
1634    #[test]
1635    fn subthread() {
1636        for runtime in various_mock_executors() {
1637            runtime.block_on(async {
1638                let (tx, rx) = oneshot::channel();
1639                info!("spawning subthread");
1640                let thr = runtime.subthread_spawn("thr1", {
1641                    let runtime = runtime.clone();
1642                    move || {
1643                        info!("subthread_block_on_future...");
1644                        let i = runtime.subthread_block_on_future(rx).unwrap();
1645                        info!("subthread_block_on_future => {i}");
1646                        i + 1
1647                    }
1648                });
1649                info!("main task sending");
1650                tx.send(12).unwrap();
1651                info!("main task sent");
1652                let r = thr.await.unwrap();
1653                info!("main task thr => {r}");
1654                assert_eq!(r, 13);
1655            });
1656        }
1657    }
1658}