Skip to main content

net/adapter/net/behavior/meshos/
executor.rs

1//! The action executor — drains the
2//! [`super::action::PendingAction`] queue the
3//! [`super::event_loop::MeshOsLoop`] fills, runs each action
4//! through the Phase G [`super::backpressure::BackpressureState::admit`]
5//! gate, and dispatches to a pluggable
6//! [`ActionDispatcher`].
7//!
8//! Locked decision #4 (action emission ≠ action execution): the
9//! executor is a separate task, not inlined in reconcile.
10//! Locked decision #10 (single backpressure layer): every
11//! action passes through one admit; deferrals re-enter via a
12//! per-executor `BinaryHeap` keyed by retry deadline; gates
13//! drop with a structured failure record.
14//!
15//! Phase-A through G shipped the upstream side (event loop +
16//! state + reconcile + gate); this module is the downstream
17//! consumer. The dispatcher itself is pluggable — a
18//! [`LoggingDispatcher`] ships for bootstrap / tests, and the
19//! production path will wrap [`super::action::MeshOsAction`]
20//! variants over the existing `DaemonRegistry`, migration
21//! orchestrator, and `MeshNode::send_subprotocol` paths.
22
23use std::cmp::Reverse;
24use std::collections::{BinaryHeap, VecDeque};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use futures::future::BoxFuture;
30use futures::FutureExt;
31use parking_lot::{Mutex, RwLock};
32use tokio::sync::mpsc;
33use tokio::time::sleep_until;
34
35use super::action::{MeshOsAction, PendingAction};
36use super::backpressure::{AdmissionResult, BackpressureState, ClusterBackpressureChange};
37use super::chain::{
38    append_dispatched, append_failed, append_gated, ActionChainAppender, AppendError,
39    NoOpActionChainAppender,
40};
41use super::config::MeshOsConfig;
42use super::snapshot::{FailureRecord, RECENT_FAILURES_CAPACITY};
43
44/// Pluggable action sink. The executor calls `dispatch` once
45/// per admitted action; the impl owns the substrate-side
46/// wiring (daemon registry, migration orchestrator, MeshDB
47/// admin commits, etc.).
48///
49/// Returns a [`BoxFuture`] so the trait stays dyn-compatible;
50/// production dispatchers spawn substrate-side futures
51/// themselves rather than blocking the executor task.
52pub trait ActionDispatcher: Send + Sync + 'static {
53    /// Dispatch an admitted action. Errors record on the
54    /// recent-failures ring buffer; the action is not retried
55    /// (admit / defer is the retry surface).
56    fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>>;
57
58    /// Cluster-wide backpressure flag transitioned. The executor
59    /// invokes this once per edge crossing — `Asserted` when the
60    /// action-queue depth crosses the high-water mark, `Released`
61    /// when it drops below the low-water mark. Production
62    /// dispatchers fan `DaemonControl::BackpressureOn { level }` /
63    /// `DaemonControl::BackpressureOff` out to supervised daemons
64    /// so they can shed optional work. Default impl is a no-op —
65    /// dispatchers that don't supervise daemons (e.g. the test
66    /// logger) can ignore the hook.
67    fn on_cluster_backpressure(&self, _change: ClusterBackpressureChange) {}
68}
69
/// Error returned by [`ActionDispatcher::dispatch`].
///
/// `reason` is free text meant for operators. `retry_after` tells
/// the executor whether to try again. With `None` the action is
/// recorded as failed and dropped, which is the common case because
/// admit/defer is the normal retry mechanism. With `Some(d)` the
/// action goes back through `admit()` once `d` has elapsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DispatchError {
    /// Human-readable explanation of the failure.
    pub reason: String,
    /// Delay before the executor re-admits the action; `None`
    /// means "do not retry".
    pub retry_after: Option<Duration>,
}

impl DispatchError {
    /// Build an error that drops the action (no retry hint).
    pub fn drop(reason: impl Into<String>) -> Self {
        Self {
            reason: reason.into(),
            retry_after: None,
        }
    }

