1use std::cmp::Reverse;
24use std::collections::{BinaryHeap, VecDeque};
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use futures::future::BoxFuture;
30use futures::FutureExt;
31use parking_lot::{Mutex, RwLock};
32use tokio::sync::mpsc;
33use tokio::time::sleep_until;
34
35use super::action::{MeshOsAction, PendingAction};
36use super::backpressure::{AdmissionResult, BackpressureState, ClusterBackpressureChange};
37use super::chain::{
38 append_dispatched, append_failed, append_gated, ActionChainAppender, AppendError,
39 NoOpActionChainAppender,
40};
41use super::config::MeshOsConfig;
42use super::snapshot::{FailureRecord, RECENT_FAILURES_CAPACITY};
43
44pub trait ActionDispatcher: Send + Sync + 'static {
53 fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>>;
57
58 fn on_cluster_backpressure(&self, _change: ClusterBackpressureChange) {}
68}
69
/// Failure reported by [`ActionDispatcher::dispatch`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DispatchError {
    /// Human-readable reason. The executor copies it into the failure ring and/or
    /// the action chain record.
    pub reason: String,
    /// `Some(delay)` asks the executor to retry after `delay`; `None` makes the
    /// failure terminal (the action is dropped).
    pub retry_after: Option<Duration>,
}
84
85impl DispatchError {
86 pub fn drop(reason: impl Into<String>) -> Self {
88 Self {
89 reason: reason.into(),
90 retry_after: None,
91 }
92 }
93
94 pub fn retry(reason: impl Into<String>, after: Duration) -> Self {
96 Self {
97 reason: reason.into(),
98 retry_after: Some(after),
99 }
100 }
101}
102
/// In-memory [`ActionDispatcher`] (used by this module's tests). It records every
/// successfully dispatched action and every cluster-backpressure edge, and can be
/// armed to fail the next dispatch.
#[derive(Debug, Default)]
pub struct LoggingDispatcher {
    // Actions dispatched successfully, in the order they were dispatched.
    log: Mutex<Vec<MeshOsAction>>,
    // One-shot error returned (and cleared) by the next `dispatch` call.
    fail_next: Mutex<Option<DispatchError>>,
    // Every change delivered through `on_cluster_backpressure`, in order.
    backpressure_log: Mutex<Vec<ClusterBackpressureChange>>,
}
113
114impl LoggingDispatcher {
115 pub fn new() -> Self {
117 Self::default()
118 }
119
120 pub fn log(&self) -> Vec<MeshOsAction> {
122 self.log.lock().clone()
123 }
124
125 pub fn fail_next(&self, err: DispatchError) {
128 *self.fail_next.lock() = Some(err);
129 }
130
131 pub fn backpressure_log(&self) -> Vec<ClusterBackpressureChange> {
134 self.backpressure_log.lock().clone()
135 }
136}
137
138impl ActionDispatcher for LoggingDispatcher {
139 fn dispatch<'a>(&'a self, action: MeshOsAction) -> BoxFuture<'a, Result<(), DispatchError>> {
140 Box::pin(async move {
141 if let Some(err) = self.fail_next.lock().take() {
142 return Err(err);
143 }
144 self.log.lock().push(action);
145 Ok(())
146 })
147 }
148
149 fn on_cluster_backpressure(&self, change: ClusterBackpressureChange) {
150 self.backpressure_log.lock().push(change);
151 }
152}
153
/// Counters maintained by [`ActionExecutor`]. They are shared through an `Arc`
/// and can be read while the executor runs (see [`ExecutorHandle::stats`]).
#[derive(Debug, Default)]
pub struct ExecutorStats {
    /// Actions the dispatcher completed successfully.
    pub dispatched: AtomicU64,
    /// Actions dropped terminally. This covers non-retryable dispatch errors,
    /// including caught dispatcher panics, and exhausted defer/retry budgets.
    pub failed: AtomicU64,
    /// Times admission deferred an action onto the deferred heap.
    pub deferred: AtomicU64,
    /// Actions refused by an admission gate.
    pub gated: AtomicU64,
    /// Dispatch failures that were re-queued for another attempt.
    pub dispatch_retries: AtomicU64,
    /// Cluster-backpressure `Asserted` edges.
    pub cluster_backpressure_asserts: AtomicU64,
    /// Cluster-backpressure `Released` edges.
    pub cluster_backpressure_releases: AtomicU64,
    /// Action-chain appends that returned an error.
    pub chain_append_failures: AtomicU64,
}
188
189impl ExecutorStats {
190 fn inc(counter: &AtomicU64) {
191 counter.fetch_add(1, Ordering::Relaxed);
192 }
193}
194
/// An action parked on the executor's deferred heap until `retry_at`.
struct DeferredEntry {
    // When the entry becomes due. Ordering and equality use only this field,
    // reversed so that `BinaryHeap` pops the earliest deadline first.
    retry_at: Instant,
    // The action to run through admission again once due.
    action: PendingAction,
    // Defers/retries already spent; compared against `max_defer_count`.
    defer_count: u32,
}
206
207impl PartialEq for DeferredEntry {
208 fn eq(&self, other: &Self) -> bool {
209 self.retry_at == other.retry_at
210 }
211}
212impl Eq for DeferredEntry {}
213impl PartialOrd for DeferredEntry {
214 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
215 Some(self.cmp(other))
216 }
217}
218impl Ord for DeferredEntry {
219 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
220 Reverse(self.retry_at).cmp(&Reverse(other.retry_at))
222 }
223}
224
/// Executor that pulls [`PendingAction`]s from a channel and runs them through
/// backpressure admission. Admitted actions go to an [`ActionDispatcher`]; others
/// are deferred, gated or failed. Build it with [`ActionExecutor::new`] and drive it
/// with [`ActionExecutor::run`], which consumes `self`.
pub struct ActionExecutor<D: ActionDispatcher> {
    // Incoming actions; `run` exits once this channel is closed and drained.
    actions_rx: mpsc::Receiver<PendingAction>,
    // Shared configuration; `backpressure` holds the thresholds and the defer budget.
    config: Arc<MeshOsConfig>,
    // Admission, cooldown and cluster-backpressure state.
    backpressure: BackpressureState,
    // Performs admitted actions.
    dispatcher: Arc<D>,
    // Deferred and retrying actions; the earliest `retry_at` is on top.
    deferred: BinaryHeap<DeferredEntry>,
    // Bounded ring of recent failures (RECENT_FAILURES_CAPACITY), shareable via
    // `recent_failures_handle`.
    recent_failures: Arc<RwLock<VecDeque<FailureRecord>>>,
    // Counter from which failure sequence numbers are allocated.
    failure_seq: Arc<AtomicU64>,
    // Sink every recorded failure is appended to. Append errors are logged and
    // otherwise ignored.
    failure_appender: Arc<dyn super::failure_chain::FailureChainAppender>,
    // Counters exposed through `ExecutorHandle` / `stats_arc`.
    stats: Arc<ExecutorStats>,
    // Sink for per-action outcome records (dispatched / failed / gated).
    chain_appender: Arc<dyn ActionChainAppender>,
}
271
272impl<D: ActionDispatcher> ActionExecutor<D> {
273 pub fn new(
276 actions_rx: mpsc::Receiver<PendingAction>,
277 config: Arc<MeshOsConfig>,
278 dispatcher: Arc<D>,
279 ) -> Self {
280 Self {
281 actions_rx,
282 config,
283 backpressure: BackpressureState::new(),
284 dispatcher,
285 deferred: BinaryHeap::new(),
286 recent_failures: Arc::new(RwLock::new(VecDeque::with_capacity(
287 RECENT_FAILURES_CAPACITY,
288 ))),
289 failure_seq: Arc::new(AtomicU64::new(0)),
290 failure_appender: super::failure_chain::no_op_arc(),
291 stats: Arc::new(ExecutorStats::default()),
292 chain_appender: Arc::new(NoOpActionChainAppender),
293 }
294 }
295
296 pub fn with_failure_appender(
302 mut self,
303 appender: Arc<dyn super::failure_chain::FailureChainAppender>,
304 ) -> Self {
305 self.failure_appender = appender;
306 self
307 }
308
309 pub fn recent_failures_handle(&self) -> Arc<RwLock<VecDeque<FailureRecord>>> {
315 Arc::clone(&self.recent_failures)
316 }
317
318 pub fn failure_seq_handle(&self) -> Arc<AtomicU64> {
324 Arc::clone(&self.failure_seq)
325 }
326
327 pub fn failure_appender_handle(&self) -> Arc<dyn super::failure_chain::FailureChainAppender> {
333 Arc::clone(&self.failure_appender)
334 }
335
336 pub fn with_chain_appender(mut self, appender: Arc<dyn ActionChainAppender>) -> Self {
341 self.chain_appender = appender;
342 self
343 }
344
345 pub fn handle(&self) -> ExecutorHandle {
350 ExecutorHandle {
351 stats: Arc::clone(&self.stats),
352 }
353 }
354
355 pub fn stats_arc(&self) -> Arc<ExecutorStats> {
359 Arc::clone(&self.stats)
360 }
361
362 #[expect(
366 clippy::expect_used,
367 reason = "tokio::select arm is gated on `next_deadline.is_some()` which means the prior `peek()` returned Some; pop() on the same heap must succeed"
368 )]
369 pub async fn run(mut self) -> Arc<ExecutorStats> {
370 let mut idle_tick = tokio::time::interval(Duration::from_millis(100));
378 idle_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
379 loop {
380 let next_deadline = self.deferred.peek().map(|e| e.retry_at);
381 tokio::select! {
382 action = self.actions_rx.recv() => {
383 let Some(action) = action else { break };
384 self.handle_one(action).await;
385 }
386 _ = sleep_until_opt(next_deadline), if next_deadline.is_some() => {
387 let due = self.deferred.pop().expect("deferred heap non-empty");
389 self.handle_one_retry(due.action, due.defer_count).await;
390 }
391 _ = idle_tick.tick() => {
392 self.poll_cluster_backpressure();
393 }
394 }
395 }
396 Arc::clone(&self.stats)
397 }
398
399 fn poll_cluster_backpressure(&mut self) {
408 let depth = self.actions_rx.len() + self.deferred.len();
409 let change = self
410 .backpressure
411 .update_cluster_backpressure(depth, &self.config.backpressure);
412 match change {
413 ClusterBackpressureChange::Asserted => {
414 ExecutorStats::inc(&self.stats.cluster_backpressure_asserts);
415 self.dispatcher.on_cluster_backpressure(change);
416 }
417 ClusterBackpressureChange::Released => {
418 ExecutorStats::inc(&self.stats.cluster_backpressure_releases);
419 self.dispatcher.on_cluster_backpressure(change);
420 }
421 ClusterBackpressureChange::Steady => {}
422 }
423 }
424
425 async fn handle_one(&mut self, action: PendingAction) {
426 self.handle_one_retry(action, 0).await
427 }
428
429 async fn handle_one_retry(&mut self, action: PendingAction, prior_defers: u32) {
430 let now = tokio::time::Instant::now().into_std();
437 self.backpressure.tick(now);
438 let depth = self.actions_rx.len() + self.deferred.len() + 1;
444 let change = self
445 .backpressure
446 .update_cluster_backpressure(depth, &self.config.backpressure);
447 match change {
448 ClusterBackpressureChange::Asserted => {
449 ExecutorStats::inc(&self.stats.cluster_backpressure_asserts);
450 self.dispatcher.on_cluster_backpressure(change);
451 }
452 ClusterBackpressureChange::Released => {
453 ExecutorStats::inc(&self.stats.cluster_backpressure_releases);
454 self.dispatcher.on_cluster_backpressure(change);
455 }
456 ClusterBackpressureChange::Steady => {}
457 }
458 match self
459 .backpressure
460 .admit(action.id, &action.action, now, &self.config.backpressure)
461 {
462 AdmissionResult::Admit => {
463 self.dispatch_now_with_defer_count(action, now, prior_defers)
464 .await
465 }
466 AdmissionResult::Defer { retry_after } => {
467 let next_count = prior_defers.saturating_add(1);
468 if next_count > self.config.backpressure.max_defer_count {
469 ExecutorStats::inc(&self.stats.failed);
470 let reason = format!(
471 "deferred {next_count} times — exceeds max_defer_count {}",
472 self.config.backpressure.max_defer_count,
473 );
474 self.record_failure(format!("action-id:{}", action.id.0), reason.clone());
475 let r = append_failed(&self.chain_appender, &action, reason, None);
476 self.record_chain_append(action.id.0, "failed_defer_budget", r);
477 return;
478 }
479 ExecutorStats::inc(&self.stats.deferred);
480 self.deferred.push(DeferredEntry {
481 retry_at: now.checked_add(retry_after).unwrap_or(now),
482 action,
483 defer_count: next_count,
484 });
485 }
486 AdmissionResult::Gate {
487 cooldown_until,
488 reason,
489 } => {
490 ExecutorStats::inc(&self.stats.gated);
491 let age = cooldown_until.saturating_duration_since(now);
492 let cooldown_ms = age.as_millis() as u64;
493 self.record_failure(
494 format!("action-id:{}", action.id.0),
495 format!("gated ({reason}) for {cooldown_ms} ms"),
496 );
497 let r = append_gated(
498 &self.chain_appender,
499 &action,
500 reason.to_string(),
501 Some(cooldown_ms),
502 );
503 self.record_chain_append(action.id.0, "gated", r);
504 }
505 }
506 }
507
508 async fn dispatch_now_with_defer_count(
509 &mut self,
510 action: PendingAction,
511 admit_anchor: Instant,
512 prior_defers: u32,
513 ) {
514 let dispatch_future = self.dispatcher.dispatch(action.action.clone());
518 let result = match std::panic::AssertUnwindSafe(dispatch_future)
519 .catch_unwind()
520 .await
521 {
522 Ok(result) => result,
523 Err(_) => {
524 tracing::error!(
525 target: "meshos",
526 action_id = action.id.0,
527 "dispatcher panicked — recording as drop",
528 );
529 Err(DispatchError::drop("dispatcher panicked"))
530 }
531 };
532 match result {
533 Ok(()) => {
534 ExecutorStats::inc(&self.stats.dispatched);
535 let r = append_dispatched(&self.chain_appender, &action);
536 self.record_chain_append(action.id.0, "dispatched", r);
537 }
538 Err(err) => {
539 self.backpressure
544 .release_failed_admit(action.id, &action.action);
545 self.poll_cluster_backpressure();
551 let _ = admit_anchor;
552 if let Some(after) = err.retry_after {
553 let next_count = prior_defers.saturating_add(1);
558 if next_count > self.config.backpressure.max_defer_count {
559 ExecutorStats::inc(&self.stats.failed);
560 let reason =
561 format!("dispatch retry budget exhausted after {next_count} attempts",);
562 self.record_failure(format!("action-id:{}", action.id.0), reason.clone());
563 let r = append_failed(&self.chain_appender, &action, reason, None);
564 self.record_chain_append(action.id.0, "failed_retry_budget", r);
565 return;
566 }
567 ExecutorStats::inc(&self.stats.dispatch_retries);
568 let retry_ms = after.as_millis() as u64;
569 let r = append_failed(
570 &self.chain_appender,
571 &action,
572 err.reason.clone(),
573 Some(retry_ms),
574 );
575 self.record_chain_append(action.id.0, "failed_retry", r);
576 let now = tokio::time::Instant::now().into_std();
578 self.deferred.push(DeferredEntry {
579 retry_at: now.checked_add(after).unwrap_or(now),
580 action,
581 defer_count: next_count,
582 });
583 } else {
584 ExecutorStats::inc(&self.stats.failed);
585 let reason = err.reason.clone();
586 self.record_failure(format!("action-id:{}", action.id.0), err.reason);
587 let r = append_failed(&self.chain_appender, &action, reason, None);
588 self.record_chain_append(action.id.0, "failed", r);
589 }
590 }
591 }
592 }
593
594 fn record_chain_append(
600 &self,
601 action_id: u64,
602 kind: &'static str,
603 result: Result<(), AppendError>,
604 ) {
605 if let Err(e) = result {
606 self.stats
607 .chain_append_failures
608 .fetch_add(1, Ordering::Relaxed);
609 tracing::warn!(
610 target: "meshos",
611 action_id,
612 kind,
613 error = %e,
614 "executor chain append failed; in-memory state stayed consistent \
615 but the action's chain record is missing",
616 );
617 }
618 }
619
620 fn record_failure(&mut self, source: String, reason: String) {
621 let recorded_at_ms = std::time::SystemTime::now()
622 .duration_since(std::time::UNIX_EPOCH)
623 .map(|d| d.as_millis() as u64)
624 .unwrap_or(0);
625 let seq = self.failure_seq.fetch_add(1, Ordering::SeqCst) + 1;
626 let record = FailureRecord {
627 seq,
628 source,
629 reason,
630 recorded_at_ms,
631 };
632 if let Err(err) = self.failure_appender.append(&record) {
636 tracing::warn!(
637 target: "meshos",
638 seq = record.seq,
639 error = %err,
640 "failure-chain append failed — record kept on in-memory ring only",
641 );
642 }
643 let mut ring = self.recent_failures.write();
644 if ring.len() >= RECENT_FAILURES_CAPACITY {
645 ring.pop_front();
646 }
647 ring.push_back(record);
648 }
649}
650
/// Cloneable, read-only view of an executor's counters. It is obtained from
/// `ActionExecutor::handle` and stays usable while `run` executes.
#[derive(Clone)]
pub struct ExecutorHandle {
    // Same `Arc` the executor increments.
    stats: Arc<ExecutorStats>,
}
656
657impl ExecutorHandle {
658 pub fn stats(&self) -> ExecutorStatsSnapshot {
661 ExecutorStatsSnapshot {
662 dispatched: self.stats.dispatched.load(Ordering::Relaxed),
663 failed: self.stats.failed.load(Ordering::Relaxed),
664 deferred: self.stats.deferred.load(Ordering::Relaxed),
665 gated: self.stats.gated.load(Ordering::Relaxed),
666 dispatch_retries: self.stats.dispatch_retries.load(Ordering::Relaxed),
667 cluster_backpressure_asserts: self
668 .stats
669 .cluster_backpressure_asserts
670 .load(Ordering::Relaxed),
671 cluster_backpressure_releases: self
672 .stats
673 .cluster_backpressure_releases
674 .load(Ordering::Relaxed),
675 chain_append_failures: self.stats.chain_append_failures.load(Ordering::Relaxed),
676 }
677 }
678}
679
/// Plain-value copy of [`ExecutorStats`], as returned by [`ExecutorHandle::stats`].
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutorStatsSnapshot {
    /// Successful dispatches.
    pub dispatched: u64,
    /// Terminal failures (non-retryable errors, exhausted defer/retry budgets).
    pub failed: u64,
    /// Admission defers.
    pub deferred: u64,
    /// Admission gates.
    pub gated: u64,
    /// Dispatch failures re-queued for retry.
    pub dispatch_retries: u64,
    /// Cluster-backpressure `Asserted` edges.
    pub cluster_backpressure_asserts: u64,
    /// Cluster-backpressure `Released` edges.
    pub cluster_backpressure_releases: u64,
    /// Failed action-chain appends.
    pub chain_append_failures: u64,
}
706
707async fn sleep_until_opt(deadline: Option<Instant>) {
708 if let Some(deadline) = deadline {
709 sleep_until(tokio::time::Instant::from_std(deadline)).await;
710 } else {
711 std::future::pending::<()>().await;
716 }
717}
718
// Executor unit tests. Most spawn `run()` on the tokio test runtime, feed actions
// through the channel, then drop the sender so `run()` returns its stats. Several
// use real-time sleeps so that deferred or retried entries come due before the
// sender is dropped; `run()` does not retry entries still deferred at shutdown.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::sync::mpsc;

    use super::super::action::{ActionId, MaintenanceTransition};
    use super::super::config::MeshOsConfig;
    use super::super::event::{ChainId, DaemonRef};
    use super::*;

    // Wraps `action` in a `PendingAction` with the given id, stamped "now".
    fn pending(id: u64, action: MeshOsAction) -> PendingAction {
        PendingAction {
            id: ActionId(id),
            action,
            emitted_at: Instant::now(),
        }
    }

    // Builds a `DaemonRef` fixture.
    fn dref(name: &str, id: u64) -> DaemonRef {
        DaemonRef {
            id,
            name: name.into(),
        }
    }

    // Default config, wrapped in the `Arc` that `ActionExecutor::new` takes.
    fn fast_cfg() -> Arc<MeshOsConfig> {
        Arc::new(MeshOsConfig::default())
    }

    // Action-chain appender that rejects every record.
    struct FailingChainAppender;

    impl super::super::chain::ActionChainAppender for FailingChainAppender {
        fn append(
            &self,
            _record: super::super::chain::ActionChainRecord,
        ) -> Result<(), AppendError> {
            Err(AppendError {
                reason: "test-injected appender failure".into(),
            })
        }
    }

    // A failing chain append is only counted; the dispatch itself still succeeds.
    #[tokio::test]
    async fn chain_append_failure_bumps_counter_but_dispatch_still_succeeds() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher))
            .with_chain_appender(Arc::new(FailingChainAppender));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log().len(), 1);
        assert_eq!(stats.chain_append_failures.load(Ordering::Relaxed), 1);
    }

    // Two maintenance transitions are both admitted and dispatched.
    #[tokio::test]
    async fn admitted_actions_reach_the_dispatcher() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 2);
        assert_eq!(dispatcher.log().len(), 2);
    }

    // The daemon is gated for 60 s before the executor starts, so its
    // StartDaemon is expected to be gated rather than dispatched.
    #[tokio::test]
    async fn gated_actions_do_not_reach_the_dispatcher() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let mut exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let d = dref("telemetry", 1);
        exec.backpressure
            .record_daemon_gate(d.clone(), Instant::now() + Duration::from_secs(60));
        let task = tokio::spawn(exec.run());

        tx.send(pending(1, MeshOsAction::StartDaemon { daemon: d }))
            .await
            .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
        assert_eq!(stats.gated.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log().len(), 0);
    }

    // Two pulls from the same source (5). The test expects the second to be
    // deferred at least once and still be dispatched before shutdown.
    #[tokio::test]
    async fn deferred_actions_eventually_reach_the_dispatcher() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        let chain_a: ChainId = 1;
        let chain_b: ChainId = 2;
        tx.send(pending(
            1,
            MeshOsAction::PullReplica {
                chain: chain_a,
                source: 5,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::PullReplica {
                chain: chain_b,
                source: 5,
            },
        ))
        .await
        .unwrap();

        // Keep the channel open long enough for the deferred pull to come due,
        // because `run()` stops retrying once the sender is dropped.
        tokio::time::sleep(Duration::from_millis(500)).await;
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            2,
            "both pulls should eventually reach the dispatcher",
        );
        assert!(
            stats.deferred.load(Ordering::Relaxed) >= 1,
            "second pull should have been deferred at least once",
        );
        assert_eq!(dispatcher.log().len(), 2);
    }

    // A non-retryable dispatch error is counted as a failure, not a dispatch.
    #[tokio::test]
    async fn dispatch_errors_without_retry_record_failures() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::drop("boom"));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
    }

    // A recorded failure reaches both the failure-chain appender and the
    // in-memory ring with the same seq and reason.
    #[tokio::test]
    async fn failure_chain_appender_receives_every_recorded_failure() {
        use super::super::failure_chain::BufferingFailureChainAppender;
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::drop("first boom"));
        let appender = Arc::new(BufferingFailureChainAppender::default());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher)).with_failure_appender(
            appender.clone() as Arc<dyn super::super::failure_chain::FailureChainAppender>,
        );
        // Grab the ring before `run()` consumes the executor.
        let ring_handle = exec.recent_failures_handle();
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        drop(tx);
        let _ = task.await.expect("join");

        let captured = appender.captured();
        assert_eq!(captured.len(), 1, "appender should see one record");
        assert!(captured[0].reason.contains("first boom"));
        assert!(captured[0].seq > 0);

        let ring: Vec<FailureRecord> = ring_handle.read().iter().cloned().collect();
        assert_eq!(ring.len(), 1);
        assert_eq!(ring[0].seq, captured[0].seq);
        assert_eq!(ring[0].reason, captured[0].reason);
    }

    // A retryable error (50 ms) re-queues the action. The 200 ms sleep lets the
    // retry come due and succeed before the sender is dropped.
    #[tokio::test]
    async fn dispatch_errors_with_retry_re_enqueue() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::retry("transient", Duration::from_millis(50)));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 1);
        assert_eq!(stats.dispatch_retries.load(Ordering::Relaxed), 1);
    }

    // With no traffic, closing the channel must end `run()` promptly.
    #[tokio::test]
    async fn executor_exits_when_sender_drops() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());
        drop(tx);
        let stats = tokio::time::timeout(Duration::from_secs(2), task)
            .await
            .expect("executor did not exit after sender dropped")
            .expect("join");
        assert_eq!(stats.dispatched.load(Ordering::Relaxed), 0);
    }

    // A dispatcher that always asks for a retry. With max_defer_count = 3 the
    // executor stops re-queuing once the retry count would exceed the budget,
    // and records exactly one failure.
    #[tokio::test]
    async fn dispatch_retry_drops_after_exceeding_max_defer_count() {
        struct AlwaysRetry {
            attempts: parking_lot::Mutex<u32>,
        }
        impl ActionDispatcher for AlwaysRetry {
            fn dispatch<'a>(
                &'a self,
                _action: MeshOsAction,
            ) -> BoxFuture<'a, Result<(), DispatchError>> {
                Box::pin(async move {
                    *self.attempts.lock() += 1;
                    Err(DispatchError::retry("transient", Duration::from_millis(5)))
                })
            }
        }

        let mut cfg = MeshOsConfig::default();
        cfg.backpressure.max_defer_count = 3;
        let cfg = Arc::new(cfg);
        let (tx, rx) = mpsc::channel(8);
        let dispatcher = Arc::new(AlwaysRetry {
            attempts: parking_lot::Mutex::new(0),
        });
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        // 200 ms comfortably covers several 5 ms retry delays.
        tokio::time::sleep(Duration::from_millis(200)).await;
        drop(tx);
        let stats = task.await.expect("join");
        let attempts = *dispatcher.attempts.lock();
        assert_eq!(
            stats.failed.load(Ordering::Relaxed),
            1,
            "action must drop with a failure after exceeding max_defer_count",
        );
        assert!(
            (3..=5).contains(&attempts),
            "expected ~max_defer_count dispatch attempts, got {attempts}",
        );
    }

    // The first dispatch panics. The executor must catch it, count a failure,
    // and go on to dispatch the second action.
    #[tokio::test]
    async fn dispatcher_panic_does_not_kill_executor() {
        struct PanicOnce {
            armed: parking_lot::Mutex<bool>,
            log: Mutex<Vec<MeshOsAction>>,
        }
        impl ActionDispatcher for PanicOnce {
            fn dispatch<'a>(
                &'a self,
                action: MeshOsAction,
            ) -> BoxFuture<'a, Result<(), DispatchError>> {
                Box::pin(async move {
                    // Disarm under the lock, then panic only after releasing it.
                    let armed = {
                        let mut g = self.armed.lock();
                        let was = *g;
                        *g = false;
                        was
                    };
                    if armed {
                        panic!("boom");
                    }
                    self.log.lock().push(action);
                    Ok(())
                })
            }
        }

        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(PanicOnce {
            armed: parking_lot::Mutex::new(true),
            log: Mutex::new(Vec::new()),
        });
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Maintenance,
            },
        ))
        .await
        .unwrap();
        tx.send(pending(
            2,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);

        let stats = task
            .await
            .expect("executor task should NOT have panicked despite dispatcher panic");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            1,
            "second action should have dispatched after the first panicked",
        );
        assert_eq!(stats.failed.load(Ordering::Relaxed), 1);
        assert_eq!(dispatcher.log.lock().len(), 1);
    }

    // All four actions are queued BEFORE `run()` starts, so the first handled
    // action sees depth 4 (3 buffered + itself) against a threshold of 3. As the
    // queue drains the depth falls to the release mark. Both edges must reach the
    // dispatcher hook: Asserted first, Released last.
    #[tokio::test]
    async fn cluster_backpressure_edges_surface_through_dispatcher_hook() {
        let mut cfg = MeshOsConfig::default();
        cfg.backpressure.cluster_backpressure_threshold = 3;
        cfg.backpressure.cluster_backpressure_release = 1;
        let cfg = Arc::new(cfg);
        let (tx, rx) = mpsc::channel(8);
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        for i in 1..=4u64 {
            tx.send(pending(
                i,
                MeshOsAction::CommitMaintenanceTransition {
                    node: 1,
                    target: MaintenanceTransition::Active,
                },
            ))
            .await
            .unwrap();
        }
        let task = tokio::spawn(exec.run());
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);
        let stats = task.await.expect("join");
        assert!(
            stats.cluster_backpressure_asserts.load(Ordering::Relaxed) >= 1,
            "depth crossed the high-water mark at least once",
        );
        assert!(
            stats.cluster_backpressure_releases.load(Ordering::Relaxed) >= 1,
            "depth dropped below the low-water mark at least once",
        );
        let log = dispatcher.backpressure_log();
        assert!(matches!(
            log.first(),
            Some(ClusterBackpressureChange::Asserted)
        ));
        assert!(matches!(
            log.last(),
            Some(ClusterBackpressureChange::Released)
        ));
    }

    // The first pull fails with a 60 s retry and stays parked on the deferred heap
    // for the rest of the test. Because the executor releases the failed admission
    // (`release_failed_admit`), a second pull from the same source must dispatch
    // immediately instead of being blocked by a leaked cooldown.
    #[tokio::test]
    async fn dispatch_failure_with_retry_releases_pull_cooldown() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        dispatcher.fail_next(DispatchError::retry("transient", Duration::from_secs(60)));
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::PullReplica {
                chain: 1,
                source: 5,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(pending(
            2,
            MeshOsAction::PullReplica {
                chain: 2,
                source: 5,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(tx);

        let stats = task.await.expect("join");
        assert_eq!(
            stats.dispatched.load(Ordering::Relaxed),
            1,
            "second pull should dispatch immediately after the first \
             released its leaked cooldown",
        );
        assert_eq!(stats.dispatch_retries.load(Ordering::Relaxed), 1);
    }

    // A handle taken before `run()` observes counters updated while it runs.
    #[tokio::test]
    async fn handle_exposes_atomic_stats_to_outside_observers() {
        let (tx, rx) = mpsc::channel(8);
        let cfg = fast_cfg();
        let dispatcher = Arc::new(LoggingDispatcher::new());
        let exec = ActionExecutor::new(rx, cfg, Arc::clone(&dispatcher));
        let handle = exec.handle();
        let task = tokio::spawn(exec.run());

        tx.send(pending(
            1,
            MeshOsAction::CommitMaintenanceTransition {
                node: 1,
                target: MaintenanceTransition::Active,
            },
        ))
        .await
        .unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;

        let snap = handle.stats();
        assert!(snap.dispatched >= 1);
        drop(tx);
        let _ = task.await;
    }
}