// pi/hostcall_queue.rs
1//! Hostcall queue primitives with explicit reclamation telemetry.
2//!
3//! The fast lane uses a bounded lock-free ring (`ArrayQueue`). When pressure
4//! exceeds ring capacity, requests spill into a bounded overflow deque to
5//! preserve FIFO ordering across the two lanes.
6
7pub use crate::hostcall_s3_fifo::S3FifoFallbackReason;
8use crossbeam_queue::ArrayQueue;
9use std::collections::{BTreeMap, VecDeque};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12
/// Capacity of the lock-free fast ring (small tier).
pub const HOSTCALL_FAST_RING_CAPACITY: usize = 256;
/// Capacity of the bounded overflow deque (main tier).
pub const HOSTCALL_OVERFLOW_CAPACITY: usize = 2_048;
// Safe-fallback trigger: retired backlog threshold is
// (fast + overflow) * multiplier, floored at the minimum (see `with_mode`).
const SAFE_FALLBACK_BACKLOG_MULTIPLIER: usize = 8;
const SAFE_FALLBACK_BACKLOG_MIN: usize = 32;
// Ghost-list sizing: twice the combined lane capacity, floored at 16
// (see `S3FifoConfig::from_capacities`).
const S3_FIFO_GHOST_CAPACITY_MULTIPLIER: usize = 2;
const S3_FIFO_GHOST_CAPACITY_MIN: usize = 16;
// Signal-quality and stability thresholds consumed by `S3FifoState`.
const S3_FIFO_MIN_SIGNAL_SAMPLES: u64 = 32;
const S3_FIFO_MAX_SIGNALLESS_STREAK: u64 = 64;
const S3_FIFO_UNSTABLE_REJECTION_STREAK: u64 = 16;
22
/// BRAVO-style lock bias mode for metadata contention handling.
///
/// Transitions between modes are driven by [`BravoContentionState::observe`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BravoBiasMode {
    /// Neutral mode. No explicit read bias is applied.
    Balanced,
    /// Prefer reader throughput under stable read-heavy contention.
    ReadBiased,
    /// Temporary writer-favoring recovery mode after starvation risk.
    WriterRecovery,
}
33
/// Deterministic contention signature computed from a fixed observation window.
///
/// Produced by [`BravoContentionState::classify`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentionSignature {
    /// Window does not include enough operations for a stable decision.
    InsufficientSamples,
    /// Read-dominant contention with healthy writer behavior.
    ReadDominant,
    /// Mixed read/write contention without starvation indicators.
    MixedContention,
    /// Writer wait/timeout profile indicates starvation risk.
    WriterStarvationRisk,
    /// Write-dominant contention (or low reader pressure).
    WriteDominant,
}
48
/// Observation bucket consumed by the BRAVO policy state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ContentionSample {
    /// Read-lock acquisitions observed in the window.
    pub read_acquires: u64,
    /// Write-lock acquisitions observed in the window.
    pub write_acquires: u64,
    /// p95 reader wait (microseconds per the `_us` suffix). Not consumed by
    /// `classify` in this file; presumably kept for telemetry — TODO confirm.
    pub read_wait_p95_us: u64,
    /// p95 writer wait in microseconds; compared against
    /// `writer_starvation_wait_us` during classification.
    pub write_wait_p95_us: u64,
    /// Writer acquisition timeouts; compared against
    /// `writer_starvation_timeouts` during classification.
    pub write_timeouts: u64,
}
58
59impl ContentionSample {
60    #[must_use]
61    pub const fn total_acquires(self) -> u64 {
62        self.read_acquires.saturating_add(self.write_acquires)
63    }
64
65    #[must_use]
66    pub fn read_ratio_permille(self) -> u32 {
67        let total = self.total_acquires();
68        if total == 0 {
69            return 0;
70        }
71        let numerator = self.read_acquires.saturating_mul(1_000);
72        let ratio = numerator / total;
73        u32::try_from(ratio).unwrap_or(1_000)
74    }
75}
76
/// Tuning knobs for deterministic BRAVO contention policy behavior.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoContentionConfig {
    /// Minimum acquisitions per window before any signature other than
    /// `InsufficientSamples` is produced.
    pub min_total_acquires: u64,
    /// Read ratio (permille) at or above which a window is `ReadDominant`.
    pub read_dominant_ratio_permille: u32,
    /// Inclusive lower bound (permille) of the `MixedContention` band.
    pub mixed_ratio_floor_permille: u32,
    /// Inclusive upper bound (permille) of the `MixedContention` band;
    /// clamped into `[floor, 1000]` during classification.
    pub mixed_ratio_ceiling_permille: u32,
    /// p95 writer wait (microseconds) at or above which starvation risk is flagged.
    pub writer_starvation_wait_us: u64,
    /// Writer timeout count at or above which starvation risk is flagged.
    pub writer_starvation_timeouts: u64,
    /// Fairness budget: maximum consecutive windows spent in `ReadBiased`.
    pub max_consecutive_read_bias_windows: u32,
    /// Windows spent in `WriterRecovery` before returning to `Balanced`
    /// (refreshed while starvation persists).
    pub writer_recovery_windows: u32,
}
89
impl Default for BravoContentionConfig {
    fn default() -> Self {
        Self {
            // Windows with fewer than 32 acquisitions are treated as noise.
            min_total_acquires: 32,
            // >= 80.0% reads => ReadDominant; 45.0%..=79.9% => MixedContention.
            read_dominant_ratio_permille: 800,
            mixed_ratio_floor_permille: 450,
            mixed_ratio_ceiling_permille: 799,
            // Starvation risk: p95 writer wait >= 8ms or >= 2 timeouts.
            writer_starvation_wait_us: 8_000,
            writer_starvation_timeouts: 2,
            // Fairness budget and recovery dwell, both measured in windows.
            max_consecutive_read_bias_windows: 5,
            writer_recovery_windows: 2,
        }
    }
}
104
/// One policy transition decision generated from an observation window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoPolicyDecision {
    /// Mode before this window was applied.
    pub previous_mode: BravoBiasMode,
    /// Mode after this window was applied.
    pub next_mode: BravoBiasMode,
    /// Signature classified for this window.
    pub signature: ContentionSignature,
    /// `true` when `next_mode` differs from `previous_mode`.
    pub switched: bool,
    /// `true` when the window forced a starvation-driven rollback into
    /// `WriterRecovery`.
    pub rollback_triggered: bool,
}
114
/// Snapshot of contention policy internals for diagnostics and regression tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoPolicyTelemetry {
    /// Current bias mode.
    pub mode: BravoBiasMode,
    /// Total mode changes observed so far.
    pub transitions: u64,
    /// Total starvation-driven rollbacks into `WriterRecovery`.
    pub rollbacks: u64,
    /// Total observation windows consumed.
    pub windows_observed: u64,
    /// Consecutive windows spent in `ReadBiased` (fairness budget counter).
    pub consecutive_read_bias_windows: u32,
    /// Remaining `WriterRecovery` dwell windows.
    pub writer_recovery_remaining: u32,
    /// Signature of the most recently observed window.
    pub last_signature: ContentionSignature,
}
126
/// Deterministic BRAVO-style contention policy state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoContentionState {
    /// Immutable tuning supplied at construction.
    config: BravoContentionConfig,
    /// Current bias mode.
    mode: BravoBiasMode,
    /// Total mode changes.
    transitions: u64,
    /// Total starvation-driven rollbacks.
    rollbacks: u64,
    /// Total observation windows consumed.
    windows_observed: u64,
    /// Consecutive windows spent in `ReadBiased`.
    consecutive_read_bias_windows: u32,
    /// Remaining `WriterRecovery` dwell windows.
    writer_recovery_remaining: u32,
    /// Signature of the most recent window.
    last_signature: ContentionSignature,
}
139
impl BravoContentionState {
    /// Fresh state in `Balanced` mode with the supplied tuning.
    #[must_use]
    pub const fn new(config: BravoContentionConfig) -> Self {
        Self {
            config,
            mode: BravoBiasMode::Balanced,
            transitions: 0,
            rollbacks: 0,
            windows_observed: 0,
            consecutive_read_bias_windows: 0,
            writer_recovery_remaining: 0,
            last_signature: ContentionSignature::InsufficientSamples,
        }
    }

    /// Current bias mode.
    #[must_use]
    pub const fn mode(self) -> BravoBiasMode {
        self.mode
    }

    /// Copy of all counters for diagnostics and regression tests.
    #[must_use]
    pub const fn snapshot(self) -> BravoPolicyTelemetry {
        BravoPolicyTelemetry {
            mode: self.mode,
            transitions: self.transitions,
            rollbacks: self.rollbacks,
            windows_observed: self.windows_observed,
            consecutive_read_bias_windows: self.consecutive_read_bias_windows,
            writer_recovery_remaining: self.writer_recovery_remaining,
            last_signature: self.last_signature,
        }
    }

    /// Consume one observation window and advance the state machine.
    ///
    /// Transition summary:
    /// * `Balanced` -> `ReadBiased` on read-dominant or mixed windows;
    ///   -> `WriterRecovery` (counted as a rollback) on starvation risk.
    /// * `ReadBiased` -> `WriterRecovery` on starvation risk (rollback) or
    ///   when the consecutive-window fairness budget is exhausted (not a
    ///   rollback); -> `Balanced` on insufficient/write-dominant windows.
    /// * `WriterRecovery` dwells for `writer_recovery_windows` windows
    ///   (refreshed while starvation persists), then returns to `Balanced`.
    pub fn observe(&mut self, sample: ContentionSample) -> BravoPolicyDecision {
        let previous_mode = self.mode;
        let signature = Self::classify(sample, self.config);
        self.windows_observed = self.windows_observed.saturating_add(1);

        let mut rollback_triggered = false;
        match self.mode {
            BravoBiasMode::Balanced => match signature {
                ContentionSignature::WriterStarvationRisk => {
                    self.mode = BravoBiasMode::WriterRecovery;
                    // `.max(1)` guarantees at least one recovery window even
                    // with a zero-configured dwell.
                    self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
                    self.consecutive_read_bias_windows = 0;
                    self.rollbacks = self.rollbacks.saturating_add(1);
                    rollback_triggered = true;
                }
                ContentionSignature::ReadDominant | ContentionSignature::MixedContention => {
                    self.mode = BravoBiasMode::ReadBiased;
                    // This window counts as the first read-biased window.
                    self.consecutive_read_bias_windows = 1;
                }
                ContentionSignature::InsufficientSamples | ContentionSignature::WriteDominant => {
                    self.consecutive_read_bias_windows = 0;
                }
            },
            BravoBiasMode::ReadBiased => {
                self.consecutive_read_bias_windows =
                    self.consecutive_read_bias_windows.saturating_add(1);

                let starvation = signature == ContentionSignature::WriterStarvationRisk;
                let fairness_budget_exhausted = self.consecutive_read_bias_windows
                    >= self.config.max_consecutive_read_bias_windows.max(1);

                if starvation || fairness_budget_exhausted {
                    self.mode = BravoBiasMode::WriterRecovery;
                    self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
                    self.consecutive_read_bias_windows = 0;
                    // Only starvation counts as a rollback; a scheduled
                    // fairness handoff is a normal transition.
                    rollback_triggered = starvation;
                    if starvation {
                        self.rollbacks = self.rollbacks.saturating_add(1);
                    }
                } else if matches!(
                    signature,
                    ContentionSignature::InsufficientSamples | ContentionSignature::WriteDominant
                ) {
                    self.mode = BravoBiasMode::Balanced;
                    self.consecutive_read_bias_windows = 0;
                }
            }
            BravoBiasMode::WriterRecovery => {
                self.consecutive_read_bias_windows = 0;
                if signature == ContentionSignature::WriterStarvationRisk {
                    // Starvation persists: restart the dwell.
                    self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
                } else if self.writer_recovery_remaining > 0 {
                    self.writer_recovery_remaining -= 1;
                }
                if self.writer_recovery_remaining == 0 {
                    self.mode = BravoBiasMode::Balanced;
                }
            }
        }

        if self.mode != previous_mode {
            self.transitions = self.transitions.saturating_add(1);
        }
        self.last_signature = signature;

        BravoPolicyDecision {
            previous_mode,
            next_mode: self.mode,
            signature,
            switched: self.mode != previous_mode,
            rollback_triggered,
        }
    }

