picoem-common 0.2.1

Shared primitives for the picoem RP2350/RP2040 emulator workspace: Memory, ClockTree, Pacer, PIO building blocks, threading helpers.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
//! Hybrid spin+park barrier for worker-thread synchronisation.
//!
//! A fixed-party rendezvous point: each `wait()` caller blocks until
//! `parties` threads have arrived, then all release simultaneously and
//! the barrier is ready for the next round. Promoted from the Phase 0c
//! prototype (`threading_micro.rs`).
//!
//! See `wrk_docs/2026.04.17 - LLD - Threaded Dual-Core Phase 2 V4.md` §2.
//!
//! ## Mechanism
//!
//! A generation counter distinguishes barrier rounds. The last arriver
//! resets `count`, bumps `generation`, and broadcasts via `Condvar`.
//! Earlier arrivers first spin for a short budget (`SPIN_BUDGET`) — the
//! fast path when all workers converge within a few hundred ns — and
//! only then fall back to `Condvar::wait` so they don't burn CPU while
//! one productive worker runs a long quantum alone. This matters under
//! single-core workloads where three of four workers have nothing to
//! do: the pure-spin variant had them pegging `pause` for the entire
//! quantum, bouncing cache lines against the productive core.
//!
//! ## Poisoning
//!
//! If a worker panics before reaching the barrier, the remaining
//! threads would wait forever. The coordinator catches the panic
//! (Phase 4) and calls [`SpinBarrier::poison`], which unblocks all
//! current and future waiters with [`BarrierResult::Poisoned`] via the
//! same `Condvar` broadcast.
//!
//! ## Watchdog (Stage 5, HLD V1 §6.6)
//!
//! A worker that deadlocks *without* panicking — a PIO block stuck in
//! an infinite `JMP` loop, a CPU core polling an MMIO read that never
//! resolves — leaves the poison path untouched. Every `wait` call
//! therefore carries its own wall-clock deadline
//! ([`DEFAULT_DEADLINE`] unless constructed via [`SpinBarrier::with_deadline`]).
//! The spin loop checks `Instant::now()` every [`WATCHDOG_STRIDE`]
//! iterations and the park loop uses
//! [`Condvar::wait_timeout`](std::sync::Condvar::wait_timeout) with the
//! remaining budget, so expiry is detected promptly on both the hot
//! and cold paths. On expiry, the first waiter records the elapsed
//! time, flips [`SpinBarrier::timed_out`], and calls
//! [`SpinBarrier::poison`] so peer waiters wake into a
//! [`BarrierResult::TimedOut`] exit instead of a latent hang.
//!
//! Phase 0c measured ~425 ns mean round-trip (4-way) on the pure-spin
//! variant; the hybrid keeps that fast path when all workers arrive in
//! close succession.
//!
//! ## Cross-chip reuse
//!
//! This type is chip-agnostic and may move to `picoem-common` in
//! Phase 3 when the RP2040 threaded path lands.

use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering::*};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Spin iterations before falling back to `Condvar::wait`. `spin_loop()`
/// hints take ~20 ns on current x86, so 512 iterations is ~10 μs of
/// spin headroom. The previous value (128, ~2.5 μs) was tuned for a
/// general-purpose rendezvous where early arrivers should yield
/// quickly to the productive worker. That tuning is wrong for the
/// ThreadedEmulator's actual per-quantum shape: on OneROM-class
/// peripheral-heavy workloads, worker-to-worker arrival stagger
/// routinely hits 2 μs+ (PIO2 finishes ~2.5 μs after PIO0/core0 in
/// the §1.1 critical-path model), so a 2.5 μs budget forces every
/// barrier round through `park_cv.wait` / `notify_all` — a pair of
/// kernel transitions costing several microseconds each and erasing
/// the win from parallelising the blocks in the first place.
///
/// At 512 iterations (~10 μs) no parking occurs in realistic OneROM
/// workloads — measured via `threading_micro` §9 late-arriver sampler:
/// p50 400-600 ns, p99 ≤ 1000 ns round-trip. The cost ceiling rises
/// symmetrically: worst-case burn is ~160 μs per barrier if every
/// worker is idle while one runs 100 μs+ alone. On the dedicated
/// pinned host cores the ThreadedEmulator targets (§2.5), that burn
/// is dissipating host CPU that nothing else wants; on a general-
/// purpose host sharing cores with other workloads, 160 μs of hot
/// spin per quantum would be unacceptable, but that configuration is
/// out of scope for this runtime.
///
/// NOTE(review): this value is currently *below* [`WATCHDOG_STRIDE`]
/// (512 < 1024), which makes the stride-gated deadline check inside
/// the spin loop of `SpinBarrier::wait` unreachable — the spin phase
/// simply ends (~10 μs) before the first stride boundary, and expiry
/// is then caught on the park path. See the note at the check site.
///
/// See `wrk_journals/2026.04.22 - JRN - Threaded PIO Split
/// Implementation.md` for the measurement data backing these numbers.
const SPIN_BUDGET: u32 = 512;