    /// Build an error that asks the executor to re-admit the action
    /// after `after` has elapsed.
    pub fn retry(reason: impl Into<String>, after: Duration) -> Self {
        // Same shape as a dropped error, with the retry hint filled in.
        Self {
            retry_after: Some(after),
            ..Self::drop(reason)
        }
    }
}
102
103/// Logging-only dispatcher. Records every admitted action in an
104/// internal `Mutex<Vec<MeshOsAction>>` and returns `Ok(())`.
105/// Useful for bootstrap (before real subsystem wiring lands) +
106/// the executor's unit tests.
107#[derive(Debug, Default)]
108pub struct LoggingDispatcher {
109    log: Mutex<Vec<MeshOsAction>>,
110    fail_next: Mutex<Option<DispatchError>>,
111    backpressure_log: Mutex<Vec<ClusterBackpressureChange>>,
112}
113
114impl LoggingDispatcher {
115    /// Construct an empty logger.
116    pub fn new() -> Self {
117        Self::default()
118    }
119
120    /// Snapshot of the actions dispatched so far.
121    pub fn log(&self) -> Vec<MeshOsAction> {
122        self.log.lock().clone()
123    }
124
125    /// Inject an error to surface on the next `dispatch` call.
126    /// Used by tests to exercise the retry / drop paths.
127    pub fn fail_next(&self, err: DispatchError) {
128        *self.fail_next.lock() = Some(err);
129    }
130
131    /// Snapshot of the cluster-backpressure transitions the
132    /// executor has surfaced through `on_cluster_backpressure`.
133    pub fn backpressure_log(&self) -> Vec<ClusterBackpressureChange> {
134        self.backpressure_log.lock().clone()
135    }
136}
137
138impl ActionDispatcher for LoggingDispatcher {
139    fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>> {
140        Box::pin(async move {
141            if let Some(err) = self.fail_next.lock().take() {
142                return Err(err);
143            }
144            self.log.lock().push(action);
145            Ok(())
146        })
147    }
148
149    fn on_cluster_backpressure(&self, change: ClusterBackpressureChange) {
150        self.backpressure_log.lock().push(change);
151    }
152}
153
/// Counters the executor maintains for diagnostics / Deck
/// rendering. [`ActionExecutor::run`] hands back the shared `Arc`
/// when the task exits; sample it live via [`ExecutorHandle::stats`].
///
/// Every counter only ever increases and is bumped with
/// `Ordering::Relaxed`. Each value is accurate on its own, but the
/// set is not a consistent snapshot of one another.
#[derive(Debug, Default)]
pub struct ExecutorStats {
    /// Total actions admitted + successfully dispatched.
    pub dispatched: AtomicU64,
    /// Total actions that reached a terminal failure:
    /// - dispatch errors without a `retry_after` hint, including
    ///   caught dispatcher panics;
    /// - actions whose admit-side deferrals exceeded
    ///   `max_defer_count`;
    /// - actions whose dispatch retries exceeded `max_defer_count`.
    pub failed: AtomicU64,
    /// Total actions deferred via `AdmissionResult::Defer` and pushed
    /// onto the retry heap (a defer past `max_defer_count` counts as
    /// `failed` instead). Re-admits count here each time, so a
    /// flapping action inflates the metric — the queue-depth gauge is
    /// the healthy signal, not this counter.
    pub deferred: AtomicU64,
    /// Total actions hard-gated via `AdmissionResult::Gate`.
    pub gated: AtomicU64,
    /// Total actions re-queued because a dispatch error carried a
    /// `retry_after` hint and the retry budget still allowed it.
    pub dispatch_retries: AtomicU64,
    /// Number of cluster-backpressure assert transitions
    /// surfaced to the dispatcher.
    pub cluster_backpressure_asserts: AtomicU64,
    /// Number of cluster-backpressure release transitions
    /// surfaced to the dispatcher.
    pub cluster_backpressure_releases: AtomicU64,
    /// Total times an `append_dispatched` / `append_failed` /
    /// `append_gated` call returned an error. Admit-side deferrals do
    /// not append a chain record, so they never count here.
    /// The executor-side outcome (dispatch, gate, terminal failure or
    /// retry re-queue) has already been applied when this ticks. Only
    /// the chain record for that action is missing. Non-zero means the
    /// chain appender is dropping records. The in-memory ring and the
    /// dispatcher state remain consistent.
    pub chain_append_failures: AtomicU64,
}