    /// Classify one window into a signature, checked in priority order:
    /// sample sufficiency, then writer starvation, then read-ratio bands.
    #[must_use]
    pub fn classify(
        sample: ContentionSample,
        config: BravoContentionConfig,
    ) -> ContentionSignature {
        if sample.total_acquires() < config.min_total_acquires {
            return ContentionSignature::InsufficientSamples;
        }

        // Starvation takes precedence over any ratio-based classification.
        if sample.write_wait_p95_us >= config.writer_starvation_wait_us
            || sample.write_timeouts >= config.writer_starvation_timeouts
        {
            return ContentionSignature::WriterStarvationRisk;
        }

        let read_ratio = sample.read_ratio_permille();
        // Clamp configured thresholds into the valid permille range.
        let read_dominant_floor = config.read_dominant_ratio_permille.min(1_000);
        if read_ratio >= read_dominant_floor {
            return ContentionSignature::ReadDominant;
        }

        let mixed_floor = config.mixed_ratio_floor_permille.min(1_000);
        let mixed_ceiling = config
            .mixed_ratio_ceiling_permille
            .clamp(mixed_floor, 1_000);
        if read_ratio >= mixed_floor && read_ratio <= mixed_ceiling {
            return ContentionSignature::MixedContention;
        }

        ContentionSignature::WriteDominant
    }
}
279
280impl Default for BravoContentionState {
281    fn default() -> Self {
282        Self::new(BravoContentionConfig::default())
283    }
284}
285
/// Optional per-request tenant key used for fairness/admission accounting.
///
/// Implement this for queue payloads that can expose extension-level identity.
/// Primitive test payloads may use the default `None` implementation.
pub trait QueueTenant {
    /// Stable identity used for per-tenant backlog accounting; `None` opts
    /// the payload out of fairness tracking (see `S3FifoState`).
    #[must_use]
    fn tenant_key(&self) -> Option<&str> {
        None
    }
}
296
// Bulk-implements `QueueTenant` with the default (`None`) tenant key for the
// listed payload types.
macro_rules! impl_queue_tenant_none {
    ($($ty:ty),+ $(,)?) => {
        $(
            impl QueueTenant for $ty {}
        )+
    };
}
304
// Primitive payloads carry no tenant identity.
impl_queue_tenant_none!(
    (),
    bool,
    char,
    u8,
    u16,
    u32,
    u64,
    usize,
    i8,
    i16,
    i32,
    i64,
    isize,
    String,
);
321
/// Runtime mode for S3-FIFO-inspired queue admission.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S3FifoMode {
    /// Fairness accounting and ghost-hit admission are enforced.
    Active,
    /// Degraded plain-FIFO admission; fairness checks are bypassed. Entered
    /// via `S3FifoState::transition_to_fallback` and only left when the
    /// state is rebuilt (e.g. `HostcallRequestQueue::clear`).
    ConservativeFifo,
}
328
/// Deterministic admission configuration for S3-FIFO-inspired behavior.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3FifoConfig {
    /// Per-tenant main-tier backlog budget before fairness rejection.
    pub tenant_budget: usize,
    /// Maximum number of tenants tracked in the ghost list.
    pub ghost_capacity: usize,
    /// Keyed samples required before signal quality is considered proven.
    pub min_signal_samples: u64,
    /// Keyless-request streak that triggers the signal-quality fallback.
    pub max_signalless_streak: u64,
    /// Rejection streak that triggers the fairness-instability fallback.
    pub unstable_rejection_streak: u64,
}
338
339impl S3FifoConfig {
340    #[must_use]
341    pub fn from_capacities(fast_capacity: usize, overflow_capacity: usize) -> Self {
342        let tenant_budget = (overflow_capacity / 2).max(1);
343        let ghost_capacity = fast_capacity
344            .saturating_add(overflow_capacity)
345            .saturating_mul(S3_FIFO_GHOST_CAPACITY_MULTIPLIER)
346            .max(S3_FIFO_GHOST_CAPACITY_MIN);
347        Self {
348            tenant_budget,
349            ghost_capacity,
350            min_signal_samples: S3_FIFO_MIN_SIGNAL_SAMPLES,
351            max_signalless_streak: S3_FIFO_MAX_SIGNALLESS_STREAK,
352            unstable_rejection_streak: S3_FIFO_UNSTABLE_REJECTION_STREAK,
353        }
354    }
355}
356
/// Lightweight diagnostics snapshot for S3-FIFO admission internals.
///
/// Mirrors the counters kept in `S3FifoState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3FifoTelemetry {
    /// Current admission mode.
    pub mode: S3FifoMode,
    /// Reason for the fallback transition, if one occurred.
    pub fallback_reason: Option<S3FifoFallbackReason>,
    /// Number of tenants currently in the ghost list.
    pub ghost_depth: usize,
    /// Total ghost-hit re-admissions granted.
    pub ghost_hits_total: u64,
    /// Total fairness-budget rejections.
    pub fairness_rejected_total: u64,
    /// Total keyed (tenant-bearing) requests observed.
    pub signal_samples: u64,
    /// Current streak of keyless requests.
    pub signalless_streak: u64,
    /// Total transitions into `ConservativeFifo`.
    pub fallback_transitions: u64,
    /// Configured per-tenant backlog budget.
    pub tenant_budget: usize,
    /// Tenants with nonzero main-tier backlog.
    pub active_tenants: usize,
}
371
/// Internal S3-FIFO admission state: ghost list, per-tenant backlog, and
/// signal-quality/stability counters.
#[derive(Debug, Clone)]
struct S3FifoState {
    config: S3FifoConfig,
    /// Current admission mode; sticky once `ConservativeFifo`.
    mode: S3FifoMode,
    /// Reason recorded by the first fallback transition.
    fallback_reason: Option<S3FifoFallbackReason>,
    /// Ghost entries ordered oldest-first by insertion generation.
    ghost_order: BTreeMap<u64, String>,
    /// Reverse index: tenant key -> generation in `ghost_order`.
    ghost_lookup: BTreeMap<String, u64>,
    /// Monotonic generation counter for ghost ordering.
    ghost_gen: u64,
    /// Outstanding main-tier requests per tenant.
    tenant_backlog: BTreeMap<String, usize>,
    ghost_hits_total: u64,
    fairness_rejected_total: u64,
    /// Keyed requests observed (signal-quality evidence).
    signal_samples: u64,
    /// Current run of keyless requests.
    signalless_streak: u64,
    /// Current run of rejections without an intervening successful enqueue.
    unstable_rejection_streak: u64,
    fallback_transitions: u64,
}
388
impl S3FifoState {
    /// Fresh `Active` state with an empty ghost list and tenant backlog.
    #[must_use]
    const fn new(config: S3FifoConfig) -> Self {
        Self {
            config,
            mode: S3FifoMode::Active,
            fallback_reason: None,
            ghost_order: BTreeMap::new(),
            ghost_lookup: BTreeMap::new(),
            ghost_gen: 0,
            tenant_backlog: BTreeMap::new(),
            ghost_hits_total: 0,
            fairness_rejected_total: 0,
            signal_samples: 0,
            signalless_streak: 0,
            unstable_rejection_streak: 0,
            fallback_transitions: 0,
        }
    }

    /// Track tenant-signal quality for one enqueue attempt.
    ///
    /// If keyless requests keep arriving before enough keyed samples were
    /// collected, fairness accounting cannot work; demote to the
    /// conservative-FIFO fallback.
    fn observe_signal(&mut self, tenant_key: Option<&str>) {
        if self.mode != S3FifoMode::Active {
            return;
        }
        if tenant_key.is_some() {
            self.signal_samples = self.signal_samples.saturating_add(1);
            self.signalless_streak = 0;
        } else {
            self.signalless_streak = self.signalless_streak.saturating_add(1);
            if self.signalless_streak >= self.config.max_signalless_streak
                && self.signal_samples < self.config.min_signal_samples
            {
                self.transition_to_fallback(S3FifoFallbackReason::SignalQualityInsufficient);
            }
        }
    }

    /// Decide whether a request may enter the main (overflow) tier.
    ///
    /// Keyless requests and tenants under budget are always admitted. An
    /// over-budget tenant is admitted once per recorded ghost hit; otherwise
    /// the request is rejected, the tenant is (re-)recorded as a ghost, and
    /// a sustained rejection streak triggers the fairness-instability
    /// fallback.
    fn allow_main_admission(&mut self, tenant_key: Option<&str>) -> bool {
        if self.mode != S3FifoMode::Active {
            return true;
        }
        let Some(tenant_key) = tenant_key else {
            return true;
        };
        let backlog = self.tenant_backlog.get(tenant_key).copied().unwrap_or(0);
        if backlog < self.config.tenant_budget {
            return true;
        }
        if self.consume_ghost_hit(tenant_key) {
            self.unstable_rejection_streak = 0;
            return true;
        }

        self.fairness_rejected_total = self.fairness_rejected_total.saturating_add(1);
        self.unstable_rejection_streak = self.unstable_rejection_streak.saturating_add(1);
        self.record_ghost(tenant_key);
        if self.unstable_rejection_streak >= self.config.unstable_rejection_streak {
            self.transition_to_fallback(S3FifoFallbackReason::FairnessInstability);
        }
        false
    }

    /// Charge one admitted main-tier request against the tenant's backlog.
    fn on_main_enqueued(&mut self, tenant_key: Option<&str>) {
        if self.mode != S3FifoMode::Active {
            return;
        }
        if let Some(tenant_key) = tenant_key {
            let entry = self
                .tenant_backlog
                .entry(tenant_key.to_string())
                .or_insert(0);
            *entry = entry.saturating_add(1);
        }
        // A successful enqueue proves forward progress, so the instability
        // streak resets.
        self.unstable_rejection_streak = 0;
    }

    /// Release one unit of tenant backlog and remember the tenant as a ghost
    /// so a later over-budget request can be admitted once.
    fn on_main_dequeued(&mut self, tenant_key: Option<&str>) {
        if self.mode != S3FifoMode::Active {
            return;
        }
        if let Some(tenant_key) = tenant_key {
            if let Some(backlog) = self.tenant_backlog.get_mut(tenant_key) {
                *backlog = backlog.saturating_sub(1);
                if *backlog == 0 {
                    self.tenant_backlog.remove(tenant_key);
                }
            }
            self.record_ghost(tenant_key);
        }
    }

    /// Account a capacity-based rejection of a main-tier enqueue.
    fn on_main_overflow_reject(&mut self, tenant_key: Option<&str>) {
        if self.mode != S3FifoMode::Active {
            return;
        }
        if let Some(tenant_key) = tenant_key {
            self.record_ghost(tenant_key);
        }
        self.unstable_rejection_streak = self.unstable_rejection_streak.saturating_add(1);
        if self.unstable_rejection_streak >= self.config.unstable_rejection_streak {
            self.transition_to_fallback(S3FifoFallbackReason::FairnessInstability);
        }
    }