/// Watchdog wall-clock check cadence. The spin loop re-reads `Instant::now()`
/// every `WATCHDOG_STRIDE` iterations to bound per-iteration overhead. At
/// `~20 ns` per spin hint, `1024` iterations is `~20 μs` between checks —
/// comfortably below the default 5-second deadline but cheap enough to
/// keep the fast path clean (see `threading_micro` regression target
/// <2%). The stride is a power of two so the test is a bitwise `and`.
///
/// NOTE(review): because [`SPIN_BUDGET`] (512) is smaller than this
/// stride, the spin loop exits before the first stride boundary is
/// reached, so the in-spin deadline check is currently a dormant guard
/// that only activates if `SPIN_BUDGET` ever grows past 1024.
const WATCHDOG_STRIDE: u32 = 1024;

/// Default wall-clock deadline for [`SpinBarrier::wait`]. Applied to
/// each waiter individually from its own `wait` entry (not a shared
/// per-round deadline). Picked as a
/// simple upper bound that catches pathological deadlocks without firing
/// on any realistic per-quantum rendezvous (Phase 0c measured p99 ≤ 1 μs
/// for a 4-way barrier; the ThreadedEmulator's 6-way case runs quanta
/// well under a millisecond even on slow hosts). HLD V1 §6.6 suggested a
/// clock-derived formula (`quantum_cycles × 256 / min_clock_hz + 5 s`);
/// a fixed 5 s is strictly simpler and leaves room to refine later.
pub const DEFAULT_DEADLINE: Duration = Duration::from_secs(5);

/// Outcome of a [`SpinBarrier::wait`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierResult {
    /// All `parties` arrived; this waiter has been released.
    Released,
    /// The barrier was poisoned before release; caller should unwind.
    Poisoned,
    /// The per-barrier wall-clock deadline expired before all `parties`
    /// arrived. The waiter that observes this value has already poisoned
    /// the barrier so the remaining waiters wake on their next check.
    /// Once any waiter returns this variant, the barrier is poisoned and
    /// therefore single-use: all subsequent `wait` calls also time out.
    /// `elapsed_ms` is the wall-clock time between the waiter's own
    /// [`SpinBarrier::wait`] entry and the expiry detection, for
    /// diagnostics only (capped at `u32::MAX` ms ≈ 49 days).
    TimedOut { elapsed_ms: u32 },
}

/// Fixed-party hybrid barrier with panic-safe poisoning.
///
/// Constructed once and shared (typically behind an `Arc`) across the
/// worker threads. Both `wait()` and `poison()` take `&self`, so no
/// external locking is needed. Name retained from the Phase 0c
/// spin-only prototype (`SpinBarrier`) for call-site compatibility;
/// the implementation is now spin-then-park.
pub struct SpinBarrier {
    /// Round counter: bumped (wrapping) by the last arriver of each
    /// round; waiters detect release by observing a change.
    generation: AtomicU32,
    /// Arrivals in the current round; reset to 0 by the last arriver.
    count: AtomicU32,
    /// Number of threads that must arrive before release. Fixed at
    /// construction; asserted ≥ 2.
    parties: u32,
    /// One-way poison flag; set by [`Self::poison`], never cleared.
    poisoned: AtomicBool,
    /// Set when any waiter observes deadline expiry (distinct from
    /// ordinary panic-triggered poisoning). The watchdog sets this
    /// flag *and* calls [`Self::poison`] so other waiters wake out of
    /// park, but the coordinator can distinguish a watchdog-fired exit
    /// from a panic-fired exit by reading [`Self::timed_out`].
    timed_out: AtomicBool,
    /// Elapsed milliseconds from the first timing-out waiter's entry to
    /// its detection of expiry. Written once by the first waiter to
    /// trip the watchdog, zero otherwise (zero doubles as the "unset"
    /// sentinel for the recording CAS). Surfaced to callers via the
    /// `TimedOut { elapsed_ms }` variant and a getter.
    timeout_elapsed_ms: AtomicU64,
    /// Wall-clock deadline applied to every waiter individually (not a
    /// shared round deadline). Each [`Self::wait`] call times from its
    /// own entry.
    deadline: Duration,
    /// Held briefly by the last arriver (around the `generation` store)
    /// and by earlier arrivers once their spin budget is exhausted
    /// (across a `Condvar::wait`). Uncontended in the fast path.
    park_mu: Mutex<()>,
    park_cv: Condvar,
}

impl SpinBarrier {
    /// Create a barrier that releases when `parties` threads arrive.
    /// Uses the default wall-clock deadline ([`DEFAULT_DEADLINE`]).
    ///
    /// Panics if `parties < 2` — a single-party barrier is degenerate
    /// and almost always indicates a bug at the call site.
    pub fn new(parties: u32) -> Self {
        Self::with_deadline(parties, DEFAULT_DEADLINE)
    }

    /// Create a barrier with an explicit per-waiter wall-clock deadline.
    ///
    /// Primarily intended for tests that need a short deadline to
    /// exercise the watchdog path; production code should use
    /// [`Self::new`] and inherit [`DEFAULT_DEADLINE`].
    pub fn with_deadline(parties: u32, deadline: Duration) -> Self {
        assert!(parties >= 2);
        Self {
            generation: AtomicU32::new(0),
            count: AtomicU32::new(0),
            parties,
            poisoned: AtomicBool::new(false),
            timed_out: AtomicBool::new(false),
            timeout_elapsed_ms: AtomicU64::new(0),
            deadline,
            park_mu: Mutex::new(()),
            park_cv: Condvar::new(),
        }
    }