impl ExecutorStats {
    /// Bump `counter` by one (relaxed; counters are diagnostic only).
    fn inc(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
194
195/// Internal heap entry — `Reverse<Instant>` so the smallest
196/// retry deadline is popped first.
197struct DeferredEntry {
198    retry_at: Instant,
199    action: PendingAction,
200    /// Number of times this action has been deferred. Capped by
201    /// `BackpressureConfig::max_defer_count`; past the cap the
202    /// executor drops the action with a structured failure
203    /// record rather than keep it on the heap forever.
204    defer_count: u32,
205}
206
207impl PartialEq for DeferredEntry {
208    fn eq(&self, other: &Self) -> bool {
209        self.retry_at == other.retry_at
210    }
211}
212impl Eq for DeferredEntry {}
213impl PartialOrd for DeferredEntry {
214    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
215        Some(self.cmp(other))
216    }
217}
218impl Ord for DeferredEntry {
219    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
220        // Min-heap by retry_at: smallest first.
221        Reverse(self.retry_at).cmp(&Reverse(other.retry_at))
222    }
223}
224
225/// The executor task body. Owns:
226///
227/// - the receiver side of the loop's action queue,
228/// - a [`BackpressureState`] (Phase G),
229/// - the dispatcher,
230/// - the deferred-retry heap.
231///
232/// Construct via [`ActionExecutor::new`]; drive via
233/// [`ActionExecutor::run`].
234pub struct ActionExecutor<D: ActionDispatcher> {
235    actions_rx: mpsc::Receiver<PendingAction>,
236    config: Arc<MeshOsConfig>,
237    backpressure: BackpressureState,
238    dispatcher: Arc<D>,
239    deferred: BinaryHeap<DeferredEntry>,
240    /// Bounded ring of recent dispatch failures. Shared with the
241    /// loop so the snapshot publish path can copy it into the
242    /// `MeshOsSnapshot::recent_failures` field without going
243    /// through the chain-fold path (which requires a real
244    /// `ActionChainAppender` to be wired). Writer is the
245    /// executor task; reader is the loop task on every publish.
246    recent_failures: Arc<RwLock<VecDeque<FailureRecord>>>,
247    /// Monotonic counter the executor stamps onto every
248    /// `FailureRecord` it pushes. Same dedup primitive as the
249    /// admin audit ring's seq — the Deck SDK's
250    /// `subscribe_failures` stream uses it. Shared via
251    /// `Arc<AtomicU64>` because the loop also records failures
252    /// (e.g. migration-abort dispatch errors) and needs to
253    /// stamp the same monotonic sequence — without the shared
254    /// counter, loop-recorded failures would collide with
255    /// executor-recorded ones at the SDK dedup gate.
256    failure_seq: Arc<AtomicU64>,
257    /// Failure chain appender. Production deployments wire a
258    /// `TypedRedexFile<FailureRecord>` here so the failure
259    /// ring's bounded history extends to cluster-lifetime
260    /// replay. Default is `NoOpFailureChainAppender` — only
261    /// the in-memory ring is observable when no chain is
262    /// wired.
263    failure_appender: Arc<dyn super::failure_chain::FailureChainAppender>,
264    stats: Arc<ExecutorStats>,
265    /// Optional action-chain appender. Each admit/dispatch
266    /// outcome appends an [`super::chain::ActionChainRecord`].
267    /// Defaults to [`NoOpActionChainAppender`] — a real
268    /// appender wires only when a chain consumer is set up.
269    chain_appender: Arc<dyn ActionChainAppender>,
270}
271
272impl<D: ActionDispatcher> ActionExecutor<D> {
273    /// Build an executor. `actions_rx` is the loop's queue
274    /// (returned by [`super::event_loop::MeshOsLoop::new`]).
275    pub fn new(
276        actions_rx: mpsc::Receiver<PendingAction>,
277        config: Arc<MeshOsConfig>,
278        dispatcher: Arc<D>,
279    ) -> Self {
280        Self {
281            actions_rx,
282            config,
283            backpressure: BackpressureState::new(),
284            dispatcher,
285            deferred: BinaryHeap::new(),
286            recent_failures: Arc::new(RwLock::new(VecDeque::with_capacity(
287                RECENT_FAILURES_CAPACITY,
288            ))),
289            failure_seq: Arc::new(AtomicU64::new(0)),
290            failure_appender: super::failure_chain::no_op_arc(),
291            stats: Arc::new(ExecutorStats::default()),
292            chain_appender: Arc::new(NoOpActionChainAppender),
293        }
294    }
295
296    /// Attach a [`super::failure_chain::FailureChainAppender`].
297    /// The executor's `record_failure` path dual-writes every
298    /// failure to both the in-memory ring (snapshot readable)
299    /// and this appender (chain-backed history). Without an
300    /// explicit appender the executor uses the no-op default.
301    pub fn with_failure_appender(
302        mut self,
303        appender: Arc<dyn super::failure_chain::FailureChainAppender>,
304    ) -> Self {
305        self.failure_appender = appender;
306        self
307    }
308
309    /// Clone the shared recent-failures ring. The runtime hands
310    /// this to the loop so the snapshot publish path can copy it
311    /// into the [`super::snapshot::MeshOsSnapshot::recent_failures`]
312    /// field — the chain-fold path is not the only failure
313    /// surface.
314    pub fn recent_failures_handle(&self) -> Arc<RwLock<VecDeque<FailureRecord>>> {
315        Arc::clone(&self.recent_failures)
316    }
317
318    /// Clone the shared failure-seq counter. The loop side
319    /// records its own failures (e.g. migration-abort
320    /// dispatcher errors) and needs the same monotonic
321    /// sequence so SDK consumers' dedup gate doesn't see
322    /// colliding `seq` values.
323    pub fn failure_seq_handle(&self) -> Arc<AtomicU64> {
324        Arc::clone(&self.failure_seq)
325    }
326
327    /// Clone the shared failure-chain appender. The loop side
328    /// dual-writes records via its internal
329    /// `record_runtime_failure` helper so the durable chain
330    /// history covers loop-side failures too, not just
331    /// executor-side dispatch failures.
332    pub fn failure_appender_handle(&self) -> Arc<dyn super::failure_chain::FailureChainAppender> {
333        Arc::clone(&self.failure_appender)
334    }
335
336    /// Builder: install an action-chain appender. The default
337    /// `NoOpActionChainAppender` swallows every record; a real
338    /// appender (e.g. one writing to a RedEX chain consumed by
339    /// `MeshOsSnapshotFold`) takes over per-action recording.
340    pub fn with_chain_appender(mut self, appender: Arc<dyn ActionChainAppender>) -> Self {
341        self.chain_appender = appender;
342        self
343    }
344
345    /// Handle on the executor's live state — `stats` + the
346    /// recent-failures snapshot. Cheap to clone (Arc /
347    /// fixed-size copies). Useful for Phase F snapshot
348    /// building from outside the task.
349    pub fn handle(&self) -> ExecutorHandle {
350        ExecutorHandle {
351            stats: Arc::clone(&self.stats),
352        }
353    }
354
355    /// Clone the stats `Arc`. Useful for the [`super::runtime::MeshOsRuntime`]
356    /// stitching layer, which holds the Arc across `run()`'s
357    /// consumption of `self`.
358    pub fn stats_arc(&self) -> Arc<ExecutorStats> {
359        Arc::clone(&self.stats)
360    }
361
362    /// Drive the executor until either the action receiver
363    /// closes (the loop dropped its sender) or the inner
364    /// dispatcher panics. Returns the accumulated stats.
365    #[expect(
366        clippy::expect_used,
367        reason = "tokio::select arm is gated on `next_deadline.is_some()` which means the prior `peek()` returned Some; pop() on the same heap must succeed"
368    )]
369    pub async fn run(mut self) -> Arc<ExecutorStats> {
370        // Periodic idle tick. The cluster-backpressure release
371        // edge only fires inside `handle_one_retry`, so a queue
372        // that drains below the low-water mark while no fresh
373        // action arrives would otherwise leave daemons throttled
374        // forever in a quiet steady-state cluster. The tick polls
375        // the live queue depth on every 100 ms boundary so the
376        // release surfaces independently of incoming actions.
377        let mut idle_tick = tokio::time::interval(Duration::from_millis(100));
378        idle_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
379        loop {
380            let next_deadline = self.deferred.peek().map(|e| e.retry_at);
381            tokio::select! {
382                action = self.actions_rx.recv() => {
383                    let Some(action) = action else { break };
384                    self.handle_one(action).await;
385                }
386                _ = sleep_until_opt(next_deadline), if next_deadline.is_some() => {
387                    // SAFETY: peek above returned Some.
388                    let due = self.deferred.pop().expect("deferred heap non-empty");
389                    self.handle_one_retry(due.action, due.defer_count).await;
390                }
391                _ = idle_tick.tick() => {
392                    self.poll_cluster_backpressure();
393                }
394            }
395        }
396        Arc::clone(&self.stats)
397    }
398
399    /// Re-evaluate the cluster-backpressure hysteresis with the
400    /// current live queue depth, without consuming an action.
401    /// Used by the idle tick and the post-release path so the
402    /// `Released` edge fires even with zero in-flight actions.
403    /// Pre-fix `update_cluster_backpressure` had only one
404    /// non-test caller (inside `handle_one_retry`); a queue that
405    /// drained below the low-water mark while no fresh action
406    /// arrived left daemons throttled indefinitely.
407    fn poll_cluster_backpressure(&mut self) {
408        let depth = self.actions_rx.len() + self.deferred.len();
409        let change = self
410            .backpressure
411            .update_cluster_backpressure(depth, &self.config.backpressure);
412        match change {
413            ClusterBackpressureChange::Asserted => {
414                ExecutorStats::inc(&self.stats.cluster_backpressure_asserts);
415                self.dispatcher.on_cluster_backpressure(change);
416            }
417            ClusterBackpressureChange::Released => {
418                ExecutorStats::inc(&self.stats.cluster_backpressure_releases);
419                self.dispatcher.on_cluster_backpressure(change);
420            }
421            ClusterBackpressureChange::Steady => {}
422        }
423    }
424
425    async fn handle_one(&mut self, action: PendingAction) {
426        self.handle_one_retry(action, 0).await
427    }
428
429    async fn handle_one_retry(&mut self, action: PendingAction, prior_defers: u32) {
430        // Source `now` from tokio's clock and convert to std so the
431        // deferred-heap deadlines stay coherent with tokio::time::sleep_until
432        // — under `tokio::time::pause()` the std::Instant::now() formulation
433        // diverged from the paused timer, leaving tests unable to drive
434        // deferred-retry semantics. into_std() round-trips through
435        // tokio::time::Instant::from_std() in sleep_until_opt below.
436        let now = tokio::time::Instant::now().into_std();
437        self.backpressure.tick(now);
438        // Compute live queue depth (channel + deferred heap) and
439        // run hysteresis; surface edge crossings to the
440        // dispatcher so it can broadcast
441        // `DaemonControl::BackpressureOn`/`Off` to supervised
442        // daemons.
443        let depth = self.actions_rx.len() + self.deferred.len() + 1;
444        let change = self
445            .backpressure
446            .update_cluster_backpressure(depth, &self.config.backpressure);
447        match change {
448            ClusterBackpressureChange::Asserted => {
449                ExecutorStats::inc(&self.stats.cluster_backpressure_asserts);
450                self.dispatcher.on_cluster_backpressure(change);
451            }
452            ClusterBackpressureChange::Released => {
453                ExecutorStats::inc(&self.stats.cluster_backpressure_releases);
454                self.dispatcher.on_cluster_backpressure(change);
455            }
456            ClusterBackpressureChange::Steady => {}
457        }
458        match self
459            .backpressure
460            .admit(action.id, &action.action, now, &self.config.backpressure)
461        {
462            AdmissionResult::Admit => {
463                self.dispatch_now_with_defer_count(action, now, prior_defers)
464                    .await
465            }
466            AdmissionResult::Defer { retry_after } => {
467                let next_count = prior_defers.saturating_add(1);
468                if next_count > self.config.backpressure.max_defer_count {
469                    ExecutorStats::inc(&self.stats.failed);
470                    let reason = format!(
471                        "deferred {next_count} times — exceeds max_defer_count {}",
472                        self.config.backpressure.max_defer_count,
473                    );
474                    self.record_failure(format!("action-id:{}", action.id.0), reason.clone());
475                    let r = append_failed(&self.chain_appender, &action, reason, None);
476                    self.record_chain_append(action.id.0, "failed_defer_budget", r);
477                    return;
478                }
479                ExecutorStats::inc(&self.stats.deferred);
480                self.deferred.push(DeferredEntry {
481                    retry_at: now.checked_add(retry_after).unwrap_or(now),
482                    action,
483                    defer_count: next_count,
484                });
485            }
486            AdmissionResult::Gate {
487                cooldown_until,
488                reason,
489            } => {
490                ExecutorStats::inc(&self.stats.gated);
491                let age = cooldown_until.saturating_duration_since(now);
492                let cooldown_ms = age.as_millis() as u64;
493                self.record_failure(
494                    format!("action-id:{}", action.id.0),
495                    format!("gated ({reason}) for {cooldown_ms} ms"),
496                );
497                let r = append_gated(
498                    &self.chain_appender,
499                    &action,
500                    reason.to_string(),
501                    Some(cooldown_ms),
502                );
503                self.record_chain_append(action.id.0, "gated", r);
504            }
505        }
506    }
507
508    async fn dispatch_now_with_defer_count(
509        &mut self,
510        action: PendingAction,
511        admit_anchor: Instant,
512        prior_defers: u32,
513    ) {
514        // Wrap the dispatcher in `catch_unwind` so a panicking
515        // future doesn't unwind the executor task. The trait is
516        // pluggable + third-party-installed; trust-but-isolate.
517        let dispatch_future = self.dispatcher.dispatch(action.action.clone());
518        let result = match std::panic::AssertUnwindSafe(dispatch_future)
519            .catch_unwind()
520            .await
521        {
522            Ok(result) => result,
523            Err(_) => {
524                tracing::error!(
525                    target: "meshos",
526                    action_id = action.id.0,
527                    "dispatcher panicked — recording as drop",
528                );
529                Err(DispatchError::drop("dispatcher panicked"))
530            }
531        };
532        match result {
533            Ok(()) => {
534                ExecutorStats::inc(&self.stats.dispatched);
535                let r = append_dispatched(&self.chain_appender, &action);
536                self.record_chain_append(action.id.0, "dispatched", r);
537            }
538            Err(err) => {
539                // Dispatch did not happen — roll back the
540                // reservations admit installed against this
541                // action's id so unrelated future actions aren't
542                // gated by a side effect that never occurred.
543                self.backpressure
544                    .release_failed_admit(action.id, &action.action);
545                // Refresh cluster-backpressure: the release just
546                // dropped one in-flight action's reservation, so a
547                // queue that was hovering near the release water
548                // mark should surface that edge here rather than
549                // wait for the next idle tick.
550                self.poll_cluster_backpressure();
551                let _ = admit_anchor;
552                if let Some(after) = err.retry_after {
553                    // Dispatch-error retries share the
554                    // max_defer_count budget with admit-side
555                    // defers — both occupy the same heap, both
556                    // are "this action couldn't run, try later".
557                    let next_count = prior_defers.saturating_add(1);
558                    if next_count > self.config.backpressure.max_defer_count {
559                        ExecutorStats::inc(&self.stats.failed);
560                        let reason =
561                            format!("dispatch retry budget exhausted after {next_count} attempts",);
562                        self.record_failure(format!("action-id:{}", action.id.0), reason.clone());
563                        let r = append_failed(&self.chain_appender, &action, reason, None);
564                        self.record_chain_append(action.id.0, "failed_retry_budget", r);
565                        return;
566                    }
567                    ExecutorStats::inc(&self.stats.dispatch_retries);
568                    let retry_ms = after.as_millis() as u64;
569                    let r = append_failed(
570                        &self.chain_appender,
571                        &action,
572                        err.reason.clone(),
573                        Some(retry_ms),
574                    );
575                    self.record_chain_append(action.id.0, "failed_retry", r);
576                    // Same time-source rationale as handle_one_retry above.
577                    let now = tokio::time::Instant::now().into_std();
578                    self.deferred.push(DeferredEntry {
579                        retry_at: now.checked_add(after).unwrap_or(now),
580                        action,
581                        defer_count: next_count,
582                    });
583                } else {
584                    ExecutorStats::inc(&self.stats.failed);
585                    let reason = err.reason.clone();
586                    self.record_failure(format!("action-id:{}", action.id.0), err.reason);
587                    let r = append_failed(&self.chain_appender, &action, reason, None);
588                    self.record_chain_append(action.id.0, "failed", r);
589                }
590            }
591        }
592    }
593
594    /// Record the outcome of a chain-append call. Bumps the
595    /// `chain_append_failures` counter on `Err` and emits a
596    /// warn log so operators can see the chain is dropping
597    /// records — the dispatch / admit / gate side effect
598    /// already happened either way.
599    fn record_chain_append(
600        &self,
601        action_id: u64,
602        kind: &'static str,
603        result: Result<(), AppendError>,
604    ) {
605        if let Err(e) = result {
606            self.stats
607                .chain_append_failures
608                .fetch_add(1, Ordering::Relaxed);
609            tracing::warn!(
610                target: "meshos",
611                action_id,
612                kind,
613                error = %e,
614                "executor chain append failed; in-memory state stayed consistent \
615                 but the action's chain record is missing",
616            );
617        }
618    }
619
620    fn record_failure(&mut self, source: String, reason: String) {
621        let recorded_at_ms = std::time::SystemTime::now()
622            .duration_since(std::time::UNIX_EPOCH)
623            .map(|d| d.as_millis() as u64)
624            .unwrap_or(0);
625        let seq = self.failure_seq.fetch_add(1, Ordering::SeqCst) + 1;
626        let record = FailureRecord {
627            seq,
628            source,
629            reason,
630            recorded_at_ms,
631        };
632        // Dual-write: chain first, ring second. The chain
633        // append is non-fatal — a hiccup there must never
634        // wedge the executor's dispatch loop.
635        if let Err(err) = self.failure_appender.append(&record) {
636            tracing::warn!(
637                target: "meshos",
638                seq = record.seq,
639                error = %err,
640                "failure-chain append failed — record kept on in-memory ring only",
641            );
642        }
643        let mut ring = self.recent_failures.write();
644        if ring.len() >= RECENT_FAILURES_CAPACITY {
645            ring.pop_front();
646        }
647        ring.push_back(record);
648    }
649}
650
651/// External handle for sampling executor live state.
652#[derive(Clone)]
653pub struct ExecutorHandle {
654    stats: Arc<ExecutorStats>,
655}
656
657impl ExecutorHandle {
658    /// Sample the current stats. Atomic loads; consistent
659    /// per-counter but not as a single snapshot.
660    pub fn stats(&self) -> ExecutorStatsSnapshot {
661        ExecutorStatsSnapshot {
662            dispatched: self.stats.dispatched.load(Ordering::Relaxed),
663            failed: self.stats.failed.load(Ordering::Relaxed),
664            deferred: self.stats.deferred.load(Ordering::Relaxed),
665            gated: self.stats.gated.load(Ordering::Relaxed),
666            dispatch_retries: self.stats.dispatch_retries.load(Ordering::Relaxed),
667            cluster_backpressure_asserts: self
668                .stats
669                .cluster_backpressure_asserts
670                .load(Ordering::Relaxed),
671            cluster_backpressure_releases: self
672                .stats
673                .cluster_backpressure_releases
674                .load(Ordering::Relaxed),
675            chain_append_failures: self.stats.chain_append_failures.load(Ordering::Relaxed),
676        }
677    }
678}
679
/// Plain-value stats snapshot (no atomics; safe to copy +
/// serialize). Produced by [`ExecutorHandle::stats`].
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutorStatsSnapshot {
    /// Total actions admitted + successfully dispatched.
    pub dispatched: u64,
    /// Total terminal failures:
    /// - dispatch errors without a retry hint, including caught
    ///   dispatcher panics;
    /// - actions that exhausted `max_defer_count`, via admit-side
    ///   deferrals or dispatch retries.
    pub failed: u64,
    /// Total `AdmissionResult::Defer` re-queues (a re-deferred action
    /// counts again each time).
    pub deferred: u64,
    /// Total `AdmissionResult::Gate` drops.
    pub gated: u64,
    /// Total dispatch errors re-queued via `retry_after`.
    pub dispatch_retries: u64,
    /// Number of cluster-backpressure assert edges surfaced.
    pub cluster_backpressure_asserts: u64,
    /// Number of cluster-backpressure release edges surfaced.
    pub cluster_backpressure_releases: u64,
    /// Times the chain appender returned `Err` for an outcome the
    /// executor was recording (dispatched / failed / gated). Admit-side
    /// deferrals are not chain-recorded. Non-zero means the chain is
    /// missing records. The in-memory ring and dispatcher state
    /// remain consistent, but downstream chain consumers will not see
    /// those entries.
    pub chain_append_failures: u64,
}
706
707async fn sleep_until_opt(deadline: Option<Instant>) {
708    if let Some(deadline) = deadline {
709        sleep_until(tokio::time::Instant::from_std(deadline)).await;
710    } else {
711        // No deferred work — park forever. The select! arm
712        // gating on `if next_deadline.is_some()` keeps this
713        // branch from ever being polled when no deadline is
714        // pending.
715        std::future::pending::<()>().await;
716    }
717}
718
#[cfg(test)]
mod tests {
    //! Executor tests.
    //!
    //! Most tests spawn a real `ActionExecutor::run` task fed by a
    //! tokio mpsc channel, drop the sender, and assert on the stats
    //! the task returns. Several rely on real wall-clock sleeps:
    //! the executor reads `std::time::Instant`, which
    //! `tokio::time::pause()` does not control.