    /// One-way demotion to conservative FIFO; drops all fairness state.
    fn transition_to_fallback(&mut self, reason: S3FifoFallbackReason) {
        if self.mode == S3FifoMode::ConservativeFifo {
            return;
        }
        self.mode = S3FifoMode::ConservativeFifo;
        self.fallback_reason = Some(reason);
        self.fallback_transitions = self.fallback_transitions.saturating_add(1);
        self.ghost_order.clear();
        self.ghost_lookup.clear();
        self.ghost_gen = 0;
        self.tenant_backlog.clear();
    }

    /// Remove the tenant from the ghost list, returning `true` on a hit.
    fn consume_ghost_hit(&mut self, tenant_key: &str) -> bool {
        if let Some(generation) = self.ghost_lookup.remove(tenant_key) {
            self.ghost_order.remove(&generation);
            self.ghost_hits_total = self.ghost_hits_total.saturating_add(1);
            true
        } else {
            false
        }
    }

    /// Insert or refresh the tenant in the generation-ordered ghost list,
    /// evicting oldest entries beyond `ghost_capacity`.
    fn record_ghost(&mut self, tenant_key: &str) {
        if tenant_key.is_empty() {
            return;
        }

        // Prevent generation overflow from corrupting the ghost order map.
        if self.ghost_gen == u64::MAX {
            self.ghost_gen = 0;
            self.ghost_order.clear();
            self.ghost_lookup.clear();
        }

        self.ghost_gen = self.ghost_gen.saturating_add(1);

        if let Some(gen_mut) = self.ghost_lookup.get_mut(tenant_key) {
            // Known tenant: move its entry to the newest generation.
            let old_gen = *gen_mut;
            *gen_mut = self.ghost_gen;
            if let Some(k_reused) = self.ghost_order.remove(&old_gen) {
                self.ghost_order.insert(self.ghost_gen, k_reused);
            }
        } else {
            let key_owned = tenant_key.to_string();
            self.ghost_lookup.insert(key_owned.clone(), self.ghost_gen);
            self.ghost_order.insert(self.ghost_gen, key_owned);
        }

        // Evict oldest-first (smallest generation) until within capacity.
        while self.ghost_lookup.len() > self.config.ghost_capacity {
            if let Some((_, popped_key)) = self.ghost_order.pop_first() {
                self.ghost_lookup.remove(&popped_key);
            } else {
                break;
            }
        }
    }

    /// Copy of the admission internals for telemetry.
    #[must_use]
    fn snapshot(&self) -> S3FifoTelemetry {
        S3FifoTelemetry {
            mode: self.mode,
            fallback_reason: self.fallback_reason,
            ghost_depth: self.ghost_lookup.len(),
            ghost_hits_total: self.ghost_hits_total,
            fairness_rejected_total: self.fairness_rejected_total,
            signal_samples: self.signal_samples,
            signalless_streak: self.signalless_streak,
            fallback_transitions: self.fallback_transitions,
            tenant_budget: self.config.tenant_budget,
            active_tenants: self.tenant_backlog.len(),
        }
    }
}
567
/// Reclamation strategy for popped queue values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostcallQueueMode {
    /// Use epoch-based retirement bookkeeping.
    Ebr,
    /// Disable EBR retirement and drop popped nodes immediately.
    ///
    /// This is the default when `PI_HOSTCALL_QUEUE_RECLAIMER` is unset or
    /// unrecognized (see `from_env`).
    SafeFallback,
}
575
576impl HostcallQueueMode {
577    #[must_use]
578    pub fn from_env() -> Self {
579        std::env::var("PI_HOSTCALL_QUEUE_RECLAIMER")
580            .ok()
581            .as_deref()
582            .and_then(Self::parse)
583            .unwrap_or(Self::SafeFallback)
584    }
585
586    fn parse(value: &str) -> Option<Self> {
587        match value.trim().to_ascii_lowercase().as_str() {
588            "ebr" | "epoch" | "epoch-based" => Some(Self::Ebr),
589            "fallback" | "safe-fallback" | "off" | "disabled" | "legacy" => {
590                Some(Self::SafeFallback)
591            }
592            _ => None,
593        }
594    }
595}
596
/// Outcome of a `HostcallRequestQueue::push_back` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostcallQueueEnqueueResult {
    /// Admitted into the lock-free fast ring.
    FastPath { depth: usize },
    /// Spilled into the bounded overflow deque.
    OverflowPath { depth: usize, overflow_depth: usize },
    /// Rejected: fairness budget exceeded or overflow at capacity.
    Rejected { depth: usize, overflow_depth: usize },
}
603
/// Point-in-time snapshot of queue depths, reclamation bookkeeping, and the
/// BRAVO / S3-FIFO policy controllers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HostcallQueueTelemetry {
    // Lane depth and capacity counters.
    pub fast_depth: usize,
    pub overflow_depth: usize,
    pub total_depth: usize,
    pub max_depth_seen: usize,
    pub overflow_enqueued_total: u64,
    pub overflow_rejected_total: u64,
    pub fast_capacity: usize,
    pub overflow_capacity: usize,
    // Epoch-based reclamation bookkeeping.
    pub reclamation_mode: HostcallQueueMode,
    pub retired_backlog: usize,
    pub reclaimed_total: u64,
    pub current_epoch: u64,
    pub epoch_lag: u64,
    pub max_epoch_lag: u64,
    pub reclamation_latency_max_epochs: u64,
    pub fallback_transitions: u64,
    pub active_epoch_pins: usize,
    // BRAVO contention policy snapshot (mirrors `BravoPolicyTelemetry`).
    pub bravo_mode: BravoBiasMode,
    pub bravo_transitions: u64,
    pub bravo_rollbacks: u64,
    pub bravo_consecutive_read_bias_windows: u32,
    pub bravo_writer_recovery_remaining: u32,
    pub bravo_last_signature: ContentionSignature,
    // S3-FIFO admission snapshot (mirrors `S3FifoTelemetry`).
    pub s3fifo_mode: S3FifoMode,
    pub s3fifo_fallback_reason: Option<S3FifoFallbackReason>,
    pub s3fifo_ghost_depth: usize,
    pub s3fifo_ghost_hits_total: u64,
    pub s3fifo_fairness_rejected_total: u64,
    pub s3fifo_signal_samples: u64,
    pub s3fifo_signalless_streak: u64,
    pub s3fifo_fallback_transitions: u64,
    pub s3fifo_tenant_budget: usize,
    pub s3fifo_active_tenants: usize,
}
640
/// A value retired under EBR, tagged with the epoch at which it was retired.
#[derive(Debug)]
struct RetiredNode<T> {
    value: T,
    retired_epoch: u64,
}
646
/// Guard representing one active epoch pin; the shared counter is
/// decremented when this guard is dropped (see the `Drop` impl).
#[derive(Debug)]
pub struct HostcallEpochPin {
    active_epoch_pins: Arc<AtomicUsize>,
}
651
impl Drop for HostcallEpochPin {
    fn drop(&mut self) {
        // `fetch_sub` returns the prior count; it must have been at least 1
        // or a pin was double-released (debug builds assert this).
        let previous = self.active_epoch_pins.fetch_sub(1, Ordering::SeqCst);
        debug_assert!(previous > 0, "epoch pin underflow");
    }
}
658
/// Two-lane bounded hostcall queue with reclamation telemetry and
/// deterministic BRAVO / S3-FIFO policy controllers.
#[derive(Debug)]
pub struct HostcallRequestQueue<T: Clone + QueueTenant> {
    /// Lock-free bounded fast lane (small tier).
    fast: ArrayQueue<T>,
    /// Capacity the fast ring was built with; reused by `clear`.
    fast_capacity: usize,
    /// Bounded FIFO spill lane (main tier).
    overflow: VecDeque<T>,
    overflow_enqueued_total: u64,
    overflow_rejected_total: u64,
    /// High-water mark of combined depth.
    max_depth_seen: usize,
    overflow_capacity: usize,
    /// EBR vs. safe-fallback retirement strategy.
    reclamation_mode: HostcallQueueMode,
    /// Shared count of outstanding `HostcallEpochPin` guards.
    active_epoch_pins: Arc<AtomicUsize>,
    current_epoch: u64,
    /// Values retired under EBR, awaiting reclamation.
    retired: VecDeque<RetiredNode<T>>,
    reclaimed_total: u64,
    max_epoch_lag: u64,
    reclamation_latency_max_epochs: u64,
    fallback_transitions: u64,
    /// Retired-backlog size that forces the safe-fallback transition.
    safe_fallback_backlog_threshold: usize,
    /// Deterministic BRAVO contention policy state.
    contention_policy: BravoContentionState,
    /// Deterministic S3-FIFO admission state.
    s3fifo: S3FifoState,
}
680
681impl<T: Clone + QueueTenant> HostcallRequestQueue<T> {
    /// Build a queue with the given lane capacities; the reclamation mode is
    /// resolved from the `PI_HOSTCALL_QUEUE_RECLAIMER` environment variable.
    #[must_use]
    pub fn with_capacities(fast_capacity: usize, overflow_capacity: usize) -> Self {
        Self::with_mode(
            fast_capacity,
            overflow_capacity,
            HostcallQueueMode::from_env(),
        )
    }
690
    /// Build a queue with explicit lane capacities and reclamation mode.
    ///
    /// Capacities are floored at 1. The safe-fallback backlog threshold and
    /// the S3-FIFO config are derived deterministically from the clamped
    /// capacities.
    #[must_use]
    pub fn with_mode(
        fast_capacity: usize,
        overflow_capacity: usize,
        reclamation_mode: HostcallQueueMode,
    ) -> Self {
        let fast_capacity = fast_capacity.max(1);
        let overflow_capacity = overflow_capacity.max(1);
        // Threshold for abandoning EBR when the retired backlog grows.
        let safe_fallback_backlog_threshold = fast_capacity
            .saturating_add(overflow_capacity)
            .saturating_mul(SAFE_FALLBACK_BACKLOG_MULTIPLIER)
            .max(SAFE_FALLBACK_BACKLOG_MIN);
        let s3fifo = S3FifoState::new(S3FifoConfig::from_capacities(
            fast_capacity,
            overflow_capacity,
        ));
        Self {
            fast: ArrayQueue::new(fast_capacity),
            fast_capacity,
            overflow: VecDeque::new(),
            overflow_enqueued_total: 0,
            overflow_rejected_total: 0,
            max_depth_seen: 0,
            overflow_capacity,
            reclamation_mode,
            active_epoch_pins: Arc::new(AtomicUsize::new(0)),
            current_epoch: 0,
            retired: VecDeque::new(),
            reclaimed_total: 0,
            max_epoch_lag: 0,
            reclamation_latency_max_epochs: 0,
            fallback_transitions: 0,
            safe_fallback_backlog_threshold,
            contention_policy: BravoContentionState::default(),
            s3fifo,
        }
    }
728
    /// Combined depth of the fast ring and the overflow deque.
    #[must_use]
    pub fn len(&self) -> usize {
        self.fast.len() + self.overflow.len()
    }
733
    /// `true` when both lanes hold no requests.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.fast.is_empty() && self.overflow.is_empty()
    }