    /// Block until all `parties` threads have arrived at this barrier.
    ///
    /// Returns [`BarrierResult::Released`] on normal release,
    /// [`BarrierResult::Poisoned`] if [`Self::poison`] was called
    /// while this thread was waiting (or before it entered `wait`),
    /// or [`BarrierResult::TimedOut`] if the per-waiter wall-clock
    /// deadline elapsed before all parties arrived. On timeout the
    /// observing waiter poisons the barrier so remaining waiters exit
    /// promptly; the barrier is single-use after that.
    pub fn wait(&self) -> BarrierResult {
        let start = Instant::now();
        // Entry guard: a barrier poisoned before this thread arrived
        // short-circuits without touching `count` or `generation`.
        if self.poisoned.load(Acquire) {
            return self.poisoned_or_timed_out_result(start);
        }
        let cur_gen = self.generation.load(Acquire);
        let n = self.count.fetch_add(1, AcqRel) + 1;
        if n == self.parties {
            // Last arrival: bump generation under the park mutex so any
            // waiter currently inside `park_cv.wait_while` — which holds
            // `park_mu` around its predicate check — linearises on the
            // old-gen side or the new-gen side, never in a window that
            // could miss the broadcast.
            {
                let _g = self.park_mu.lock().unwrap();
                self.count.store(0, Relaxed);
                self.generation.store(cur_gen.wrapping_add(1), Release);
            }
            self.park_cv.notify_all();
            return BarrierResult::Released;
        }

        // Earlier arriver — spin briefly on the fast path. The watchdog
        // check is amortised across `WATCHDOG_STRIDE` iterations so the
        // fast-path cost of reading `Instant::now()` lands well under
        // 1% of the per-barrier-round figure measured by
        // `threading_micro`.
        let mut i: u32 = 0;
        for _ in 0..SPIN_BUDGET {
            if self.poisoned.load(Acquire) {
                return self.poisoned_or_timed_out_result(start);
            }
            if self.generation.load(Acquire) != cur_gen {
                return BarrierResult::Released;
            }
            i = i.wrapping_add(1);
            // NOTE(review): with the current constants this branch is
            // unreachable — `i` takes values 1..=SPIN_BUDGET (512) and
            // `i & (WATCHDOG_STRIDE - 1)` with a 1024 stride is never 0
            // in that range. Deadline expiry during the spin phase is
            // instead caught by the `remaining.is_zero()` check when the
            // waiter falls through to the park loop below. Kept as a
            // dormant guard should `SPIN_BUDGET` ever grow past the
            // stride.
            if i & (WATCHDOG_STRIDE - 1) == 0 && start.elapsed() >= self.deadline {
                return self.trip_watchdog(start);
            }
            std::hint::spin_loop();
        }

        // Fast path exhausted — sleep on the condvar with a remaining-
        // budget timeout so the watchdog still fires even if every
        // waiter parked before expiry. Idle-for-most-of-the-quantum
        // workers hit this path and stop burning CPU cycles that the
        // productive worker would otherwise lose to cache-coherence
        // traffic on the shared barrier lines.
        let mut g = self.park_mu.lock().unwrap();
        loop {
            // Check release first so a late-spin-path generation bump
            // we missed still exits as `Released` rather than falling
            // into the poisoned-or-timed-out result path.
            if self.generation.load(Acquire) != cur_gen {
                return BarrierResult::Released;
            }
            if self.poisoned.load(Acquire) {
                return self.poisoned_or_timed_out_result(start);
            }
            let remaining = self.deadline.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                // Must release `park_mu` first: `trip_watchdog` calls
                // `poison`, which re-acquires it (std Mutex is not
                // reentrant).
                drop(g);
                return self.trip_watchdog(start);
            }
            let (gg, _timeout) = self.park_cv.wait_timeout(g, remaining).unwrap();
            g = gg;
            // The top-of-loop predicate handles both the "notified"
            // and the "spurious / timeout" returns uniformly, so we
            // don't need to branch on `timeout.timed_out()` here.
        }
    }

    /// Return either `Poisoned` or `TimedOut` depending on whether the
    /// poisoning was caused by watchdog expiry. Shared by every early-
    /// exit path in `wait`. `start` is only consulted when the calling
    /// waiter is reporting its own elapsed time and no prior waiter has
    /// written `timeout_elapsed_ms`.
    #[inline]
    fn poisoned_or_timed_out_result(&self, start: Instant) -> BarrierResult {
        if self.timed_out.load(Acquire) {
            let recorded = self.timeout_elapsed_ms.load(Acquire);
            let elapsed_ms = if recorded != 0 {
                recorded.min(u32::MAX as u64) as u32
            } else {
                // Either no waiter has recorded yet, or the recorded
                // elapse was genuinely 0 ms (see note in
                // `trip_watchdog`); report our own elapsed time.
                start.elapsed().as_millis().min(u32::MAX as u128) as u32
            };
            BarrierResult::TimedOut { elapsed_ms }
        } else {
            BarrierResult::Poisoned
        }
    }

    /// First waiter to detect deadline expiry records the elapsed time,
    /// flips `timed_out`, and poisons the barrier so other waiters wake
    /// up immediately. Subsequent callers re-enter via
    /// [`Self::poisoned_or_timed_out_result`] and observe the recorded
    /// elapsed time.
    ///
    /// NOTE(review): `timeout_elapsed_ms` uses 0 as the "unset" CAS
    /// sentinel, so a genuine sub-millisecond elapse would be recorded
    /// as indistinguishable from "never recorded" and peers would fall
    /// back to reporting their own elapsed time. Harmless for the
    /// millisecond-scale-or-longer deadlines used in practice.
    #[inline]
    fn trip_watchdog(&self, start: Instant) -> BarrierResult {
        let elapsed = start.elapsed();
        let elapsed_ms = elapsed.as_millis().min(u32::MAX as u128) as u32;
        // Race: two waiters can both elapse before the first CAS. The
        // first to store wins; the second observes the recorded value
        // via `poisoned_or_timed_out_result`.
        let _ = self
            .timeout_elapsed_ms
            .compare_exchange(0, elapsed_ms as u64, AcqRel, Acquire);
        self.timed_out.store(true, Release);
        self.poison();
        BarrierResult::TimedOut { elapsed_ms }
    }

    /// `true` if the barrier has transitioned to the timed-out state
    /// (any waiter observed the deadline expire). Distinct from
    /// ordinary [`Self::poison`] so the coordinator can attribute a
    /// runtime exit to a watchdog rather than a panic.
    pub fn timed_out(&self) -> bool {
        self.timed_out.load(Acquire)
    }

    /// Elapsed-milliseconds snapshot recorded when the watchdog first
    /// fired; zero if the barrier never timed out. `u32` because a
    /// `Duration` isn't `Clone`-friendly inside the emulator error
    /// variant that surfaces this value.
    pub fn timeout_elapsed_ms(&self) -> u32 {
        self.timeout_elapsed_ms.load(Acquire).min(u32::MAX as u64) as u32
    }

    /// Abort all current and future waiters with [`BarrierResult::Poisoned`].
    ///
    /// One-way switch: once poisoned, the barrier stays poisoned for
    /// its lifetime. Intended for use by a panic-recovery coordinator.
    /// Broadcasts on the park condvar so any sleeping waiter wakes up
    /// immediately rather than on the next timeout.
    pub fn poison(&self) {
        // Take/drop the mutex to linearise with `park_cv.wait` predicate
        // checks on the sleeping-waiter side, same reasoning as the
        // generation store in `wait`.
        {
            let _g = self.park_mu.lock().unwrap();
            self.poisoned.store(true, Release);
        }
        self.park_cv.notify_all();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{
        AtomicU32, AtomicUsize,
        Ordering::{self, SeqCst},
    };
    use std::thread;
    use std::time::Duration;

    #[test]
    fn all_threads_released() {
        // Four workers rendezvous once; each records its release in its
        // own flag so a hang or wrong variant is attributable.
        let barrier = Arc::new(SpinBarrier::new(4));
        let flags: Vec<Arc<AtomicU32>> = (0..4).map(|_| Arc::new(AtomicU32::new(0))).collect();

        let handles: Vec<_> = flags
            .iter()
            .map(|flag| {
                let b = Arc::clone(&barrier);
                let f = Arc::clone(flag);
                thread::spawn(move || match b.wait() {
                    BarrierResult::Released => f.store(1, SeqCst),
                    BarrierResult::Poisoned => panic!("unexpected poison"),
                    BarrierResult::TimedOut { .. } => panic!("unexpected timeout"),
                })
            })
            .collect();

        for h in handles {
            h.join().expect("thread panicked");
        }

        for f in &flags {
            assert_eq!(f.load(SeqCst), 1, "thread did not set released flag");
        }
    }

    #[test]
    fn multiple_rounds() {
        // The barrier must be reusable: four workers cycle through ten
        // rounds and the shared counter tallies every release.
        const PARTIES: u32 = 4;
        const ROUNDS: u32 = 10;

        let barrier = Arc::new(SpinBarrier::new(PARTIES));
        let counter = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(PARTIES as usize);
        for _ in 0..PARTIES {
            let b = Arc::clone(&barrier);
            let c = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..ROUNDS {
                    match b.wait() {
                        BarrierResult::Released => {
                            c.fetch_add(1, SeqCst);
                        }
                        BarrierResult::Poisoned => panic!("unexpected poison"),
                        BarrierResult::TimedOut { .. } => panic!("unexpected timeout"),
                    }
                }
            }));
        }

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(
            counter.load(SeqCst),
            (PARTIES * ROUNDS) as usize,
            "counter should equal parties * rounds"
        );
    }

    /// HLD V5 §4 item 12 — `ThreadedEmulator` now rendezvouses six
    /// workers (core0, core1, pio0, pio1, pio2, coord) per quantum, so
    /// this primitive's unit tests should exercise that arity directly.
    /// Same shape as `multiple_rounds`, with `PARTIES = 6`.
    #[test]
    fn multiple_rounds_6way() {
        const PARTIES: u32 = 6;
        const ROUNDS: u32 = 10;

        let barrier = Arc::new(SpinBarrier::new(PARTIES));
        let counter = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::with_capacity(PARTIES as usize);
        for _ in 0..PARTIES {
            let b = Arc::clone(&barrier);
            let c = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..ROUNDS {
                    match b.wait() {
                        BarrierResult::Released => {
                            c.fetch_add(1, SeqCst);
                        }
                        BarrierResult::Poisoned => panic!("unexpected poison"),
                        BarrierResult::TimedOut { .. } => panic!("unexpected timeout"),
                    }
                }
            }));
        }

        for h in handles {
            h.join().expect("thread panicked");
        }

        assert_eq!(
            counter.load(SeqCst),
            (PARTIES * ROUNDS) as usize,
            "counter should equal parties * rounds"
        );
    }

    /// Asymmetric-arrival sibling to `multiple_rounds_6way`. One worker
    /// busy-waits ~2 μs past the others before entering `wait()`. With
    /// `SPIN_BUDGET=512` (~10 μs spin ceiling) no worker should park.
    /// A regression that lowers `SPIN_BUDGET` below the ~2 μs stagger
    /// ceiling would cause `Condvar::wait`/`notify_all` cycles —
    /// measurable as a huge p99 spike in the microbench but previously
    /// undetected by `cargo test`.
    ///
    /// No timing is asserted here; the test only confirms all 10 rounds
    /// complete (no deadlock, no poison), exercising the late-arriver
    /// code path that the pure-symmetric `multiple_rounds` never hits.
    #[test]
    fn parties_6_asymmetric_arrival_does_not_park() {
        const PARTIES: u32 = 6;
        const ROUNDS: u32 = 10;
        let barrier = Arc::new(SpinBarrier::new(PARTIES));
        let counter = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::with_capacity(PARTIES as usize);
        for tid in 0..PARTIES {
            let b = Arc::clone(&barrier);
            let c = Arc::clone(&counter);
            let is_late = tid == PARTIES - 1;
            handles.push(thread::spawn(move || {
                for _ in 0..ROUNDS {
                    if is_late {
                        // Late arriver busy-waits ~2 μs.
                        let t0 = std::time::Instant::now();
                        while t0.elapsed() < Duration::from_micros(2) {
                            std::hint::spin_loop();
                        }
                    }
                    match b.wait() {
                        BarrierResult::Released => {
                            c.fetch_add(1, SeqCst);
                        }
                        BarrierResult::Poisoned => panic!("unexpected poison"),
                        BarrierResult::TimedOut { .. } => panic!("unexpected timeout"),
                    }
                }
            }));
        }

        for h in handles {
            h.join().expect("thread panicked");
        }
        assert_eq!(counter.load(SeqCst), PARTIES * ROUNDS);
    }

    #[test]
    fn poison_breaks_waiters() {
        // 4-party barrier, only 3 waiters: absent poisoning they would
        // spin forever. The main thread watches `entered` until all
        // three workers have reached the barrier, then poisons. The
        // short trailing sleep gives each worker time to move from the
        // counter increment into the spin loop — still technically racy
        // but far more robust than a flat 50ms wait.
        let barrier = Arc::new(SpinBarrier::new(4));
        let entered = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::with_capacity(3);
        for _ in 0..3 {
            let b = Arc::clone(&barrier);
            let e = Arc::clone(&entered);
            handles.push(thread::spawn(move || {
                e.fetch_add(1, Ordering::Release);
                b.wait()
            }));
        }

        while entered.load(Ordering::Acquire) < 3 {
            thread::sleep(Duration::from_millis(1));
        }
        thread::sleep(Duration::from_millis(10));
        barrier.poison();

        for h in handles {
            assert_eq!(
                h.join().expect("thread panicked"),
                BarrierResult::Poisoned,
                "waiter should have returned Poisoned"
            );
        }
    }

    /// Stage 5 (HLD V1 §6.6): a stalled worker must no longer deadlock
    /// the rendezvous. Two parties, only one arrives; the other waiter
    /// must observe `TimedOut` within the configured deadline + a small
    /// wake-up cushion. A short 150 ms deadline keeps the test well
    /// under a second; the production default is 5 s.
    #[test]
    fn barrier_watchdog_fires_when_worker_stalls() {
        const DEADLINE: Duration = Duration::from_millis(150);
        const CUSHION: Duration = Duration::from_millis(500);
        let barrier = Arc::new(SpinBarrier::with_deadline(2, DEADLINE));
        let b = Arc::clone(&barrier);

        let t0 = std::time::Instant::now();
        let worker = thread::spawn(move || b.wait());
        // The main thread never arrives — the spawned waiter must trip
        // the watchdog entirely on its own.
        let result = worker.join().expect("worker panicked");
        let elapsed = t0.elapsed();

        // The watchdog should fire between the deadline and the
        // deadline plus a generous host-scheduler cushion. The lower
        // bound is the interesting correctness check; the upper bound
        // catches regressions where the stride-based check forgets to
        // also honour the condvar-timeout path.
        assert!(
            elapsed >= DEADLINE,
            "watchdog fired too early: {elapsed:?} < {DEADLINE:?}"
        );
        assert!(
            elapsed < DEADLINE + CUSHION,
            "watchdog took too long: {elapsed:?} > {:?}",
            DEADLINE + CUSHION
        );

        match result {
            BarrierResult::TimedOut { elapsed_ms } => assert!(
                elapsed_ms >= DEADLINE.as_millis() as u32,
                "reported elapsed_ms {elapsed_ms} < deadline {}",
                DEADLINE.as_millis()
            ),
            other => panic!("expected TimedOut, got {other:?}"),
        }

        // One-shot poison semantics: every subsequent wait returns the
        // same variant without blocking. Without this property a
        // panicking coordinator could re-enter a watchdog-fired barrier
        // and hang again.
        match barrier.wait() {
            BarrierResult::TimedOut { .. } => {}
            other => panic!("re-entry after timeout should yield TimedOut, got {other:?}"),
        }
        assert!(barrier.timed_out());
        assert!(barrier.timeout_elapsed_ms() >= DEADLINE.as_millis() as u32);
    }

    /// Coverage for the entry-poisoned guard at the top of `wait` plus
    /// the `Poisoned` arm of `poisoned_or_timed_out_result` (the
    /// `timed_out == false` branch). Every other failure-path test
    /// poisons while waiters are already inside the barrier; this one
    /// calls `poison()` BEFORE any thread enters `wait()`, so the very
    /// first load of `self.poisoned` short-circuits to `Poisoned`
    /// without ever touching the count or generation atomics. Also
    /// asserts `timed_out()` stays false — a regression that conflated
    /// panic-poison with watchdog-poison would surface here.
    #[test]
    fn wait_after_poison_returns_poisoned_immediately() {
        let barrier = SpinBarrier::new(4);
        barrier.poison();

        // No thread spawning needed — the entry guard returns on the
        // calling thread. Two iterations also cover re-entry: poison is
        // one-shot and must never flip timed_out.
        for _ in 0..2 {
            assert_eq!(barrier.wait(), BarrierResult::Poisoned);
            assert!(!barrier.timed_out());
        }
        assert_eq!(barrier.timeout_elapsed_ms(), 0);
    }

    /// Coverage for the park-loop generation check returning `Released`.
    /// `parties_6_asymmetric_arrival_does_not_park` deliberately stays
    /// inside the spin budget; this test inverts that — one waiter is
    /// left alone for 50 ms (well beyond SPIN_BUDGET≈10 μs), so it
    /// definitely exhausts its spin budget and falls into
    /// `park_cv.wait_timeout`. When the late arriver finally bumps the
    /// generation, the parked waiter must wake via the broadcast and
    /// exit through the park-loop `Released` branch.
    #[test]
    fn waiter_parks_then_observes_release() {
        let barrier = Arc::new(SpinBarrier::new(2));
        let b = Arc::clone(&barrier);

        let parked = thread::spawn(move || b.wait());

        // Give the spawned thread time to exhaust its spin budget and
        // park on the condvar. SPIN_BUDGET≈10 μs, so 50 ms is overkill
        // but cheap.
        thread::sleep(Duration::from_millis(50));

        // Late arrival — last-arriver path bumps generation under the
        // park mutex and broadcasts, releasing the parked waiter.
        assert_eq!(barrier.wait(), BarrierResult::Released);
        assert_eq!(
            parked.join().expect("early panicked"),
            BarrierResult::Released
        );
        assert!(!barrier.timed_out());
    }

    /// Coverage: L249 (park-loop poisoned check returns true → Poisoned).
    /// Unlike `poison_breaks_waiters`, whose waiters are still spinning
    /// when poisoned (they observe poison at L222), this test first
    /// drives the waiter past its spin budget so it is blocked in
    /// `park_cv.wait_timeout`, and only then poisons. The broadcast
    /// wakes the sleeper, whose top-of-loop predicate at L249 must
    /// yield `Poisoned`.
    #[test]
    fn waiter_parks_then_observes_poison() {
        // A deadline far beyond the test's runtime keeps the watchdog
        // from ever being a factor.
        let barrier = Arc::new(SpinBarrier::with_deadline(2, Duration::from_secs(30)));
        let for_worker = Arc::clone(&barrier);

        let waiter = thread::spawn(move || for_worker.wait());

        // Sleep long enough (50 ms ≫ ~10 μs spin budget) that the
        // spawned thread is certainly parked before the poison lands.
        thread::sleep(Duration::from_millis(50));
        barrier.poison();

        assert_eq!(
            waiter.join().expect("waiter panicked"),
            BarrierResult::Poisoned
        );
        assert!(!barrier.timed_out());
    }

    /// Coverage: L253 (park-loop `remaining.is_zero()` → trip_watchdog
    /// from the park path). `barrier_watchdog_fires_when_worker_stalls`
    /// runs with a 150 ms deadline, where which branch trips first is
    /// timing-dependent; this test narrows the remaining-budget window
    /// with a tight 80 ms deadline and only checks the result variant
    /// plus the `timed_out()` flag. The structural claim is
    /// "park-then-timeout works" — the precise wall-clock margin is
    /// already pinned by the existing test.
    #[test]
    fn watchdog_fires_from_park_path() {
        const DEADLINE: Duration = Duration::from_millis(80);
        let barrier = Arc::new(SpinBarrier::with_deadline(2, DEADLINE));
        let for_worker = Arc::clone(&barrier);

        // A lone waiter with no peer: guaranteed to park once the spin
        // budget runs out, then trip the watchdog when the condvar
        // wait_timeout returns with no remaining budget.
        let outcome = thread::spawn(move || for_worker.wait())
            .join()
            .expect("waiter panicked");

        if let BarrierResult::TimedOut { elapsed_ms } = outcome {
            assert!(elapsed_ms as u128 >= DEADLINE.as_millis());
        } else {
            panic!("expected TimedOut, got {outcome:?}");
        }
        assert!(barrier.timed_out());
        assert!(barrier.timeout_elapsed_ms() >= DEADLINE.as_millis() as u32);
    }

    /// Targets L229: watchdog tripped while still spinning (the
    /// `start.elapsed() >= self.deadline` arm of the compound
    /// stride-and-deadline check). With a tiny 50 ms deadline and a
    /// lone waiter, the spawned thread burns through SPIN_BUDGET=512
    /// iterations in far under 50 ms and keeps spinning until the
    /// watchdog stride lands and the elapsed-time comparison fires.
    /// The second party (this test thread) deliberately never arrives.
    #[test]
    fn watchdog_trips_during_spin_2party_50ms() {
        const DEADLINE: Duration = Duration::from_millis(50);
        let barrier = Arc::new(SpinBarrier::with_deadline(2, DEADLINE));
        let for_worker = Arc::clone(&barrier);

        let outcome = thread::spawn(move || for_worker.wait())
            .join()
            .expect("waiter panicked");

        match outcome {
            // The lower bound carries 30 ms of slack for slow CI hosts.
            BarrierResult::TimedOut { elapsed_ms } => assert!(
                elapsed_ms >= 30,
                "elapsed_ms {elapsed_ms} should be at least 30 (deadline 50)"
            ),
            other => panic!("expected TimedOut, got {other:?}"),
        }
        assert!(barrier.timed_out());
    }

    /// Targets L253: the watchdog firing out of the park path, i.e.
    /// the condvar `wait_timeout` coming back with `remaining == 0`.
    /// A 200 ms deadline utterly dwarfs the ~10 μs spin budget, so
    /// the waiter is guaranteed to be asleep on the condvar at the
    /// moment the deadline expires.
    #[test]
    fn watchdog_trips_during_sleep_2party_200ms() {
        const DEADLINE: Duration = Duration::from_millis(200);
        let barrier = Arc::new(SpinBarrier::with_deadline(2, DEADLINE));
        let for_worker = Arc::clone(&barrier);

        let outcome = thread::spawn(move || for_worker.wait())
            .join()
            .expect("waiter panicked");

        if let BarrierResult::TimedOut { elapsed_ms } = outcome {
            assert!(
                elapsed_ms >= 150,
                "elapsed_ms {elapsed_ms} should be at least 150 (deadline 200)"
            );
        } else {
            panic!("expected TimedOut, got {outcome:?}");
        }
        assert!(barrier.timed_out());
        assert!(barrier.timeout_elapsed_ms() >= 150);
    }

    /// Targets L195 + the Poisoned arm of `poisoned_or_timed_out_result`
    /// (L281, i.e. the `timed_out == false` side of L272). Thread A
    /// poisons and exits before thread B ever calls `wait()`, so B's
    /// very first read of `self.poisoned` short-circuits straight to
    /// Poisoned — the count and generation atomics are never touched.
    #[test]
    fn poisoned_on_entry_via_separate_thread() {
        let barrier = Arc::new(SpinBarrier::new(2));
        let for_poisoner = Arc::clone(&barrier);

        // Thread A: poison and exit. join() is a synchronisation
        // point, so the poison store is guaranteed to happen-before
        // the wait() below.
        let poisoner = thread::spawn(move || for_poisoner.poison());
        poisoner.join().expect("poisoner panicked");

        // This thread plays B: the entry guard at L195 must observe
        // the poison.
        assert_eq!(barrier.wait(), BarrierResult::Poisoned);
        assert!(!barrier.timed_out());
    }

    /// Targets L222: the poisoned check inside the spin loop. On a
    /// 2-party barrier the spawned waiter signals via an atomic flag
    /// just before entering wait(); the main thread polls that flag,
    /// then poisons while the waiter is still inside its spin window
    /// (or, at worst, already parked — both paths observe poison
    /// correctly).
    #[test]
    fn poisoned_during_spin_loop() {
        let barrier = Arc::new(SpinBarrier::new(2));
        let entered = Arc::new(AtomicU32::new(0));

        let for_worker = Arc::clone(&barrier);
        let flag = Arc::clone(&entered);
        let waiter = thread::spawn(move || {
            flag.store(1, SeqCst);
            for_worker.wait()
        });

        // Poll until the spawned thread announces it is about to call
        // wait().
        while entered.load(SeqCst) == 0 {
            thread::sleep(Duration::from_micros(10));
        }
        // A short grace period so it has cleared the entry guard
        // (L195), done the count fetch_add, and reached the spin
        // loop.
        thread::sleep(Duration::from_millis(1));
        barrier.poison();

        assert_eq!(
            waiter.join().expect("waiter panicked"),
            BarrierResult::Poisoned
        );
        assert!(!barrier.timed_out());
    }

    /// Targets L249: the poisoned check in the park-loop's top
    /// predicate. The main thread sleeps 50 ms — far past the waiter's
    /// ~10 μs spin budget — before poisoning, so the waiter is
    /// guaranteed to be blocked in `park_cv.wait_timeout`. The poison
    /// broadcast wakes it and the L249 check must come back true.
    #[test]
    fn poisoned_during_condvar_sleep() {
        // A 30 s deadline keeps the watchdog out of the picture.
        let barrier = Arc::new(SpinBarrier::with_deadline(2, Duration::from_secs(30)));
        let for_worker = Arc::clone(&barrier);
        let waiter = thread::spawn(move || for_worker.wait());

        // By now the spawned thread has long since parked (50 ms vs a
        // ~10 μs spin budget).
        thread::sleep(Duration::from_millis(50));
        barrier.poison();

        assert_eq!(
            waiter.join().expect("waiter panicked"),
            BarrierResult::Poisoned
        );
        assert!(!barrier.timed_out());
    }

    /// Targets L272 + L274 together: `poisoned_or_timed_out_result`
    /// must yield `TimedOut` — never `Poisoned` — once
    /// `timed_out == true`, and re-entry after the first waiter has
    /// written `timeout_elapsed_ms` must take the `recorded != 0` arm
    /// of L274. Phase 1 trips the watchdog (stores a non-zero
    /// elapsed_ms and flips timed_out). Phase 2 calls wait() again:
    /// the entry guard at L195 sees poisoned=true and dispatches to
    /// `poisoned_or_timed_out_result`, which now takes both the
    /// timed_out=true branch (L272) and the recorded != 0 branch
    /// (L274). Phase 3 checks the contrasting case on a second
    /// barrier: poison with no watchdog trip reports Poisoned and a
    /// zero recorded elapsed.
    #[test]
    fn poisoned_or_timed_out_result_distinguishes_variants() {
        // Phase 1: trip the watchdog on a fresh 2-party barrier by
        // waiting alone past a 50 ms deadline. This exercises the
        // trip_watchdog path and the TimedOut arm of
        // poisoned_or_timed_out_result for later waiters.
        const DEADLINE: Duration = Duration::from_millis(50);
        let timed_out_barrier = SpinBarrier::with_deadline(2, DEADLINE);
        // The first waiter returns straight out of trip_watchdog with
        // the elapsed_ms it computed.
        let first_elapsed = match timed_out_barrier.wait() {
            BarrierResult::TimedOut { elapsed_ms } => elapsed_ms,
            other => panic!("expected TimedOut on first wait, got {other:?}"),
        };
        assert!(timed_out_barrier.timed_out());

        // Phase 2: re-entry must surface the *recorded* watchdog
        // elapsed_ms verbatim — not this call's own elapsed time.
        match timed_out_barrier.wait() {
            BarrierResult::TimedOut { elapsed_ms } => assert_eq!(
                elapsed_ms, first_elapsed,
                "re-entry should surface the recorded watchdog elapsed_ms"
            ),
            other => panic!("expected TimedOut on re-entry, got {other:?}"),
        }

        // Phase 3: the Poisoned arm of L272 (timed_out == false) on a
        // fresh barrier poisoned without the watchdog ever firing.
        let poison_only_barrier = SpinBarrier::new(2);
        poison_only_barrier.poison();
        assert!(!poison_only_barrier.timed_out());
        assert_eq!(poison_only_barrier.wait(), BarrierResult::Poisoned);
        // timeout_elapsed_ms() == 0 distinguishes this case from the
        // recorded-elapsed path in phases 1–2.
        assert_eq!(poison_only_barrier.timeout_elapsed_ms(), 0);
    }

    /// Coverage: barrier re-arm after a completed round. Runs two full
    /// 2-party rounds back to back on the same barrier. The structural
    /// claim is that the last arriver's lock-bump-broadcast sequence
    /// (the `n == parties` true branch at L200) leaves the barrier in
    /// a clean state: if the arrival count were not reset by the last
    /// arriver, the second round would deadlock instead of releasing
    /// both parties.
    ///
    /// NOTE(review): an earlier version of this doc claimed the test
    /// exercised a "solo" lock-bump-broadcast with no concurrent
    /// threads, but a 2-party `wait()` on a single thread would block
    /// forever — both rounds necessarily use a spawned peer, exactly
    /// as the body does. The fn name is kept for history/greppability.
    #[test]
    fn solo_round_then_normal_round() {
        let barrier = Arc::new(SpinBarrier::new(2));

        // Round 1: a plain 2-party round across two threads to advance
        // the generation once.
        let b1 = Arc::clone(&barrier);
        let h = thread::spawn(move || b1.wait());
        let r_main = barrier.wait();
        let r_other = h.join().expect("other panicked");
        assert_eq!(r_main, BarrierResult::Released);
        assert_eq!(r_other, BarrierResult::Released);

        // Round 2 — re-arm check. The generation counter has advanced
        // by exactly one; if the count was not reset to zero by the
        // last arriver, this round would deadlock.
        let b2 = Arc::clone(&barrier);
        let h2 = thread::spawn(move || b2.wait());
        let r_main2 = barrier.wait();
        let r_other2 = h2.join().expect("other panicked");
        assert_eq!(r_main2, BarrierResult::Released);
        assert_eq!(r_other2, BarrierResult::Released);
        assert!(!barrier.timed_out());
    }
}