    use std::time::Duration;

    use tokio::sync::mpsc;

    use super::super::action::{ActionId, MaintenanceTransition};
    use super::super::config::MeshOsConfig;
    use super::super::event::{ChainId, DaemonRef};
    use super::*;

    /// Wraps `action` in a `PendingAction` with id `id`, stamped as
    /// emitted now.
    fn pending(id: u64, action: MeshOsAction) -> PendingAction {
        PendingAction {
            id: ActionId(id),
            action,
            emitted_at: Instant::now(),
        }
    }

    /// Builds a `DaemonRef` from a name and numeric id.
    fn dref(name: &str, id: u64) -> DaemonRef {
        DaemonRef {
            id,
            name: name.into(),
        }
    }

    /// Config for tests that do not tune any limits. Despite the
    /// name, this is plain `MeshOsConfig::default()`. Tests that
    /// need tighter limits (`max_defer_count`, cluster-backpressure
    /// marks) build their own config.
    fn fast_cfg() -> Arc<MeshOsConfig> {
        Arc::new(MeshOsConfig::default())
    }

    /// Chain appender that always returns Err. Used to pin the
    /// counter-bumping behavior — dispatch must still succeed,
    /// but `chain_append_failures` records the dropped record.
    struct FailingChainAppender;

    impl super::super::chain::ActionChainAppender for FailingChainAppender {
        fn append(
            &self,
            _record: super::super::chain::ActionChainRecord,
        ) -> Result<(), AppendError> {
            Err(AppendError {
                reason: "test-injected appender failure".into(),
            })
        }
    }