738
    /// The reclamation strategy this queue was built with.
    #[must_use]
    pub const fn reclamation_mode(&self) -> HostcallQueueMode {
        self.reclamation_mode
    }
743
    /// Register an epoch pin. The returned guard decrements the shared pin
    /// count on drop, so hold it only for the duration of the protected
    /// access.
    pub fn pin_epoch(&self) -> HostcallEpochPin {
        self.active_epoch_pins.fetch_add(1, Ordering::SeqCst);
        HostcallEpochPin {
            active_epoch_pins: Arc::clone(&self.active_epoch_pins),
        }
    }
750
    /// Drop all queued requests and reset counters, epochs, and both policy
    /// controllers to their initial state. Lane capacities, the reclamation
    /// mode, and outstanding epoch pins are preserved.
    pub fn clear(&mut self) {
        while self.fast.pop().is_some() {}
        self.overflow.clear();
        self.overflow_enqueued_total = 0;
        self.overflow_rejected_total = 0;
        self.max_depth_seen = 0;
        self.current_epoch = 0;
        self.retired.clear();
        self.reclaimed_total = 0;
        self.max_epoch_lag = 0;
        self.reclamation_latency_max_epochs = 0;
        self.fallback_transitions = 0;
        self.contention_policy = BravoContentionState::default();
        // Rebuild the S3-FIFO state so a prior fallback does not persist.
        self.s3fifo = S3FifoState::new(S3FifoConfig::from_capacities(
            self.fast_capacity,
            self.overflow_capacity,
        ));
    }
769
    /// Enqueue one request, preferring the lock-free fast ring.
    ///
    /// Lane selection preserves global FIFO order: once anything has spilled
    /// into the overflow deque, new requests are pinned to overflow until it
    /// drains. Overflow admission is additionally subject to the S3-FIFO
    /// fairness budget and the overflow capacity bound.
    pub fn push_back(&mut self, request: T) -> HostcallQueueEnqueueResult {
        self.s3fifo.observe_signal(request.tenant_key());
        let mut request = request;

        // Preserve FIFO across lanes by pinning to overflow once spill begins.
        if self.overflow.is_empty() {
            match self.fast.push(request) {
                Ok(()) => {
                    self.bump_epoch();
                    self.try_reclaim();
                    let depth = self.len();
                    self.max_depth_seen = self.max_depth_seen.max(depth);
                    tracing::debug!(
                        target: "pi.extensions.hostcall_queue",
                        event = "hostcall_queue.enqueue",
                        reason = "small_tier",
                        depth,
                        overflow_depth = self.overflow.len(),
                        "hostcall admitted to fast tier"
                    );
                    return HostcallQueueEnqueueResult::FastPath { depth };
                }
                // Ring full: `ArrayQueue::push` hands the value back; fall
                // through to overflow admission.
                Err(returned) => request = returned,
            }
        }

        let tenant_key = request.tenant_key().map(std::borrow::ToOwned::to_owned);

        // Fairness gate for the main (overflow) tier.
        if !self.s3fifo.allow_main_admission(tenant_key.as_deref()) {
            self.overflow_rejected_total = self.overflow_rejected_total.saturating_add(1);
            tracing::debug!(
                target: "pi.extensions.hostcall_queue",
                event = "hostcall_queue.reject",
                reason = "fairness_budget",
                depth = self.len(),
                overflow_depth = self.overflow.len(),
                s3fifo_mode = ?self.s3fifo.snapshot().mode,
                "hostcall rejected by S3-FIFO fairness budget"
            );
            return HostcallQueueEnqueueResult::Rejected {
                depth: self.len(),
                overflow_depth: self.overflow.len(),
            };
        }

        if self.overflow.len() < self.overflow_capacity {
            self.overflow.push_back(request);
            self.overflow_enqueued_total = self.overflow_enqueued_total.saturating_add(1);
            self.s3fifo.on_main_enqueued(tenant_key.as_deref());
            self.bump_epoch();
            self.try_reclaim();
            let depth = self.len();
            let overflow_depth = self.overflow.len();
            self.max_depth_seen = self.max_depth_seen.max(depth);
            tracing::debug!(
                target: "pi.extensions.hostcall_queue",
                event = "hostcall_queue.enqueue",
                reason = "main_tier",
                depth,
                overflow_depth,
                "hostcall admitted to overflow/main tier"
            );
            return HostcallQueueEnqueueResult::OverflowPath {
                depth,
                overflow_depth,
            };
        }

        // Overflow lane at capacity: reject and let the S3-FIFO state record
        // the pressure (ghost entry + instability streak).
        self.overflow_rejected_total = self.overflow_rejected_total.saturating_add(1);
        self.s3fifo.on_main_overflow_reject(tenant_key.as_deref());
        tracing::debug!(
            target: "pi.extensions.hostcall_queue",
            event = "hostcall_queue.reject",
            reason = "overflow_capacity",
            depth = self.len(),
            overflow_depth = self.overflow.len(),
            "hostcall rejected because overflow queue reached capacity"
        );
        HostcallQueueEnqueueResult::Rejected {
            depth: self.len(),
            overflow_depth: self.overflow.len(),
        }
    }
853
    /// Dequeue the oldest request (fast ring first, then overflow).
    ///
    /// Advances the epoch on every pop; in EBR mode a clone of the value is
    /// handed to the retirement list (`retire_for_reclamation`, defined
    /// elsewhere in this file) before reclamation is attempted. Overflow
    /// pops also release the tenant's backlog in the S3-FIFO state.
    fn pop_front(&mut self) -> Option<T> {
        let value = if let Some(value) = self.fast.pop() {
            value
        } else {
            let value = self.overflow.pop_front()?;
            let tenant_key = value.tenant_key().map(std::borrow::ToOwned::to_owned);
            self.s3fifo.on_main_dequeued(tenant_key.as_deref());
            value
        };
        self.bump_epoch();
        if self.reclamation_mode == HostcallQueueMode::Ebr {
            self.retire_for_reclamation(value.clone());
        }
        self.try_reclaim();
        Some(value)
    }
870
871    pub fn drain_all(&mut self) -> VecDeque<T> {
872        let mut drained = VecDeque::with_capacity(self.len());
873        while let Some(request) = self.pop_front() {
874            drained.push_back(request);
875        }
876        drained
877    }
878
    /// Explicit reclamation point used by tests and slow-path maintenance.
    ///
    /// Bumps the epoch first so nodes retired in the current epoch become
    /// eligible (`try_reclaim` only frees nodes with
    /// `retired_epoch < current_epoch`), then runs one reclamation pass.
    pub fn force_reclaim(&mut self) {
        self.bump_epoch();
        self.try_reclaim();
    }
884
    /// Immediately disable EBR and switch to the safe fallback mode.
    ///
    /// Delegates to `transition_to_safe_fallback`, which clears the retired
    /// backlog and is a no-op when already in `SafeFallback`.
    pub fn force_safe_fallback(&mut self) {
        self.transition_to_safe_fallback();
    }
889
    /// Feed one deterministic contention observation window into the BRAVO
    /// policy controller.
    ///
    /// Returns the controller's decision for this window (previous/next bias
    /// mode, whether it switched, signature, and rollback flag).
    pub fn observe_contention_window(&mut self, sample: ContentionSample) -> BravoPolicyDecision {
        self.contention_policy.observe(sample)
    }
895
    /// Read-only snapshot of the BRAVO contention policy telemetry.
    #[must_use]
    pub const fn contention_policy_snapshot(&self) -> BravoPolicyTelemetry {
        self.contention_policy.snapshot()
    }
900
901    #[must_use]
902    pub fn snapshot(&self) -> HostcallQueueTelemetry {
903        let epoch_lag = self.retired.front().map_or(0, |node| {
904            self.current_epoch.saturating_sub(node.retired_epoch)
905        });
906        let contention = self.contention_policy.snapshot();
907        let s3fifo = self.s3fifo.snapshot();
908
909        HostcallQueueTelemetry {
910            fast_depth: self.fast.len(),
911            overflow_depth: self.overflow.len(),
912            total_depth: self.len(),
913            max_depth_seen: self.max_depth_seen,
914            overflow_enqueued_total: self.overflow_enqueued_total,
915            overflow_rejected_total: self.overflow_rejected_total,
916            fast_capacity: self.fast_capacity,
917            overflow_capacity: self.overflow_capacity,
918            reclamation_mode: self.reclamation_mode,
919            retired_backlog: self.retired.len(),
920            reclaimed_total: self.reclaimed_total,
921            current_epoch: self.current_epoch,
922            epoch_lag,
923            max_epoch_lag: self.max_epoch_lag,
924            reclamation_latency_max_epochs: self.reclamation_latency_max_epochs,
925            fallback_transitions: self.fallback_transitions,
926            active_epoch_pins: self.active_epoch_pins.load(Ordering::SeqCst),
927            bravo_mode: contention.mode,
928            bravo_transitions: contention.transitions,
929            bravo_rollbacks: contention.rollbacks,
930            bravo_consecutive_read_bias_windows: contention.consecutive_read_bias_windows,
931            bravo_writer_recovery_remaining: contention.writer_recovery_remaining,
932            bravo_last_signature: contention.last_signature,
933            s3fifo_mode: s3fifo.mode,
934            s3fifo_fallback_reason: s3fifo.fallback_reason,
935            s3fifo_ghost_depth: s3fifo.ghost_depth,
936            s3fifo_ghost_hits_total: s3fifo.ghost_hits_total,
937            s3fifo_fairness_rejected_total: s3fifo.fairness_rejected_total,
938            s3fifo_signal_samples: s3fifo.signal_samples,
939            s3fifo_signalless_streak: s3fifo.signalless_streak,
940            s3fifo_fallback_transitions: s3fifo.fallback_transitions,
941            s3fifo_tenant_budget: s3fifo.tenant_budget,
942            s3fifo_active_tenants: s3fifo.active_tenants,
943        }
944    }
945
    // Advance the queue epoch; saturating_add prevents wrap-around at the
    // numeric limit. Called on every enqueue/dequeue and by force_reclaim.
    const fn bump_epoch(&mut self) {
        self.current_epoch = self.current_epoch.saturating_add(1);
    }
949
950    fn retire_for_reclamation(&mut self, value: T) {
951        self.retired.push_back(RetiredNode {
952            value,
953            retired_epoch: self.current_epoch,
954        });
955    }
956
957    fn transition_to_safe_fallback(&mut self) {
958        if self.reclamation_mode == HostcallQueueMode::SafeFallback {
959            return;
960        }
961        self.reclamation_mode = HostcallQueueMode::SafeFallback;
962        self.fallback_transitions = self.fallback_transitions.saturating_add(1);
963        self.retired.clear();
964    }
965
    /// One epoch-based reclamation pass. No-op unless in EBR mode.
    ///
    /// While any epoch pin is active nothing is freed: only lag telemetry is
    /// updated, and if the retired backlog exceeds the safety threshold the
    /// queue degrades to the safe fallback mode. With no pins, every node
    /// retired in an epoch strictly before the current one is dropped.
    fn try_reclaim(&mut self) {
        if self.reclamation_mode != HostcallQueueMode::Ebr {
            return;
        }

        let active = self.active_epoch_pins.load(Ordering::SeqCst);
        if active > 0 {
            // Readers still pinned: track how far the oldest retired node
            // lags behind the current epoch, but free nothing.
            if let Some(front) = self.retired.front() {
                let lag = self.current_epoch.saturating_sub(front.retired_epoch);
                self.max_epoch_lag = self.max_epoch_lag.max(lag);
            }
            // A pin that never drops would let the backlog grow without
            // bound; beyond the threshold, bail out to the safe mode (which
            // clears the backlog).
            if self.retired.len() > self.safe_fallback_backlog_threshold {
                self.transition_to_safe_fallback();
            }
            return;
        }

        // No active pins: reclaim all nodes older than the current epoch,
        // folding each node's age into the max-latency telemetry.
        while self
            .retired
            .front()
            .is_some_and(|front| front.retired_epoch < self.current_epoch)
        {
            if let Some(node) = self.retired.pop_front() {
                let latency = self.current_epoch.saturating_sub(node.retired_epoch);
                self.reclamation_latency_max_epochs =
                    self.reclamation_latency_max_epochs.max(latency);
                self.reclaimed_total = self.reclaimed_total.saturating_add(1);
                drop(node.value);
            }
        }
    }
