1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
//! Cancellation Debt Runtime Integration
//!
//! Integrates the cancellation debt monitor with the asupersync runtime to provide
//! real-time monitoring of cancellation work accumulation and processing rates.
use crate::observability::cancellation_debt_monitor::{
CancellationDebtConfig, CancellationDebtMonitor, DebtAlert, DebtAlertLevel, DebtSnapshot,
PendingWork, WorkType,
};
use crate::types::{CancelKind, CancelReason, RegionId, TaskId};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// br-asupersync-b4ocgc: pluggable sleep abstraction for the background
/// monitoring loop.
///
/// Pre-fix the monitoring loop called `std::thread::sleep(check_interval)`
/// directly, which binds the loop's pacing to wall-clock time even when
/// the runtime is supposed to be running under virtual time (LabRuntime
/// or any deterministic test harness). The function-pointer-of-Duration
/// indirection lets:
///
/// - Production: keep using `std::thread::sleep` via the default
/// constructor (zero behaviour change).
/// - LabRuntime tests: inject a virtual-time sleeper that advances
/// the lab clock instead of blocking the OS thread, so monitoring
/// iterations are deterministic and bit-exact replayable.
///
/// The trait-object form (`Arc<dyn Fn(Duration) + Send + Sync>`) keeps
/// the field Send + Sync without making `DebtRuntimeIntegration` itself
/// generic over a sleeper type — callers don't pay a type-parameter
/// cost for the legacy code path.
pub type DebtMonitorSleeper = Arc<dyn Fn(Duration) + Send + Sync>;
/// Integration points for debt monitoring in the runtime.
pub struct DebtRuntimeIntegration {
monitor: Arc<CancellationDebtMonitor>,
/// Background monitoring thread handle.
monitoring_thread: Option<thread::JoinHandle<()>>,
/// Shutdown signal for background thread.
shutdown: Arc<Mutex<bool>>,
/// Alert callback for integration with logging/alerting systems.
alert_callback: Option<Box<dyn Fn(&DebtAlert) + Send + Sync>>,
/// br-asupersync-b4ocgc: pluggable sleep — defaults to thread::sleep,
/// overridable in tests via [`Self::with_sleeper`].
sleeper: DebtMonitorSleeper,
}
impl DebtRuntimeIntegration {
/// Creates a new debt runtime integration.
#[must_use]
pub fn new(config: CancellationDebtConfig) -> Self {
let monitor = Arc::new(CancellationDebtMonitor::new(config));
Self {
monitor,
monitoring_thread: None,
shutdown: Arc::new(Mutex::new(false)),
alert_callback: None,
// br-asupersync-b4ocgc: default = std::thread::sleep
// (production behaviour unchanged).
sleeper: Arc::new(std::thread::sleep),
}
}
/// Creates integration with default configuration.
#[must_use]
pub fn default() -> Self {
Self::new(CancellationDebtConfig::default())
}
/// br-asupersync-b4ocgc: override the wall-clock-blocking sleep
/// used by the background monitoring loop. Intended for tests that
/// run under [`LabRuntime`](crate::lab::LabRuntime) virtual time —
/// inject a closure that advances the lab clock instead of
/// blocking the OS thread, and the monitoring loop becomes
/// deterministic and replayable.
///
/// Must be called BEFORE [`Self::start_monitoring`] — the sleeper
/// is captured into the spawned thread and changing it later has
/// no effect on the running loop.
#[must_use]
pub fn with_sleeper(mut self, sleeper: DebtMonitorSleeper) -> Self {
self.sleeper = sleeper;
self
}
/// Set a callback to be invoked when debt alerts are generated.
pub fn set_alert_callback<F>(&mut self, callback: F)
where
F: Fn(&DebtAlert) + Send + Sync + 'static,
{
self.alert_callback = Some(Box::new(callback));
}
/// Start background monitoring thread.
pub fn start_monitoring(&mut self, check_interval: Duration) {
if self.monitoring_thread.is_some() {
return; // Already started
}
let monitor = self.monitor.clone();
let shutdown = self.shutdown.clone();
let alert_callback = self.alert_callback.take();
let sleeper = self.sleeper.clone();
let handle = thread::spawn(move || {
Self::monitoring_loop(monitor, shutdown, check_interval, alert_callback, sleeper);
});
self.monitoring_thread = Some(handle);
}
/// Stop background monitoring.
pub fn stop_monitoring(&mut self) {
{
let mut shutdown = self
.shutdown
.lock()
.expect("shutdown mutex should not be poisoned");
*shutdown = true;
}
if let Some(handle) = self.monitoring_thread.take() {
let _ = handle.join(); // Wait for thread to finish
}
}
/// Get reference to the underlying debt monitor.
#[must_use]
pub fn monitor(&self) -> &Arc<CancellationDebtMonitor> {
&self.monitor
}
/// Called when a task begins cancellation cleanup.
#[must_use]
pub fn on_task_cleanup_started(
&self,
task_id: TaskId,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
estimated_cleanup_work: u32,
) -> u64 {
self.monitor.queue_work(
WorkType::TaskCleanup,
format!("task-{task_id:?}"),
self.calculate_priority(cancel_kind),
estimated_cleanup_work,
cancel_reason,
cancel_kind,
Vec::new(),
)
}
/// Called when a region begins closure.
#[must_use]
pub fn on_region_cleanup_started(
&self,
region_id: RegionId,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
child_dependencies: Vec<u64>,
) -> u64 {
self.monitor.queue_work(
WorkType::RegionCleanup,
format!("region-{region_id:?}"),
self.calculate_priority(cancel_kind),
100, // Baseline region cleanup cost
cancel_reason,
cancel_kind,
child_dependencies,
)
}
/// Called when waker cleanup is required.
#[must_use]
pub fn on_waker_cleanup_started(
&self,
waker_id: String,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
) -> u64 {
self.monitor.queue_work(
WorkType::WakerCleanup,
waker_id,
self.calculate_priority(cancel_kind),
10, // Waker cleanup is typically fast
cancel_reason,
cancel_kind,
Vec::new(),
)
}
/// Called when channel cleanup begins.
#[must_use]
pub fn on_channel_cleanup_started(
&self,
channel_id: String,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
buffer_size: usize,
) -> u64 {
let cleanup_cost = (buffer_size / 100).max(10) as u32; // Scale by buffer size
self.monitor.queue_work(
WorkType::ChannelCleanup,
channel_id,
self.calculate_priority(cancel_kind),
cleanup_cost,
cancel_reason,
cancel_kind,
Vec::new(),
)
}
/// Called when obligation settlement is needed.
#[must_use]
pub fn on_obligation_settlement_started(
&self,
obligation_id: String,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
settlement_complexity: u32,
) -> u64 {
self.monitor.queue_work(
WorkType::ObligationSettlement,
obligation_id,
self.calculate_priority(cancel_kind) + 10, // Higher priority for obligations
settlement_complexity,
cancel_reason,
cancel_kind,
Vec::new(),
)
}
/// Called when resource finalization begins.
#[must_use]
pub fn on_resource_finalization_started(
&self,
resource_id: String,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
finalization_cost: u32,
) -> u64 {
self.monitor.queue_work(
WorkType::ResourceFinalization,
resource_id,
self.calculate_priority(cancel_kind),
finalization_cost,
cancel_reason,
cancel_kind,
Vec::new(),
)
}
/// Called when any cleanup work completes.
pub fn on_cleanup_completed(&self, work_id: u64) {
self.monitor.complete_work(work_id);
}
/// Called when multiple cleanup items complete (batch processing).
#[must_use]
pub fn on_batch_cleanup_completed(&self, work_ids: &[u64]) -> usize {
self.monitor.complete_work_batch(work_ids)
}
/// Get current debt status for monitoring dashboards.
#[must_use]
pub fn get_debt_status(&self) -> DebtSnapshot {
self.monitor.get_debt_snapshot()
}
/// Get pending work for a specific entity.
#[must_use]
pub fn get_entity_debt(&self, entity_id: &str) -> Vec<PendingWork> {
self.monitor.get_entity_pending_work(entity_id)
}
/// Get highest priority pending work.
#[must_use]
pub fn get_priority_cleanup_work(&self, limit: usize) -> Vec<PendingWork> {
self.monitor.get_priority_work(limit)
}
/// Check if emergency intervention is needed.
#[must_use]
pub fn check_emergency_intervention(&self) -> bool {
let snapshot = self.get_debt_status();
matches!(
snapshot.alert_level,
DebtAlertLevel::Emergency | DebtAlertLevel::Critical
)
}
/// Execute emergency debt relief.
#[must_use]
pub fn execute_emergency_relief(&self, max_work_age: Duration) -> usize {
self.monitor.emergency_cleanup(max_work_age)
}
/// Generate a debt health report.
#[must_use]
pub fn generate_debt_report(&self) -> DebtHealthReport {
let snapshot = self.get_debt_status();
let recent_alerts = self.monitor.get_recent_alerts(10);
let recommendations = self.generate_recommendations(&snapshot);
let health_score = self.calculate_health_score(&snapshot);
DebtHealthReport {
snapshot,
recent_alerts,
recommendations,
health_score,
}
}
/// Background monitoring loop.
///
/// br-asupersync-b4ocgc: takes a pluggable [`DebtMonitorSleeper`]
/// instead of calling `std::thread::sleep` directly. Production
/// passes `Arc::new(std::thread::sleep)` (zero behaviour change);
/// LabRuntime tests pass a virtual-time sleeper that advances the
/// lab clock and yields to the next scheduled tick. Either way,
/// `replayable_system_time` (already used for `last_alert_check`)
/// gives the loop a deterministic notion of "now" — the only
/// remaining wall-clock dependency was this `thread::sleep`,
/// which the indirection now closes.
fn monitoring_loop(
monitor: Arc<CancellationDebtMonitor>,
shutdown: Arc<Mutex<bool>>,
check_interval: Duration,
alert_callback: Option<Box<dyn Fn(&DebtAlert) + Send + Sync>>,
sleeper: DebtMonitorSleeper,
) {
let mut last_alert_check = crate::observability::replayable_system_time();
loop {
// Check shutdown signal
{
let should_shutdown = *shutdown
.lock()
.expect("shutdown mutex should not be poisoned in monitoring loop");
if should_shutdown {
break;
}
}
// br-asupersync-p9wth4 — wrap the per-tick body in
// catch_unwind so a panic from monitor accessors, the
// user-supplied alert callback, or the alert-cleanup
// path does NOT kill the entire observability thread.
// The asupersync convention is that observability
// surfaces never panic; they degrade silently or escalate
// to a WARN log but keep running. Without this guard, a
// single bad payload in the alert callback (a format-
// string bug, a transient OOM, a panic in user code)
// takes the monitoring loop out for the lifetime of the
// process — exactly the failure mode this fix exists to
// close. AssertUnwindSafe is sound here because no state
// reachable from the closure is left in a half-mutated
// condition on panic: `monitor`/`alert_callback` are
// shared via Arc/Box and either accept arbitrary state
// or are the cause of the panic; `last_alert_check` is
// captured by reference and only updated on the success
// path inside the closure.
let now = crate::observability::replayable_system_time();
let monitor_ref = &monitor;
let alert_callback_ref = &alert_callback;
let last_alert_check_ref = &mut last_alert_check;
let tick_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
// Check for new alerts periodically
if now
.duration_since(*last_alert_check_ref)
.unwrap_or(Duration::ZERO)
>= Duration::from_secs(5)
{
if let Some(callback) = alert_callback_ref.as_ref() {
let recent_alerts = monitor_ref.get_recent_alerts(1);
for alert in recent_alerts {
callback(&alert);
}
}
*last_alert_check_ref = now;
}
// Clean up old alerts
monitor_ref.clear_old_alerts(Duration::from_hours(1));
}));
if let Err(_panic) = tick_result {
#[cfg(feature = "tracing-integration")]
tracing::warn!(
"debt monitoring tick panicked; loop continuing \
(br-asupersync-p9wth4)"
);
// Track that a panic was recovered so observers can
// see when the loop has been hit. Even when tracing
// is disabled, the increment is visible via the
// public accessor `panic_recovered_count()`.
monitor.record_monitoring_loop_panic();
}
// br-asupersync-b4ocgc: pluggable sleep — defaults to
// std::thread::sleep, overridable for virtual-time tests.
sleeper(check_interval);
}
}
/// Calculate priority based on cancel kind.
fn calculate_priority(&self, cancel_kind: CancelKind) -> u32 {
match cancel_kind {
CancelKind::Shutdown => 100,
CancelKind::Timeout => 80,
CancelKind::Deadline => 75,
CancelKind::User => 50,
_ => 10,
}
}
/// Generate health recommendations based on current state.
fn generate_recommendations(&self, snapshot: &DebtSnapshot) -> Vec<String> {
let mut recommendations = Vec::new();
match snapshot.alert_level {
DebtAlertLevel::Emergency => {
recommendations.push("Execute emergency cleanup immediately".to_string());
recommendations.push("Scale up cancellation processing".to_string());
recommendations.push("Investigate system-wide bottlenecks".to_string());
}
DebtAlertLevel::Critical => {
recommendations.push("Increase cancellation worker capacity".to_string());
recommendations.push("Implement work prioritization".to_string());
recommendations.push("Check for deadlocked entities".to_string());
}
DebtAlertLevel::Warning => {
recommendations.push("Monitor processing rates closely".to_string());
recommendations.push("Optimize cancellation handlers".to_string());
if snapshot.processing_rate < 10.0 {
recommendations
.push("Processing rate is very low - investigate bottlenecks".to_string());
}
}
DebtAlertLevel::Watch => {
recommendations.push("Continue monitoring debt trends".to_string());
if snapshot.oldest_work_age > Duration::from_secs(60) {
recommendations
.push("Some work items are aging - check processing pipeline".to_string());
}
}
DebtAlertLevel::Normal => {
recommendations.push("System operating normally".to_string());
}
}
// Entity-specific recommendations
for (entity_id, &depth) in &snapshot.entity_queue_depths {
if depth > 500 {
recommendations.push(format!(
"Entity {entity_id} has high queue depth ({depth}) - investigate"
));
}
}
recommendations
}
/// Calculate overall health score (0-100).
fn calculate_health_score(&self, snapshot: &DebtSnapshot) -> f64 {
let debt_score = (100.0 - snapshot.debt_percentage).max(0.0);
let rate_score = if snapshot.processing_rate > 100.0 {
100.0
} else {
snapshot.processing_rate.min(100.0)
};
let age_score = if snapshot.oldest_work_age < Duration::from_secs(10) {
100.0
} else if snapshot.oldest_work_age < Duration::from_secs(60) {
75.0
} else {
25.0
};
(debt_score + rate_score + age_score) / 3.0
}
}
impl Drop for DebtRuntimeIntegration {
fn drop(&mut self) {
self.stop_monitoring();
}
}
/// Comprehensive debt health report.
#[derive(Debug, Clone)]
pub struct DebtHealthReport {
/// Current debt snapshot.
pub snapshot: DebtSnapshot,
/// Recent alerts.
pub recent_alerts: Vec<DebtAlert>,
/// Health recommendations.
pub recommendations: Vec<String>,
/// Overall health score (0-100, higher is better).
pub health_score: f64,
}
/// Example integration showing how to wire debt monitoring into runtime events.
#[cfg(feature = "test-internals")]
pub mod integration_examples {
/// Example of how TaskRecord cancellation would be instrumented.
///
/// ```rust,ignore
/// impl TaskRecord {
/// pub fn request_cancel_with_budget(
/// &mut self,
/// reason: CancelReason,
/// cleanup_budget: Budget,
/// debt_integration: Option<&DebtRuntimeIntegration>,
/// ) -> bool {
/// // ... existing logic ...
///
/// match &mut self.state {
/// TaskState::Created | TaskState::Running => {
/// // NEW: Track cleanup work debt
/// if let Some(debt) = debt_integration {
/// let work_id = debt.on_task_cleanup_started(
/// self.id,
/// &reason,
/// reason.kind,
/// cleanup_budget.estimate_cleanup_work(),
/// );
/// self.debt_work_id = Some(work_id);
/// }
///
/// // ... continue with cancellation ...
/// }
/// // ... other states ...
/// }
/// }
///
/// pub fn complete(
/// &mut self,
/// outcome: TaskOutcome,
/// debt_integration: Option<&DebtRuntimeIntegration>,
/// ) {
/// // ... existing logic ...
///
/// // NEW: Mark cleanup debt as resolved
/// if let Some(work_id) = self.debt_work_id.take() {
/// if let Some(debt) = debt_integration {
/// debt.on_cleanup_completed(work_id);
/// }
/// }
/// }
/// }
/// ```
pub fn example_task_integration() {
// Documentation only
}
/// Example of how RegionRecord would track cleanup debt.
///
/// ```rust,ignore
/// impl RegionRecord {
/// pub fn begin_close(
/// &mut self,
/// reason: Option<CancelReason>,
/// debt_integration: Option<&DebtRuntimeIntegration>,
/// ) {
/// // ... existing logic ...
///
/// if let Some(reason) = &reason {
/// // NEW: Track region cleanup debt
/// if let Some(debt) = debt_integration {
/// let child_work_ids = self.children.iter()
/// .filter_map(|&child_id| self.get_child_debt_work_id(child_id))
/// .collect();
///
/// let work_id = debt.on_region_cleanup_started(
/// self.id,
/// reason,
/// reason.kind,
/// child_work_ids,
/// );
/// self.debt_work_id = Some(work_id);
/// }
/// }
/// }
/// }
/// ```
pub fn example_region_integration() {
// Documentation only
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::{
CancellationDebtConfig, CancellationDebtMonitor, DebtAlertLevel, DebtMonitorSleeper,
DebtRuntimeIntegration, DebtSnapshot, WorkType,
};
use crate::types::{CancelKind, CancelReason, TaskId};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[test]
fn test_integration_creation() {
let integration = DebtRuntimeIntegration::default();
let snapshot = integration.get_debt_status();
assert_eq!(snapshot.total_pending, 0);
assert_eq!(snapshot.debt_percentage, 0.0);
}
#[test]
fn test_task_cleanup_tracking() {
let integration = DebtRuntimeIntegration::default();
let task_id = TaskId::new_for_test(42, 0);
let cancel_reason = CancelReason::user("test");
let work_id =
integration.on_task_cleanup_started(task_id, &cancel_reason, CancelKind::User, 100);
let snapshot = integration.get_debt_status();
assert_eq!(snapshot.total_pending, 1);
assert!(
snapshot
.pending_by_type
.contains_key(&WorkType::TaskCleanup)
);
integration.on_cleanup_completed(work_id);
let snapshot = integration.get_debt_status();
assert_eq!(snapshot.total_pending, 0);
}
#[test]
fn test_priority_calculation() {
let integration = DebtRuntimeIntegration::default();
// Shutdown cancellation should get highest priority
let emergency_priority = integration.calculate_priority(CancelKind::Shutdown);
let user_priority = integration.calculate_priority(CancelKind::User);
assert!(emergency_priority > user_priority);
}
#[test]
fn test_health_score_calculation() {
let integration = DebtRuntimeIntegration::default();
let good_snapshot = DebtSnapshot {
snapshot_time: crate::observability::replayable_system_time(),
total_pending: 0,
pending_by_type: HashMap::new(),
debt_percentage: 5.0,
processing_rate: 200.0,
entity_queue_depths: HashMap::new(),
oldest_work_age: Duration::from_secs(1),
memory_usage_mb: 1.0,
alert_level: DebtAlertLevel::Normal,
};
let health_score = integration.calculate_health_score(&good_snapshot);
assert!(health_score > 90.0);
}
#[test]
fn test_batch_completion() {
let integration = DebtRuntimeIntegration::default();
let work_ids: Vec<u64> = (0..5)
.map(|i| {
integration.on_waker_cleanup_started(
format!("waker-{}", i),
&CancelReason::user("batch_test"),
CancelKind::User,
)
})
.collect();
let snapshot = integration.get_debt_status();
assert_eq!(snapshot.total_pending, 5);
let completed = integration.on_batch_cleanup_completed(&work_ids);
assert_eq!(completed, 5);
let snapshot = integration.get_debt_status();
assert_eq!(snapshot.total_pending, 0);
}
#[test]
fn test_emergency_intervention() {
let mut config = CancellationDebtConfig::default();
config.max_queue_depth = 10; // Very low threshold for testing
let integration = DebtRuntimeIntegration::new(config);
// Queue enough work to trigger emergency level
for i in 0..12 {
let _ = integration.on_task_cleanup_started(
TaskId::new_for_test(i, 0),
&CancelReason::user("emergency_test"),
CancelKind::User,
50,
);
}
assert!(integration.check_emergency_intervention());
let cleaned = integration.execute_emergency_relief(Duration::ZERO);
assert!(cleaned > 0);
}
// ====================================================================
// br-asupersync-b4ocgc: pluggable Sleeper covers the wall-clock
// dependency in monitoring_loop. The default uses thread::sleep
// (production behaviour). Tests inject a virtual-time sleeper to
// keep the loop deterministic and replayable under LabRuntime.
// ====================================================================
#[test]
fn b4ocgc_default_sleeper_is_thread_sleep_compatible() {
// Sanity: the default constructor produces an integration
// whose sleeper, when called with a tiny duration, returns
// promptly without panicking. We don't assert exact timing
// (wall-clock-dependent), only that the function shape is
// callable.
let integration = DebtRuntimeIntegration::default();
let start = std::time::Instant::now();
(integration.sleeper)(Duration::from_millis(1));
let elapsed = start.elapsed();
// Sanity bound: 1ms sleep should complete within 1s on any
// sane system. Generous to avoid CI flakes.
assert!(elapsed < Duration::from_secs(1));
}
#[test]
fn b4ocgc_with_sleeper_replaces_thread_sleep_in_loop() {
// Inject a virtual-time sleeper that records every duration
// it was called with. Drive monitoring_loop manually for a
// bounded number of iterations and assert the sleeper saw
// exactly the configured check_interval each tick — without
// ever blocking on wall-clock.
let sleep_log: Arc<Mutex<Vec<Duration>>> = Arc::new(Mutex::new(Vec::new()));
let log_for_sleeper = sleep_log.clone();
let max_calls = 3usize;
let shutdown = Arc::new(Mutex::new(false));
let shutdown_for_sleeper = shutdown.clone();
let virtual_sleeper: DebtMonitorSleeper = Arc::new(move |d: Duration| {
let mut log = log_for_sleeper
.lock()
.expect("sleeper log mutex should not be poisoned");
log.push(d);
// After max_calls iterations, set shutdown so the loop
// exits — preserves the loop semantics test without
// hanging the test process.
if log.len() >= max_calls {
*shutdown_for_sleeper
.lock()
.expect("sleeper shutdown mutex should not be poisoned") = true;
}
// No real sleep — we're under virtual time semantics.
});
let monitor = Arc::new(CancellationDebtMonitor::new(
CancellationDebtConfig::default(),
));
let check_interval = Duration::from_millis(123);
DebtRuntimeIntegration::monitoring_loop(
monitor,
shutdown.clone(),
check_interval,
None,
virtual_sleeper,
);
let calls = sleep_log
.lock()
.expect("sleep log mutex should not be poisoned");
assert!(
calls.len() >= max_calls,
"loop should have called the sleeper at least {max_calls} times, got {}",
calls.len()
);
for d in calls.iter() {
assert_eq!(*d, check_interval, "every sleep must use check_interval");
}
}
#[test]
fn b4ocgc_with_sleeper_builder_threads_through_to_loop() {
// Verify the with_sleeper builder's value is captured when
// start_monitoring is called (covered indirectly: we inspect
// the integration's sleeper field after building).
let probe = Arc::new(Mutex::new(0u32));
let probe_for_sleeper = probe.clone();
let sleeper: DebtMonitorSleeper = Arc::new(move |_d| {
*probe_for_sleeper
.lock()
.expect("probe mutex should not be poisoned in sleeper") += 1;
});
let integration = DebtRuntimeIntegration::default().with_sleeper(sleeper.clone());
// Direct invocation of the captured sleeper to confirm the
// field carries our closure (not the std::thread::sleep
// default).
(integration.sleeper)(Duration::from_secs(0));
assert_eq!(
*probe
.lock()
.expect("probe mutex should not be poisoned for assertion"),
1
);
}
/// br-asupersync-p9wth4 — Verify the panic-counter API on
/// `CancellationDebtMonitor`. The `record_monitoring_loop_panic`
/// hook is invoked from `monitoring_loop` whenever the
/// `catch_unwind` guard recovers a tick-body panic; operators
/// scrape `monitoring_loop_panic_count()` to detect that the
/// observability loop has been hit.
#[test]
fn monitoring_loop_panic_counter_increments() {
let monitor = CancellationDebtMonitor::default();
assert_eq!(monitor.monitoring_loop_panic_count(), 0);
monitor.record_monitoring_loop_panic();
monitor.record_monitoring_loop_panic();
assert_eq!(monitor.monitoring_loop_panic_count(), 2);
}
/// br-asupersync-p9wth4 — Direct in-line verification that a
/// panic in code shaped like the per-tick body is caught by the
/// same `catch_unwind` + `AssertUnwindSafe` pattern the loop
/// uses, and that we can increment the recovered-panic counter
/// from the catch path. This pins the catch_unwind/recovery
/// contract independent of the real monitoring_loop's timing
/// (which is awkward to drive deterministically without
/// virtual time).
#[test]
fn monitoring_loop_catch_unwind_pattern_recovers_panic() {
let monitor = Arc::new(CancellationDebtMonitor::default());
let monitor_ref = &monitor;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
// Mimics the per-tick closure from monitoring_loop. The
// panic stands in for a panicking alert callback or a
// monitor accessor that asserted internally.
let _ = monitor_ref.get_recent_alerts(1);
panic!("simulated tick-body panic (p9wth4)");
}));
assert!(result.is_err(), "catch_unwind must capture the panic");
monitor.record_monitoring_loop_panic();
assert_eq!(
monitor.monitoring_loop_panic_count(),
1,
"the recover path must be wired to the counter"
);
}
}