    #[tokio::test]
    async fn chain_append_failure_bumps_counter_but_dispatch_still_succeeds() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher))
            .with_chain_appender(Arc::new(FailingChainAppender));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        // Dispatch still happened — chain miss is not a correctness gap.
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log().len(), 1);
        // And the counter recorded the dropped chain record.
        assert_eq!(stats.chain_append_failures.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn admitted_actions_reach_the_dispatcher() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 2);
        assert_eq!(dispatcher.log().len(), 2);
    }

    #[tokio::test]
    async fn gated_actions_do_not_reach_the_dispatcher() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let mut exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        // Pre-load the daemon gate so StartDaemon is gated.
        let d = dref("telemetry", 1);
        exec.backpressure
            .record_daemon_gate(d.clone(), Instant::now() + Duration::from_secs(60));
        let task = tokio::spawn(exec.run());

        tx.send(pending(1, MeshOsAction::StartDaemon { daemon: d }))
            .await
            .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
        assert_eq!(stats.gated.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log().len(), 0);
    }

    #[tokio::test]
    async fn deferred_actions_eventually_reach_the_dispatcher() {
        // Two PullReplica in quick succession; default
        // pull_cooldown is 250 ms so the second defers.
        // tokio::time::pause() doesn't compose with our
        // Instant::now() reads (we use std time, not tokio
        // time), so we rely on real-time delays — pull_cooldown
        // is 250 ms, drift is small enough.
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        let chain_a: ChainId = 1;
        let chain_b: ChainId = 2;
        tx.send(pending(
            1,
            MeshOsAction::PullReplica {
                chain: chain_a,
                source: 5,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::PullReplica {
                chain: chain_b,
                source: 5,
            },
        ))
        .await
        .unwrap();

        // Give the executor enough wall time to: dispatch the
        // first, defer the second, wake up after the cooldown,
        // and dispatch the second.
        tokio::time::sleep(Duration::from_millis(500)).await;
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            2,
            "both pulls should eventually reach the dispatcher",
        );
        assert!(
            stats.deferred.load(Ordering::Relaxed) >= 1,
            "second pull should have been deferred at least once",
        );
        assert_eq!(dispatcher.log().len(), 2);
    }

    #[tokio::test]
    async fn dispatch_errors_without_retry_record_failures() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::drop("boom"));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn failure_chain_appender_receives_every_recorded_failure() {
        use super::super::failure_chain::BufferingFailureChainAppender;
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::drop("first boom"));
        let appender = Arc::new(BufferingFailureChainAppender::default());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher)).with_failure_appender(
            appender.clone() as Arc<dyn super::super::failure_chain::FailureChainAppender>,
        );
        // Capture the failure ring handle BEFORE moving `exec`
        // into the spawned task.
        let ring_handle = exec.recent_failures_handle();
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);
        let _ = task.await.expect("join");

        let captured = appender.captured();
        assert_eq!(captured.len(), 1, "appender should see one record");
        assert!(captured[0].reason.contains("first boom"));
        assert!(captured[0].seq > 0);

        // Appender + executor ring see the SAME record.
        let ring: Vec<FailureRecord> = ring_handle.read().iter().cloned().collect();
        assert_eq!(ring.len(), 1);
        assert_eq!(ring[0].seq, captured[0].seq);
        assert_eq!(ring[0].reason, captured[0].reason);
    }

    #[tokio::test]
    async fn dispatch_errors_with_retry_re_enqueue() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        // First call errors with a 50 ms retry; the second
        // call (after re-queue) succeeds.
        dispatcher.fail_next(DispatchError::retry("transient", Duration::from_millis(50)));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        // Wait long enough for the retry to fire + drain.
        tokio::time::sleep(Duration::from_millis(200)).await;
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 1);
        assert_eq!(stats.dispatch_retries.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn executor_exits_when_sender_drops() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());
        drop(tx);
        let stats = tokio::time::timeout(Duration::from_secs(2), task)
            .await
            .expect("executor did not exit after sender dropped")
            .expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn dispatch_retry_drops_after_exceeding_max_defer_count() {
        // Regression for I7: a dispatcher that returns
        // `retry_after` forever (a poison pill) used to occupy
        // the deferred-action heap indefinitely. The defer
        // budget caps total attempts; past the cap the
        // executor drops the action with a failure record.
        struct AlwaysRetry {
            attempts: parking_lot::Mutex<u32>,
        }
        impl ActionDispatcher for AlwaysRetry {
            fn dispatch<'a>(
                &'a self,
                _action: MeshOsAction,
            ) -> BoxFuture<'a, Result<(), DispatchError>> {
                Box::pin(async move {
                    *self.attempts.lock() += 1;
                    Err(DispatchError::retry("transient", Duration::from_millis(5)))
                })
            }
        }

        let mut cfg = MeshOsConfig::default();
        cfg.backpressure.max_defer_count = 3;
        let cfg = Arc::new(cfg);
        let (tx, rx) = mpsc::channel(8);
        let dispatcher = Arc::new(AlwaysRetry {
            attempts: parking_lot::Mutex::new(0),
        });
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        // Give the executor enough wall time for max_defer_count
        // attempts + a few ms each.
        tokio::time::sleep(Duration::from_millis(200)).await;
        drop(tx);
        let stats = task.await.expect("join");
        let attempts = *dispatcher.attempts.lock();
        assert_eq!(
            stats.failed.load(Ordering::Relaxed),
            1,
            "action must drop with a failure after exceeding max_defer_count",
        );
        assert!(
            (3..=5).contains(&attempts),
            "expected ~max_defer_count dispatch attempts, got {attempts}",
        );
    }

    #[tokio::test]
    async fn dispatcher_panic_does_not_kill_executor() {
        // Regression for I6: a panicking dispatcher future used
        // to unwind the executor task. The catch_unwind wrapper
        // converts the panic into a `DispatchError::drop`, so
        // the executor continues servicing subsequent actions.
        struct PanicOnce {
            armed: parking_lot::Mutex<bool>,
            log: Mutex<Vec<MeshOsAction>>,
        }
        impl ActionDispatcher for PanicOnce {
            fn dispatch<'a>(
                &'a self,
                action: MeshOsAction,
            ) -> BoxFuture<'a, Result<(), DispatchError>> {
                Box::pin(async move {
                    // Disarm under the lock, then release the guard
                    // before panicking so no lock is held during the
                    // unwind.
                    let armed = {
                        let mut g = self.armed.lock();
                        let was = *g;
                        *g = false;
                        was
                    };
                    if armed {
                        panic!("boom");
                    }
                    self.log.lock().push(action);
                    Ok(())
                })
            }
        }

        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(PanicOnce {
            armed: parking_lot::Mutex::new(true),
            log: Mutex::new(Vec::new()),
        });
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);

        let stats = task
            .await
            .expect("executor task should NOT have panicked despite dispatcher panic");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            1,
            "second action should have dispatched after the first panicked",
        );
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log.lock().len(), 1);
    }

    #[tokio::test]
    async fn cluster_backpressure_edges_surface_through_dispatcher_hook() {
        // Set high-water = 3, low-water = 1 so the channel-only
        // depth crosses the threshold quickly. The test queues
        // four actions in the buffered channel BEFORE spawning the
        // executor; depth at first admit reaches 4 (rx.len() == 3
        // + 1 in flight), crossing the high mark, then drops as
        // actions drain.
        let mut cfg = MeshOsConfig::default();
        cfg.backpressure.cluster_backpressure_threshold = 3;
        cfg.backpressure.cluster_backpressure_release = 1;
        let cfg = Arc::new(cfg);
        let (tx, rx) = mpsc::channel(8);
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        // Queue all four actions first; the executor is only
        // spawned after this loop, so it sees them all at once.
        for i in 1..=4u64 {
            tx.send(pending(
                i,
                MeshOsAction::CommitMaintenanceTransition {
                    node: 1,
                    target: MaintenanceTransition::Active,
                },
            ))
            .await
            .unwrap();
        }
        let task = tokio::spawn(exec.run());
        // Let everything drain.
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);
        let stats = task.await.expect("join");
        assert!(
            stats.cluster_backpressure_asserts.load(Ordering::Relaxed) >= 1,
            "depth crossed the high-water mark at least once",
        );
        assert!(
            stats.cluster_backpressure_releases.load(Ordering::Relaxed) >= 1,
            "depth dropped below the low-water mark at least once",
        );
        let log = dispatcher.backpressure_log();
        assert!(matches!(
            log.first(),
            Some(ClusterBackpressureChange::Asserted)
        ));
        assert!(matches!(
            log.last(),
            Some(ClusterBackpressureChange::Released)
        ));
    }

    #[tokio::test]
    async fn dispatch_failure_with_retry_releases_pull_cooldown() {
        // Regression: a PullReplica admit sets the global pull
        // cooldown; if dispatch fails the cooldown must be
        // rolled back so unrelated pulls aren't gated by a side
        // effect that never happened.
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        // First dispatch fails with a long retry hint; the
        // second admit (on a different chain) must succeed
        // without waiting on the rolled-back cooldown.
        dispatcher.fail_next(DispatchError::retry("transient", Duration::from_secs(60)));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::PullReplica {
                chain: 1,
                source: 5,
            },
        ))
        .await
        .unwrap();
        // Brief settle: first action processed (admit + fail +
        // release + heap push) before the second arrives.
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(pending(
            2,
            MeshOsAction::PullReplica {
                chain: 2,
                source: 5,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);

        // The `dispatched == 1` assertion below relies on the
        // executor returning once the sender drops, without
        // waiting out the 60 s retry of the first pull that is
        // still parked in its deferred heap.
        let stats = task.await.expect("join");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            1,
            "second pull should dispatch immediately after the first \
             released its leaked cooldown",
        );
        assert_eq!(stats.dispatch_retries.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn handle_exposes_atomic_stats_to_outside_observers() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        // Grab the observer handle before `exec` moves into the task.
        let handle = exec.handle();
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;

        // Read stats while the executor is still running.
        let snap = handle.stats();
        assert!(
            snap.dispatched >= 1,
            "handle should observe the dispatch while the executor is live",
        );
        drop(tx);
        // Surface an executor panic instead of silently discarding
        // the JoinError, matching the other tests.
        let _ = task.await.expect("join");
    }
}