997}
998
// A default queue uses the production fast-ring and overflow capacities.
impl<T: Clone + QueueTenant> Default for HostcallRequestQueue<T> {
    fn default() -> Self {
        Self::with_capacities(HOSTCALL_FAST_RING_CAPACITY, HOSTCALL_OVERFLOW_CAPACITY)
    }
}
1004
1005#[cfg(test)]
1006mod tests {
1007    use super::*;
1008
    // Fixed BRAVO thresholds so every test classifies observation windows
    // identically (ratios in permille, waits in microseconds).
    fn deterministic_config() -> BravoContentionConfig {
        BravoContentionConfig {
            min_total_acquires: 10,
            read_dominant_ratio_permille: 750,
            mixed_ratio_floor_permille: 400,
            mixed_ratio_ceiling_permille: 749,
            writer_starvation_wait_us: 4_000,
            writer_starvation_timeouts: 2,
            max_consecutive_read_bias_windows: 3,
            writer_recovery_windows: 2,
        }
    }
1021
    // Shorthand positional constructor for a ContentionSample window.
    fn sample(
        reads: u64,
        writes: u64,
        read_wait_p95_us: u64,
        write_wait_p95_us: u64,
        write_timeouts: u64,
    ) -> ContentionSample {
        ContentionSample {
            read_acquires: reads,
            write_acquires: writes,
            read_wait_p95_us,
            write_wait_p95_us,
            write_timeouts,
        }
    }
1037
    // Minimal request type with an optional static tenant key, used to
    // exercise the queue's per-tenant (S3-FIFO) fairness paths.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct TenantRequest {
        tenant: Option<&'static str>, // None models an untagged request
        value: u8,
    }
1043
    impl QueueTenant for TenantRequest {
        // Expose the optional tenant key to the queue's fairness machinery.
        fn tenant_key(&self) -> Option<&str> {
            self.tenant
        }
    }
1049
1050    fn drive_signal_quality_fallback(queue: &mut HostcallRequestQueue<TenantRequest>, seed: u8) {
1051        for offset in 0..96_u8 {
1052            let _ = queue.push_back(TenantRequest {
1053                tenant: None,
1054                value: seed.wrapping_add(offset),
1055            });
1056            let _ = queue.drain_all();
1057        }
1058    }
1059
1060    #[test]
1061    fn hostcall_queue_mode_parsing_supports_ebr_and_fallback() {
1062        assert_eq!(
1063            HostcallQueueMode::parse("ebr"),
1064            Some(HostcallQueueMode::Ebr)
1065        );
1066        assert_eq!(
1067            HostcallQueueMode::parse("safe-fallback"),
1068            Some(HostcallQueueMode::SafeFallback)
1069        );
1070        assert_eq!(HostcallQueueMode::parse("nope"), None);
1071    }
1072
1073    #[test]
1074    fn contention_classifier_flags_writer_starvation_deterministically() {
1075        let config = deterministic_config();
1076        let starvation = sample(90, 10, 100, 10_000, 3);
1077        let signature = BravoContentionState::classify(starvation, config);
1078        assert_eq!(signature, ContentionSignature::WriterStarvationRisk);
1079
1080        let read_dominant = sample(90, 10, 100, 300, 0);
1081        let signature = BravoContentionState::classify(read_dominant, config);
1082        assert_eq!(signature, ContentionSignature::ReadDominant);
1083    }
1084
    // Full rollback cycle: Balanced → ReadBiased on a read-dominant window,
    // rollback to WriterRecovery on starvation, hold through the recovery
    // budget, then exit back to Balanced.
    #[test]
    fn bravo_policy_rolls_back_on_starvation_and_recovers() {
        let mut policy = BravoContentionState::new(deterministic_config());

        let first = policy.observe(sample(80, 20, 120, 500, 0));
        assert_eq!(first.previous_mode, BravoBiasMode::Balanced);
        assert_eq!(first.next_mode, BravoBiasMode::ReadBiased);
        assert_eq!(first.signature, ContentionSignature::ReadDominant);
        assert!(first.switched);

        // Starvation while read-biased triggers the rollback path.
        let second = policy.observe(sample(85, 15, 100, 8_500, 3));
        assert_eq!(second.previous_mode, BravoBiasMode::ReadBiased);
        assert_eq!(second.next_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(second.signature, ContentionSignature::WriterStarvationRisk);
        assert!(second.rollback_triggered);

        // One healthy window: still inside the recovery budget (2 windows).
        let third = policy.observe(sample(30, 70, 200, 500, 0));
        assert_eq!(third.next_mode, BravoBiasMode::WriterRecovery);
        assert!(!third.switched);

        // Budget exhausted: clean exit to Balanced.
        let fourth = policy.observe(sample(35, 65, 200, 450, 0));
        assert_eq!(fourth.next_mode, BravoBiasMode::Balanced);
        assert!(fourth.switched);

        let telemetry = policy.snapshot();
        assert_eq!(telemetry.mode, BravoBiasMode::Balanced);
        assert!(telemetry.rollbacks >= 1);
        assert!(telemetry.transitions >= 3);
    }
1114
    // Repeated starvation while already in WriterRecovery refreshes the
    // remaining-window budget (stays at 3) without counting a new rollback;
    // healthy mixed windows then drain the budget one per window until exit.
    #[test]
    fn bravo_writer_recovery_refreshes_on_repeated_starvation_then_exits_cleanly() {
        let mut config = deterministic_config();
        config.writer_recovery_windows = 3;
        let mut policy = BravoContentionState::new(config);

        let _ = policy.observe(sample(85, 15, 100, 400, 0));
        let first_starvation = policy.observe(sample(80, 20, 100, 9_000, 3));
        assert_eq!(first_starvation.previous_mode, BravoBiasMode::ReadBiased);
        assert_eq!(first_starvation.next_mode, BravoBiasMode::WriterRecovery);
        assert!(first_starvation.rollback_triggered);
        assert_eq!(
            first_starvation.signature,
            ContentionSignature::WriterStarvationRisk
        );
        assert_eq!(policy.snapshot().writer_recovery_remaining, 3);

        // Second starvation refreshes the budget but is not a second rollback.
        let second_starvation = policy.observe(sample(75, 25, 100, 8_500, 2));
        assert_eq!(
            second_starvation.previous_mode,
            BravoBiasMode::WriterRecovery
        );
        assert_eq!(second_starvation.next_mode, BravoBiasMode::WriterRecovery);
        assert!(!second_starvation.switched);
        assert!(!second_starvation.rollback_triggered);
        assert_eq!(policy.snapshot().writer_recovery_remaining, 3);

        // Healthy windows decrement the budget.
        let _ = policy.observe(sample(45, 55, 150, 450, 0));
        assert_eq!(policy.snapshot().writer_recovery_remaining, 2);
        let _ = policy.observe(sample(45, 55, 150, 450, 0));
        assert_eq!(policy.snapshot().writer_recovery_remaining, 1);

        let exit = policy.observe(sample(45, 55, 150, 450, 0));
        assert_eq!(exit.previous_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(exit.next_mode, BravoBiasMode::Balanced);
        assert!(exit.switched);
        assert!(!exit.rollback_triggered);

        let telemetry = policy.snapshot();
        assert_eq!(telemetry.mode, BravoBiasMode::Balanced);
        assert_eq!(telemetry.writer_recovery_remaining, 0);
        assert_eq!(
            telemetry.last_signature,
            ContentionSignature::MixedContention
        );
        assert_eq!(telemetry.rollbacks, 1);
        assert!(telemetry.transitions >= 3);
    }
1163
    // Zero-valued window config clamps: exceeding the (zero) read-bias budget
    // forces WriterRecovery without a rollback, with exactly one recovery
    // window despite writer_recovery_windows = 0.
    #[test]
    fn bravo_zero_window_config_clamps_to_single_recovery_window() {
        let mut config = deterministic_config();
        config.max_consecutive_read_bias_windows = 0;
        config.writer_recovery_windows = 0;
        let mut policy = BravoContentionState::new(config);

        let first = policy.observe(sample(85, 15, 100, 300, 0));
        assert_eq!(first.next_mode, BravoBiasMode::ReadBiased);
        assert_eq!(policy.snapshot().consecutive_read_bias_windows, 1);

        // Second read-dominant window exceeds the fairness budget.
        let second = policy.observe(sample(86, 14, 100, 320, 0));
        assert_eq!(second.previous_mode, BravoBiasMode::ReadBiased);
        assert_eq!(second.next_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(second.signature, ContentionSignature::ReadDominant);
        assert!(!second.rollback_triggered);

        let recovery = policy.snapshot();
        assert_eq!(recovery.writer_recovery_remaining, 1);
        assert_eq!(recovery.consecutive_read_bias_windows, 0);

        let third = policy.observe(sample(45, 55, 120, 450, 0));
        assert_eq!(third.previous_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(third.next_mode, BravoBiasMode::Balanced);
        assert!(third.switched);
        assert_eq!(policy.snapshot().writer_recovery_remaining, 0);
    }
1191
    // With writer_recovery_windows = 0, a starvation-triggered refresh while
    // already in WriterRecovery still clamps the budget to 1, not 0.
    #[test]
    fn bravo_writer_recovery_starvation_refresh_clamps_to_one_when_config_is_zero() {
        let mut config = deterministic_config();
        config.writer_recovery_windows = 0;
        let mut policy = BravoContentionState::new(config);

        let _ = policy.observe(sample(82, 18, 100, 280, 0));
        let starvation = policy.observe(sample(75, 25, 100, 8_200, 3));
        assert_eq!(starvation.previous_mode, BravoBiasMode::ReadBiased);
        assert_eq!(starvation.next_mode, BravoBiasMode::WriterRecovery);
        assert!(starvation.rollback_triggered);
        assert_eq!(policy.snapshot().writer_recovery_remaining, 1);

        // Repeated starvation keeps the clamped budget at 1, no new rollback.
        let repeated_starvation = policy.observe(sample(70, 30, 100, 8_600, 2));
        assert_eq!(
            repeated_starvation.previous_mode,
            BravoBiasMode::WriterRecovery
        );
        assert_eq!(repeated_starvation.next_mode, BravoBiasMode::WriterRecovery);
        assert!(!repeated_starvation.rollback_triggered);
        assert_eq!(
            repeated_starvation.signature,
            ContentionSignature::WriterStarvationRisk
        );
        assert_eq!(policy.snapshot().writer_recovery_remaining, 1);

        let exit = policy.observe(sample(48, 52, 140, 420, 0));
        assert_eq!(exit.previous_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(exit.next_mode, BravoBiasMode::Balanced);
        assert_eq!(policy.snapshot().writer_recovery_remaining, 0);
    }
1223
    // After max_consecutive_read_bias_windows (2) read-dominant windows, the
    // policy preemptively enters WriterRecovery even without starvation.
    #[test]
    fn bravo_policy_enforces_writer_fairness_budget() {
        let mut config = deterministic_config();
        config.max_consecutive_read_bias_windows = 2;
        config.writer_recovery_windows = 1;
        let mut policy = BravoContentionState::new(config);

        let _ = policy.observe(sample(80, 20, 100, 250, 0));
        let second = policy.observe(sample(85, 15, 100, 260, 0));
        assert_eq!(second.next_mode, BravoBiasMode::WriterRecovery);
        assert_eq!(second.signature, ContentionSignature::ReadDominant);
        assert!(!second.rollback_triggered);

        let recovery = policy.observe(sample(40, 60, 150, 400, 0));
        assert_eq!(recovery.next_mode, BravoBiasMode::Balanced);

        let telemetry = policy.snapshot();
        assert_eq!(telemetry.mode, BravoBiasMode::Balanced);
        assert_eq!(telemetry.writer_recovery_remaining, 0);
    }
1244
    // The queue-level snapshot mirrors the embedded BRAVO controller's
    // telemetry after an observation fed through the queue API.
    #[test]
    fn queue_snapshot_exposes_bravo_policy_telemetry() {
        let mut queue: HostcallRequestQueue<u8> =
            HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::SafeFallback);

        let decision = queue.observe_contention_window(sample(70, 30, 120, 350, 0));
        assert_eq!(decision.next_mode, BravoBiasMode::ReadBiased);

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.bravo_mode, BravoBiasMode::ReadBiased);
        assert_eq!(
            snapshot.bravo_last_signature,
            ContentionSignature::MixedContention
        );
        assert!(snapshot.bravo_transitions >= 1);
    }
1261
    // Once the fast ring (cap 2) is full, the third push spills to overflow,
    // and draining yields the original FIFO order across both lanes.
    #[test]
    fn queue_spills_to_overflow_with_stable_order() {
        let mut queue = HostcallRequestQueue::with_mode(2, 4, HostcallQueueMode::SafeFallback);
        assert!(matches!(
            queue.push_back(0_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(1_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(2_u8),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.fast_depth, 2);
        assert_eq!(snapshot.overflow_depth, 1);
        assert_eq!(snapshot.total_depth, 3);
        assert_eq!(snapshot.overflow_enqueued_total, 1);

        let drained = queue.drain_all();
        assert_eq!(drained.into_iter().collect::<Vec<_>>(), vec![0, 1, 2]);
    }
1287
    // With fast cap 1 and overflow cap 1, the third push is rejected and the
    // rejection counter advances while depths stay at the capacity ceiling.
    #[test]
    fn queue_rejects_when_overflow_capacity_reached() {
        let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::SafeFallback);
        assert!(matches!(
            queue.push_back(0_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(1_u8),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));
        assert!(matches!(
            queue.push_back(2_u8),
            HostcallQueueEnqueueResult::Rejected { .. }
        ));

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.total_depth, 2);
        assert_eq!(snapshot.overflow_depth, 1);
        assert_eq!(snapshot.overflow_rejected_total, 1);
    }
1309
    // A single tenant is cut off by the S3-FIFO fairness budget (1 overflow
    // slot here) before the overflow lane itself (cap 3) is exhausted.
    #[test]
    fn s3fifo_fairness_budget_rejects_noisy_tenant_before_overflow_full() {
        let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);

        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.noisy"),
                value: 0
            }),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.noisy"),
                value: 1
            }),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.noisy"),
                value: 2
            }),
            HostcallQueueEnqueueResult::Rejected { .. }
        ));

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.s3fifo_mode, S3FifoMode::Active);
        assert_eq!(snapshot.s3fifo_tenant_budget, 1);
        assert_eq!(snapshot.s3fifo_fairness_rejected_total, 1);
        assert_eq!(snapshot.overflow_rejected_total, 1);
    }
1342
    // After a fairness rejection and a drain, the rejected tenant's ghost
    // entry lets a later burst take a second overflow slot (no new rejection).
    #[test]
    fn s3fifo_ghost_hits_allow_reentry_after_prior_rejection() {
        let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);

        // First burst: fast, overflow, then fairness rejection.
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 0
            }),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 1
            }),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 2
            }),
            HostcallQueueEnqueueResult::Rejected { .. }
        ));

        // Only the two admitted requests drain out, in FIFO order.
        let drained = queue.drain_all();
        assert_eq!(
            drained
                .into_iter()
                .map(|entry| entry.value)
                .collect::<Vec<_>>(),
            vec![0, 1]
        );

        // Second burst: the third push is now admitted to overflow.
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 3
            }),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 4
            }),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));
        assert!(matches!(
            queue.push_back(TenantRequest {
                tenant: Some("ext.ghost"),
                value: 5
            }),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));

        let snapshot = queue.snapshot();
        assert!(snapshot.s3fifo_ghost_hits_total >= 1);
        assert_eq!(snapshot.s3fifo_fairness_rejected_total, 1);
    }
1404
    // 96 push/drain cycles with no tenant signal push the controller into
    // conservative FIFO with the SignalQualityInsufficient reason recorded.
    #[test]
    fn s3fifo_falls_back_to_conservative_fifo_when_signal_is_insufficient() {
        let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);

        for value in 0..96_u8 {
            let _ = queue.push_back(value);
            let _ = queue.drain_all();
        }

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.s3fifo_mode, S3FifoMode::ConservativeFifo);
        assert_eq!(
            snapshot.s3fifo_fallback_reason,
            Some(S3FifoFallbackReason::SignalQualityInsufficient)
        );
        assert!(snapshot.s3fifo_fallback_transitions >= 1);
    }
1422
    // Once fairness instability forces the conservative-FIFO fallback, the
    // recorded reason, transition count, and fairness-rejection total must
    // stay frozen under further traffic.
    #[test]
    fn s3fifo_fairness_fallback_reason_and_transition_count_stay_stable() {
        let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::SafeFallback);

        assert!(matches!(
            queue.push_back(0_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(1_u8),
            HostcallQueueEnqueueResult::OverflowPath { .. }
        ));

        // Sustained rejections against a full queue trip the fallback.
        for value in 2_u8..40_u8 {
            let _ = queue.push_back(value);
        }

        let fallback = queue.snapshot();
        assert_eq!(fallback.s3fifo_mode, S3FifoMode::ConservativeFifo);
        assert_eq!(
            fallback.s3fifo_fallback_reason,
            Some(S3FifoFallbackReason::FairnessInstability)
        );
        assert_eq!(fallback.s3fifo_fallback_transitions, 1);
        let fairness_rejections_before = fallback.s3fifo_fairness_rejected_total;

        // Healthy push/drain traffic after the fallback changes nothing.
        for value in 40_u8..80_u8 {
            let _ = queue.push_back(value);
            let _ = queue.drain_all();
        }

        let stable = queue.snapshot();
        assert_eq!(stable.s3fifo_mode, S3FifoMode::ConservativeFifo);
        assert_eq!(
            stable.s3fifo_fallback_reason,
            Some(S3FifoFallbackReason::FairnessInstability)
        );
        assert_eq!(stable.s3fifo_fallback_transitions, 1);
        assert_eq!(
            stable.s3fifo_fairness_rejected_total,
            fairness_rejections_before
        );
    }
1466
1467    #[test]
1468    fn s3fifo_signal_quality_fallback_reason_does_not_flip_under_later_pressure() {
1469        let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);
1470
1471        for value in 0..96_u8 {
1472            let _ = queue.push_back(TenantRequest {
1473                tenant: None,
1474                value,
1475            });
1476            let _ = queue.drain_all();
1477        }
1478
1479        let fallback = queue.snapshot();
1480        assert_eq!(fallback.s3fifo_mode, S3FifoMode::ConservativeFifo);
1481        assert_eq!(
1482            fallback.s3fifo_fallback_reason,
1483            Some(S3FifoFallbackReason::SignalQualityInsufficient)
1484        );
1485        assert_eq!(fallback.s3fifo_fallback_transitions, 1);
1486        let fairness_rejections_before = fallback.s3fifo_fairness_rejected_total;
1487
1488        for value in 0_u8..32_u8 {
1489            let _ = queue.push_back(TenantRequest {
1490                tenant: Some("ext.noisy"),
1491                value,
1492            });
1493            let _ = queue.push_back(TenantRequest {
1494                tenant: Some("ext.noisy"),
1495                value: value.saturating_add(1),
1496            });
1497            let _ = queue.drain_all();
1498        }
1499
1500        let stable = queue.snapshot();
1501        assert_eq!(stable.s3fifo_mode, S3FifoMode::ConservativeFifo);
1502        assert_eq!(
1503            stable.s3fifo_fallback_reason,
1504            Some(S3FifoFallbackReason::SignalQualityInsufficient)
1505        );
1506        assert_eq!(stable.s3fifo_fallback_transitions, 1);
1507        assert_eq!(
1508            stable.s3fifo_fairness_rejected_total,
1509            fairness_rejections_before
1510        );
1511    }
1512
    // While an epoch pin is held, retired nodes accumulate (lag grows); once
    // the pin drops, a reclamation pass frees them and records max latency.
    #[test]
    fn ebr_reclamation_tracks_lag_and_latency() {
        let mut queue = HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::Ebr);
        let pin = queue.pin_epoch();
        assert!(matches!(
            queue.push_back(1_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        assert!(matches!(
            queue.push_back(2_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        let drained = queue.drain_all();
        assert_eq!(drained.len(), 2);

        // Pin still active: backlog stays, nothing reclaimed, lag visible.
        queue.force_reclaim();
        let blocked = queue.snapshot();
        assert_eq!(blocked.reclamation_mode, HostcallQueueMode::Ebr);
        assert_eq!(blocked.retired_backlog, 2);
        assert_eq!(blocked.reclaimed_total, 0);
        assert!(blocked.epoch_lag >= 1);

        // Dropping the pin unblocks reclamation.
        drop(pin);
        queue.force_reclaim();
        let reclaimed = queue.snapshot();
        assert_eq!(reclaimed.retired_backlog, 0);
        assert!(reclaimed.reclaimed_total >= 2);
        assert!(reclaimed.reclamation_latency_max_epochs >= 1);
    }
1542
    // In SafeFallback mode, dequeues never enter the retired list, so the
    // backlog and reclaimed counters stay at zero even with a pin held.
    #[test]
    fn safe_fallback_mode_skips_retirement_tracking() {
        let mut queue = HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::SafeFallback);
        let _pin = queue.pin_epoch();
        assert!(matches!(
            queue.push_back(1_u8),
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
        let _ = queue.drain_all();
        queue.force_reclaim();

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.reclamation_mode, HostcallQueueMode::SafeFallback);
        assert_eq!(snapshot.retired_backlog, 0);
        assert_eq!(snapshot.reclaimed_total, 0);
    }
1559
    // A pin that is never released lets the retired backlog grow past the
    // safety threshold, which must auto-degrade EBR to SafeFallback.
    #[test]
    fn ebr_auto_falls_back_when_retired_backlog_runs_away() {
        let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::Ebr);
        let _pin = queue.pin_epoch();

        // Keep a pin active while repeatedly retiring nodes so backlog exceeds
        // the safety threshold and forces fallback.
        for value in 0..64_u8 {
            let result = queue.push_back(value);
            assert!(
                !matches!(result, HostcallQueueEnqueueResult::Rejected { .. }),
                "queue should accept one item per cycle"
            );
            let drained = queue.drain_all();
            assert_eq!(drained.len(), 1);
            queue.force_reclaim();
        }

        let snapshot = queue.snapshot();
        assert_eq!(snapshot.reclamation_mode, HostcallQueueMode::SafeFallback);
        assert!(snapshot.fallback_transitions >= 1);
    }
1582
    // 10k unpinned push/drain cycles with periodic reclamation must leave the
    // queue in EBR mode with an empty backlog — i.e. no fallback, no leak.
    #[test]
    fn ebr_stress_cycle_keeps_retired_backlog_bounded() {
        let mut queue = HostcallRequestQueue::with_mode(4, 8, HostcallQueueMode::Ebr);

        for value in 0..10_000_u32 {
            let _ = queue.push_back(value);
            let drained = queue.drain_all();
            assert_eq!(drained.len(), 1);
            if value % 64 == 0 {
                queue.force_reclaim();
            }
        }

        queue.force_reclaim();
        let snapshot = queue.snapshot();
        assert_eq!(snapshot.reclamation_mode, HostcallQueueMode::Ebr);
        assert_eq!(snapshot.retired_backlog, 0);
        assert!(snapshot.reclaimed_total >= 10_000);
    }
1602
1603    // ── Additional public API coverage ──
1604
1605    #[test]
1606    fn contention_sample_total_acquires_sums_reads_and_writes() {
1607        let s = sample(100, 50, 0, 0, 0);
1608        assert_eq!(s.total_acquires(), 150);
1609
1610        let zero = ContentionSample::default();
1611        assert_eq!(zero.total_acquires(), 0);
1612
1613        let max = ContentionSample {
1614            read_acquires: u64::MAX,
1615            write_acquires: 1,
1616            ..Default::default()
1617        };
1618        assert_eq!(max.total_acquires(), u64::MAX, "saturating_add on overflow");
1619    }
1620
1621    #[test]
1622    fn contention_sample_read_ratio_permille_values() {
1623        // All reads → 1000 permille
1624        let all_reads = sample(100, 0, 0, 0, 0);
1625        assert_eq!(all_reads.read_ratio_permille(), 1000);
1626
1627        // All writes → 0 permille
1628        let all_writes = sample(0, 100, 0, 0, 0);
1629        assert_eq!(all_writes.read_ratio_permille(), 0);
1630
1631        // Balanced → ~500 permille
1632        let balanced = sample(50, 50, 0, 0, 0);
1633        assert_eq!(balanced.read_ratio_permille(), 500);
1634
1635        // Zero total → 0 (no division by zero)
1636        let zero = ContentionSample::default();
1637        assert_eq!(zero.read_ratio_permille(), 0);
1638
1639        // 75% reads → 750 permille
1640        let three_quarter = sample(75, 25, 0, 0, 0);
1641        assert_eq!(three_quarter.read_ratio_permille(), 750);
1642    }
1643
1644    #[test]
1645    fn bravo_contention_state_mode_accessor() {
1646        let state = BravoContentionState::new(deterministic_config());
1647        assert_eq!(state.mode(), BravoBiasMode::Balanced);
1648
1649        let mut state2 = BravoContentionState::new(deterministic_config());
1650        // Feed read-dominant sample to transition to ReadBiased
1651        let _ = state2.observe(sample(80, 10, 0, 0, 0));
1652        assert_eq!(state2.mode(), BravoBiasMode::ReadBiased);
1653    }
1654
1655    #[test]
1656    fn s3fifo_config_from_capacities_computes_fields() {
1657        let config = S3FifoConfig::from_capacities(256, 2048);
1658        // tenant_budget = overflow/2 = 1024
1659        assert_eq!(config.tenant_budget, 1024);
1660        // ghost_capacity = (256+2048)*2 = 4608, above min of 16
1661        assert_eq!(config.ghost_capacity, 4608);
1662        assert_eq!(config.min_signal_samples, 32);
1663        assert_eq!(config.max_signalless_streak, 64);
1664        assert_eq!(config.unstable_rejection_streak, 16);
1665
1666        // Small capacities → enforced minimums
1667        let small = S3FifoConfig::from_capacities(1, 1);
1668        assert_eq!(small.tenant_budget, 1); // max(1/2, 1) = max(0, 1) = 1
1669        assert_eq!(small.ghost_capacity, 16); // max((1+1)*2, 16) = max(4, 16) = 16
1670    }
1671
1672    #[test]
1673    fn queue_with_capacities_creates_functional_queue() {
1674        let mut queue: HostcallRequestQueue<u32> = HostcallRequestQueue::with_capacities(4, 8);
1675        assert!(queue.is_empty());
1676        assert_eq!(queue.len(), 0);
1677
1678        let result = queue.push_back(42);
1679        assert!(matches!(
1680            result,
1681            HostcallQueueEnqueueResult::FastPath { .. }
1682        ));
1683        assert!(!queue.is_empty());
1684        assert_eq!(queue.len(), 1);
1685
1686        let snapshot = queue.snapshot();
1687        assert_eq!(snapshot.fast_capacity, 4);
1688        assert_eq!(snapshot.overflow_capacity, 8);
1689    }
1690
1691    #[test]
1692    fn queue_clear_resets_state() {
1693        let mut queue = HostcallRequestQueue::with_mode(2, 4, HostcallQueueMode::Ebr);
1694        let _ = queue.push_back(1_u8);
1695        let _ = queue.push_back(2_u8);
1696        let _ = queue.push_back(3_u8); // spills to overflow
1697        assert!(!queue.is_empty());
1698
1699        queue.clear();
1700        assert!(queue.is_empty());
1701        assert_eq!(queue.len(), 0);
1702        let snapshot = queue.snapshot();
1703        assert_eq!(snapshot.max_depth_seen, 0);
1704        assert_eq!(snapshot.overflow_enqueued_total, 0);
1705        assert_eq!(snapshot.overflow_rejected_total, 0);
1706    }
1707
1708    #[test]
1709    fn queue_clear_resets_s3fifo_fallback_and_counters() {
1710        let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);
1711
1712        assert!(matches!(
1713            queue.push_back(TenantRequest {
1714                tenant: Some("ext.reset"),
1715                value: 0
1716            }),
1717            HostcallQueueEnqueueResult::FastPath { .. }
1718        ));
1719        assert!(matches!(
1720            queue.push_back(TenantRequest {
1721                tenant: Some("ext.reset"),
1722                value: 1
1723            }),
1724            HostcallQueueEnqueueResult::OverflowPath { .. }
1725        ));
1726        assert!(matches!(
1727            queue.push_back(TenantRequest {
1728                tenant: Some("ext.reset"),
1729                value: 2
1730            }),
1731            HostcallQueueEnqueueResult::Rejected { .. }
1732        ));
1733
1734        let _ = queue.drain_all();
1735
1736        assert!(matches!(
1737            queue.push_back(TenantRequest {
1738                tenant: Some("ext.reset"),
1739                value: 3
1740            }),
1741            HostcallQueueEnqueueResult::FastPath { .. }
1742        ));
1743        assert!(matches!(
1744            queue.push_back(TenantRequest {
1745                tenant: Some("ext.reset"),
1746                value: 4
1747            }),
1748            HostcallQueueEnqueueResult::OverflowPath { .. }
1749        ));
1750        assert!(matches!(
1751            queue.push_back(TenantRequest {
1752                tenant: Some("ext.reset"),
1753                value: 5
1754            }),
1755            HostcallQueueEnqueueResult::OverflowPath { .. }
1756        ));
1757
1758        drive_signal_quality_fallback(&mut queue, 6);
1759
1760        let before_clear = queue.snapshot();
1761        assert_eq!(before_clear.s3fifo_mode, S3FifoMode::ConservativeFifo);
1762        assert_eq!(
1763            before_clear.s3fifo_fallback_reason,
1764            Some(S3FifoFallbackReason::SignalQualityInsufficient)
1765        );
1766        assert_eq!(before_clear.s3fifo_fallback_transitions, 1);
1767        assert!(before_clear.s3fifo_ghost_hits_total >= 1);
1768        assert!(before_clear.s3fifo_fairness_rejected_total >= 1);
1769        assert!(before_clear.s3fifo_signal_samples >= 1);
1770
1771        queue.clear();
1772
1773        let cleared = queue.snapshot();
1774        assert_eq!(cleared.s3fifo_mode, S3FifoMode::Active);
1775        assert_eq!(cleared.s3fifo_fallback_reason, None);
1776        assert_eq!(cleared.s3fifo_ghost_depth, 0);
1777        assert_eq!(cleared.s3fifo_ghost_hits_total, 0);
1778        assert_eq!(cleared.s3fifo_fairness_rejected_total, 0);
1779        assert_eq!(cleared.s3fifo_signal_samples, 0);
1780        assert_eq!(cleared.s3fifo_signalless_streak, 0);
1781        assert_eq!(cleared.s3fifo_fallback_transitions, 0);
1782        assert_eq!(cleared.s3fifo_active_tenants, 0);
1783    }
1784
1785    #[test]
1786    fn queue_clear_allows_s3fifo_fallback_to_retrigger_from_clean_state() {
1787        let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);
1788
1789        drive_signal_quality_fallback(&mut queue, 0);
1790
1791        let first = queue.snapshot();
1792        assert_eq!(first.s3fifo_mode, S3FifoMode::ConservativeFifo);
1793        assert_eq!(
1794            first.s3fifo_fallback_reason,
1795            Some(S3FifoFallbackReason::SignalQualityInsufficient)
1796        );
1797        assert_eq!(first.s3fifo_fallback_transitions, 1);
1798
1799        queue.clear();
1800
1801        let after_clear = queue.snapshot();
1802        assert_eq!(after_clear.s3fifo_mode, S3FifoMode::Active);
1803        assert_eq!(after_clear.s3fifo_fallback_reason, None);
1804        assert_eq!(after_clear.s3fifo_fallback_transitions, 0);
1805
1806        drive_signal_quality_fallback(&mut queue, 120);
1807
1808        let retriggered = queue.snapshot();
1809        assert_eq!(retriggered.s3fifo_mode, S3FifoMode::ConservativeFifo);
1810        assert_eq!(
1811            retriggered.s3fifo_fallback_reason,
1812            Some(S3FifoFallbackReason::SignalQualityInsufficient)
1813        );
1814        assert_eq!(retriggered.s3fifo_fallback_transitions, 1);
1815    }
1816
1817    #[test]
1818    fn queue_reclamation_mode_accessor() {
1819        let ebr = HostcallRequestQueue::<u8>::with_mode(2, 2, HostcallQueueMode::Ebr);
1820        assert_eq!(ebr.reclamation_mode(), HostcallQueueMode::Ebr);
1821
1822        let fallback = HostcallRequestQueue::<u8>::with_mode(2, 2, HostcallQueueMode::SafeFallback);
1823        assert_eq!(fallback.reclamation_mode(), HostcallQueueMode::SafeFallback);
1824    }
1825
1826    #[test]
1827    fn queue_force_safe_fallback_switches_mode() {
1828        let mut queue: HostcallRequestQueue<u8> =
1829            HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::Ebr);
1830        assert_eq!(queue.reclamation_mode(), HostcallQueueMode::Ebr);
1831
1832        queue.force_safe_fallback();
1833        assert_eq!(queue.reclamation_mode(), HostcallQueueMode::SafeFallback);
1834        let snapshot = queue.snapshot();
1835        assert_eq!(snapshot.fallback_transitions, 1);
1836
1837        // Calling again is idempotent (no extra transition counted)
1838        queue.force_safe_fallback();
1839        let snapshot2 = queue.snapshot();
1840        assert_eq!(snapshot2.fallback_transitions, 1);
1841    }
1842
1843    #[test]
1844    fn queue_default_uses_standard_capacities() {
1845        let queue: HostcallRequestQueue<u8> = HostcallRequestQueue::default();
1846        let snapshot = queue.snapshot();
1847        assert_eq!(snapshot.fast_capacity, HOSTCALL_FAST_RING_CAPACITY);
1848        assert_eq!(snapshot.overflow_capacity, HOSTCALL_OVERFLOW_CAPACITY);
1849    }
1850
1851    // ── Property tests ──
1852
    // Property-based tests for the BRAVO contention classifier. Inputs are
    // drawn from bounded strategies so every property holds across the full
    // configured range, not just hand-picked samples.
    mod proptest_bravo {
        use super::*;
        use proptest::prelude::*;

        // Strategy for arbitrary contention windows: read/write acquisitions
        // up to 10k, p95 waits up to 50k microseconds, up to 100 timeouts.
        fn arb_sample() -> impl Strategy<Value = ContentionSample> {
            (
                0..10_000u64,
                0..10_000u64,
                0..50_000u64,
                0..50_000u64,
                0..100u64,
            )
                .prop_map(|(reads, writes, r_wait, w_wait, w_timeouts)| {
                    ContentionSample {
                        read_acquires: reads,
                        write_acquires: writes,
                        read_wait_p95_us: r_wait,
                        write_wait_p95_us: w_wait,
                        write_timeouts: w_timeouts,
                    }
                })
        }

        // Strategy for arbitrary-but-valid configs. The mixed-ratio ceiling
        // is clamped with `.max(mixed_floor)` so floor <= ceiling always
        // holds regardless of the independently drawn values.
        fn arb_config() -> impl Strategy<Value = BravoContentionConfig> {
            (
                1..200u64,
                500..1000u32,
                100..500u32,
                500..999u32,
                1000..20_000u64,
                1..10u64,
                1..10u32,
                1..5u32,
            )
                .prop_map(
                    |(
                        min_acq,
                        rd_ratio,
                        mixed_floor,
                        mixed_ceil,
                        starve_wait,
                        starve_to,
                        max_rb,
                        wr_windows,
                    )| {
                        BravoContentionConfig {
                            min_total_acquires: min_acq,
                            read_dominant_ratio_permille: rd_ratio,
                            mixed_ratio_floor_permille: mixed_floor,
                            mixed_ratio_ceiling_permille: mixed_ceil.max(mixed_floor),
                            writer_starvation_wait_us: starve_wait,
                            writer_starvation_timeouts: starve_to,
                            max_consecutive_read_bias_windows: max_rb,
                            writer_recovery_windows: wr_windows,
                        }
                    },
                )
        }

        proptest! {
            // classify must be a pure function of (sample, config): two
            // calls with identical inputs yield identical signatures.
            #[test]
            fn classify_is_deterministic(
                sample in arb_sample(),
                cfg in arb_config(),
            ) {
                let s1 = BravoContentionState::classify(sample, cfg);
                let s2 = BravoContentionState::classify(sample, cfg);
                assert_eq!(s1, s2, "same inputs must produce same signature");
            }

            // Inputs are capped at u64::MAX / 2 so reads + writes cannot
            // overflow; the ratio must stay within [0, 1000] permille.
            #[test]
            fn read_ratio_permille_bounded_0_to_1000(
                reads in 0..u64::MAX / 2,
                writes in 0..u64::MAX / 2,
            ) {
                let s = ContentionSample {
                    read_acquires: reads,
                    write_acquires: writes,
                    ..Default::default()
                };
                let ratio = s.read_ratio_permille();
                assert!(ratio <= 1000, "ratio was {ratio}, expected <= 1000");
            }

            // The (saturating) total is never smaller than either addend.
            #[test]
            fn total_acquires_at_least_each_component(
                reads in 0..u64::MAX / 2,
                writes in 0..u64::MAX / 2,
            ) {
                let s = ContentionSample {
                    read_acquires: reads,
                    write_acquires: writes,
                    ..Default::default()
                };
                let total = s.total_acquires();
                assert!(total >= reads, "total must be >= reads");
                assert!(total >= writes, "total must be >= writes");
            }

            // After any observation sequence the mode is one of the three
            // declared variants, and `switched` agrees with the mode delta.
            #[test]
            fn mode_always_valid_after_observation_sequence(
                cfg in arb_config(),
                samples in prop::collection::vec(arb_sample(), 1..30),
            ) {
                let mut state = BravoContentionState::new(cfg);
                for sample in &samples {
                    let decision = state.observe(*sample);
                    assert!(
                        matches!(
                            decision.next_mode,
                            BravoBiasMode::Balanced
                                | BravoBiasMode::ReadBiased
                                | BravoBiasMode::WriterRecovery
                        ),
                        "mode must be a valid variant"
                    );
                    assert_eq!(decision.switched, decision.previous_mode != decision.next_mode);
                }
            }

            // Transition, rollback, and window counters never decrease
            // across consecutive observations.
            #[test]
            fn counters_monotonically_nondecreasing(
                cfg in arb_config(),
                samples in prop::collection::vec(arb_sample(), 1..30),
            ) {
                let mut state = BravoContentionState::new(cfg);
                let mut prev_transitions = 0u64;
                let mut prev_rollbacks = 0u64;
                let mut prev_windows = 0u64;

                for sample in &samples {
                    let _ = state.observe(*sample);
                    let snap = state.snapshot();
                    assert!(snap.transitions >= prev_transitions);
                    assert!(snap.rollbacks >= prev_rollbacks);
                    assert!(snap.windows_observed >= prev_windows);
                    prev_transitions = snap.transitions;
                    prev_rollbacks = snap.rollbacks;
                    prev_windows = snap.windows_observed;
                }
            }

            // Every observe() call bumps windows_observed by exactly one.
            #[test]
            fn windows_observed_equals_call_count(
                cfg in arb_config(),
                samples in prop::collection::vec(arb_sample(), 1..30),
            ) {
                let mut state = BravoContentionState::new(cfg);
                for sample in &samples {
                    let _ = state.observe(*sample);
                }
                let snap = state.snapshot();
                assert_eq!(
                    snap.windows_observed,
                    samples.len() as u64,
                    "windows_observed must equal number of observe() calls"
                );
            }

            // Regardless of config, a fresh state is Balanced with every
            // counter at zero.
            #[test]
            fn initial_state_is_balanced_with_zero_counters(
                cfg in arb_config(),
            ) {
                let state = BravoContentionState::new(cfg);
                let snap = state.snapshot();
                assert_eq!(snap.mode, BravoBiasMode::Balanced);
                assert_eq!(snap.transitions, 0);
                assert_eq!(snap.rollbacks, 0);
                assert_eq!(snap.windows_observed, 0);
                assert_eq!(snap.consecutive_read_bias_windows, 0);
                assert_eq!(snap.writer_recovery_remaining, 0);
            }
        }
    }
2027
    // Scenario: while one thread holds an epoch pin, a worker retires two
    // entries; reclamation must be deferred until the pin is dropped, then
    // succeed in full. Currently ignored because the spin-wait handshakes
    // blow up loom's state space.
    #[test]
    #[ignore = "loom model checker SIGSEGV: spin-wait loops exhaust state space"]
    fn loom_epoch_pin_blocks_reclamation_until_release() {
        use loom::sync::atomic::{AtomicBool, Ordering as LoomOrdering};
        use loom::sync::{Arc, Mutex};
        use loom::thread;

        loom::model(|| {
            let queue = Arc::new(Mutex::new(HostcallRequestQueue::with_mode(
                1,
                2,
                HostcallQueueMode::Ebr,
            )));
            // pin_ready: pin thread signals the epoch pin is now held.
            // release_pin: main thread tells the pin thread to drop its pin.
            let pin_ready = Arc::new(AtomicBool::new(false));
            let release_pin = Arc::new(AtomicBool::new(false));

            let queue_for_pin = Arc::clone(&queue);
            let pin_ready_for_thread = Arc::clone(&pin_ready);
            let release_pin_for_thread = Arc::clone(&release_pin);
            // Holds an epoch pin until told to release, keeping retired
            // entries unreclaimable for the duration.
            let pin_thread = thread::spawn(move || {
                let pin = queue_for_pin.lock().expect("lock queue").pin_epoch();
                pin_ready_for_thread.store(true, LoomOrdering::SeqCst);
                while !release_pin_for_thread.load(LoomOrdering::SeqCst) {
                    thread::yield_now();
                }
                drop(pin);
            });

            let queue_for_worker = Arc::clone(&queue);
            let pin_ready_for_worker = Arc::clone(&pin_ready);
            let worker = thread::spawn(move || {
                // Wait until the pin is definitely held before retiring.
                while !pin_ready_for_worker.load(LoomOrdering::SeqCst) {
                    thread::yield_now();
                }

                let mut queue = queue_for_worker.lock().expect("lock queue");
                let _ = queue.push_back(1_u8);
                let _ = queue.push_back(2_u8);
                let drained = queue.drain_all();
                assert_eq!(drained.len(), 2);
                // With the pin still held, force_reclaim must free nothing:
                // the retired backlog grows while reclaimed stays at zero.
                queue.force_reclaim();
                let snapshot = queue.snapshot();
                assert_eq!(snapshot.reclamation_mode, HostcallQueueMode::Ebr);
                assert!(snapshot.retired_backlog >= 2);
                assert_eq!(snapshot.reclaimed_total, 0);
                drop(queue);
            });

            worker.join().expect("worker join");
            release_pin.store(true, LoomOrdering::SeqCst);
            pin_thread.join().expect("pin thread join");

            // Pin released: reclamation can now drain the retired backlog.
            let mut queue = queue.lock().expect("lock queue");
            queue.force_reclaim();
            let snapshot = queue.snapshot();
            assert_eq!(snapshot.retired_backlog, 0);
            assert!(snapshot.reclaimed_total >= 2);
            drop(queue);
        });
    }
2088
2089    #[test]
2090    #[ignore = "loom model checker SIGSEGV: needs cfg(loom) feature gate"]
2091    fn loom_concurrent_enqueue_dequeue_keeps_values_unique() {
2092        use loom::sync::{Arc, Mutex};
2093        use loom::thread;
2094
2095        loom::model(|| {
2096            let queue = Arc::new(Mutex::new(HostcallRequestQueue::with_mode(
2097                2,
2098                2,
2099                HostcallQueueMode::SafeFallback,
2100            )));
2101
2102            let queue_a = Arc::clone(&queue);
2103            let producer_a = thread::spawn(move || {
2104                let mut queue = queue_a.lock().expect("lock queue");
2105                let _ = queue.push_back(10_u8);
2106            });
2107
2108            let queue_b = Arc::clone(&queue);
2109            let producer_b = thread::spawn(move || {
2110                let mut queue = queue_b.lock().expect("lock queue");
2111                let _ = queue.push_back(11_u8);
2112            });
2113
2114            producer_a.join().expect("producer_a join");
2115            producer_b.join().expect("producer_b join");
2116
2117            let mut queue = queue.lock().expect("lock queue");
2118            let drained = queue.drain_all();
2119            drop(queue);
2120            let mut values = drained.into_iter().collect::<Vec<_>>();
2121            values.sort_unstable();
2122            assert_eq!(values, vec![10, 11]);
2123        });
2124    }
2125}