1pub use crate::hostcall_s3_fifo::S3FifoFallbackReason;
8use crossbeam_queue::ArrayQueue;
9use std::collections::{BTreeMap, VecDeque};
10use std::sync::Arc;
11use std::sync::atomic::{AtomicUsize, Ordering};
12
/// Fast-tier capacity used by `HostcallRequestQueue::default()`.
pub const HOSTCALL_FAST_RING_CAPACITY: usize = 256;
/// Overflow-tier capacity used by `HostcallRequestQueue::default()`.
pub const HOSTCALL_OVERFLOW_CAPACITY: usize = 2_048;
/// Multiplier applied to `fast + overflow` capacity when `HostcallRequestQueue::with_mode`
/// derives the retired-backlog threshold that forces the safe fallback.
const SAFE_FALLBACK_BACKLOG_MULTIPLIER: usize = 8;
/// Lower bound for the retired-backlog threshold derived in `with_mode`.
const SAFE_FALLBACK_BACKLOG_MIN: usize = 32;
/// Multiplier applied to `fast + overflow` capacity by `S3FifoConfig::from_capacities`
/// to size the ghost history.
const S3_FIFO_GHOST_CAPACITY_MULTIPLIER: usize = 2;
/// Lower bound for the ghost capacity computed by `S3FifoConfig::from_capacities`.
const S3_FIFO_GHOST_CAPACITY_MIN: usize = 16;
/// Value `S3FifoConfig::from_capacities` assigns to `min_signal_samples`.
const S3_FIFO_MIN_SIGNAL_SAMPLES: u64 = 32;
/// Value `S3FifoConfig::from_capacities` assigns to `max_signalless_streak`.
const S3_FIFO_MAX_SIGNALLESS_STREAK: u64 = 64;
/// Value `S3FifoConfig::from_capacities` assigns to `unstable_rejection_streak`.
const S3_FIFO_UNSTABLE_REJECTION_STREAK: u64 = 16;
22
/// Mode tracked by `BravoContentionState`; updated once per observed window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BravoBiasMode {
    /// Initial mode of a freshly constructed `BravoContentionState`.
    Balanced,
    /// Entered from `Balanced` when a window classifies as `ReadDominant` or
    /// `MixedContention`.
    ReadBiased,
    /// Entered on `WriterStarvationRisk`, or from `ReadBiased` once the consecutive
    /// read-bias window budget is used up; left after the configured number of windows.
    WriterRecovery,
}
33
/// Classification of one `ContentionSample`, produced by `BravoContentionState::classify`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentionSignature {
    /// Total acquires in the window were below `min_total_acquires`.
    InsufficientSamples,
    /// Read ratio reached the configured read-dominant threshold.
    ReadDominant,
    /// Read ratio fell inside the configured mixed band (inclusive on both ends).
    MixedContention,
    /// Writer p95 wait or writer timeouts reached their configured thresholds.
    WriterStarvationRisk,
    /// None of the other classifications applied.
    WriteDominant,
}
48
/// Counters describing one contention observation window.
///
/// The `_us` suffix and `p95` in the field names suggest 95th-percentile waits in
/// microseconds — NOTE(review): units are not established in this file; confirm with
/// the producer of these samples.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ContentionSample {
    /// Number of read acquisitions in the window.
    pub read_acquires: u64,
    /// Number of write acquisitions in the window.
    pub write_acquires: u64,
    /// Reader wait figure; not consulted by the classifier in this file.
    pub read_wait_p95_us: u64,
    /// Writer wait figure compared against `writer_starvation_wait_us`.
    pub write_wait_p95_us: u64,
    /// Writer timeout count compared against `writer_starvation_timeouts`.
    pub write_timeouts: u64,
}

impl ContentionSample {
    /// Sum of read and write acquisitions, saturating at `u64::MAX`.
    #[must_use]
    pub const fn total_acquires(self) -> u64 {
        self.read_acquires.saturating_add(self.write_acquires)
    }

    /// Share of acquisitions that were reads, in permille (`0..=1000`), rounded down.
    ///
    /// Returns `0` when the window recorded no acquisitions at all.
    #[must_use]
    pub fn read_ratio_permille(self) -> u32 {
        // Widen to u128 so neither the sum nor the `* 1000` scaling can overflow or
        // saturate; with u64 arithmetic very large counters produced a ratio near zero.
        let reads = u128::from(self.read_acquires);
        let total = reads + u128::from(self.write_acquires);
        if total == 0 {
            return 0;
        }
        let ratio = reads * 1_000 / total;
        // `ratio` is mathematically <= 1000; the fallback is purely defensive.
        u32::try_from(ratio).unwrap_or(1_000)
    }
}
76
/// Thresholds used by `BravoContentionState` to classify windows and switch modes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoContentionConfig {
    /// Windows with fewer total acquires classify as `InsufficientSamples`.
    pub min_total_acquires: u64,
    /// Read ratio (permille, capped at 1000 when used) at or above which a window is
    /// `ReadDominant`.
    pub read_dominant_ratio_permille: u32,
    /// Inclusive lower edge of the `MixedContention` band (capped at 1000 when used).
    pub mixed_ratio_floor_permille: u32,
    /// Inclusive upper edge of the `MixedContention` band (clamped to
    /// `[floor, 1000]` when used).
    pub mixed_ratio_ceiling_permille: u32,
    /// `write_wait_p95_us` at or above this value signals writer starvation.
    pub writer_starvation_wait_us: u64,
    /// `write_timeouts` at or above this value signals writer starvation.
    pub writer_starvation_timeouts: u64,
    /// Consecutive read-biased windows allowed before writer recovery is forced
    /// (treated as at least 1).
    pub max_consecutive_read_bias_windows: u32,
    /// Number of windows writer recovery lasts (treated as at least 1).
    pub writer_recovery_windows: u32,
}

impl Default for BravoContentionConfig {
    /// Returns the built-in default thresholds.
    fn default() -> Self {
        Self {
            min_total_acquires: 32,
            read_dominant_ratio_permille: 800,
            mixed_ratio_floor_permille: 450,
            mixed_ratio_ceiling_permille: 799,
            writer_starvation_wait_us: 8_000,
            writer_starvation_timeouts: 2,
            max_consecutive_read_bias_windows: 5,
            writer_recovery_windows: 2,
        }
    }
}
104
/// Outcome of a single `BravoContentionState::observe` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoPolicyDecision {
    /// Mode before the window was observed.
    pub previous_mode: BravoBiasMode,
    /// Mode after the window was observed.
    pub next_mode: BravoBiasMode,
    /// Classification assigned to the observed window.
    pub signature: ContentionSignature,
    /// `true` when `next_mode` differs from `previous_mode`.
    pub switched: bool,
    /// `true` when this window moved the policy into writer recovery because of
    /// writer-starvation risk.
    pub rollback_triggered: bool,
}
114
/// Copy of the counters held by `BravoContentionState`, as returned by its `snapshot`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoPolicyTelemetry {
    /// Current mode.
    pub mode: BravoBiasMode,
    /// Number of observed windows that changed the mode.
    pub transitions: u64,
    /// Number of starvation-triggered moves into writer recovery.
    pub rollbacks: u64,
    /// Total number of windows observed.
    pub windows_observed: u64,
    /// Length of the current run of read-biased windows.
    pub consecutive_read_bias_windows: u32,
    /// Windows left before writer recovery ends.
    pub writer_recovery_remaining: u32,
    /// Classification of the most recently observed window.
    pub last_signature: ContentionSignature,
}
126
/// Mode-switching state machine driven by per-window `ContentionSample`s.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BravoContentionState {
    // Thresholds used for classification and mode switching.
    config: BravoContentionConfig,
    // Current mode; starts as `Balanced`.
    mode: BravoBiasMode,
    // Count of observed windows that changed `mode`.
    transitions: u64,
    // Count of starvation-triggered moves into writer recovery.
    rollbacks: u64,
    // Total windows passed to `observe`.
    windows_observed: u64,
    // Current run length of read-biased windows.
    consecutive_read_bias_windows: u32,
    // Windows left in writer recovery.
    writer_recovery_remaining: u32,
    // Classification of the last observed window.
    last_signature: ContentionSignature,
}
139
140impl BravoContentionState {
141 #[must_use]
142 pub const fn new(config: BravoContentionConfig) -> Self {
143 Self {
144 config,
145 mode: BravoBiasMode::Balanced,
146 transitions: 0,
147 rollbacks: 0,
148 windows_observed: 0,
149 consecutive_read_bias_windows: 0,
150 writer_recovery_remaining: 0,
151 last_signature: ContentionSignature::InsufficientSamples,
152 }
153 }
154
155 #[must_use]
156 pub const fn mode(self) -> BravoBiasMode {
157 self.mode
158 }
159
160 #[must_use]
161 pub const fn snapshot(self) -> BravoPolicyTelemetry {
162 BravoPolicyTelemetry {
163 mode: self.mode,
164 transitions: self.transitions,
165 rollbacks: self.rollbacks,
166 windows_observed: self.windows_observed,
167 consecutive_read_bias_windows: self.consecutive_read_bias_windows,
168 writer_recovery_remaining: self.writer_recovery_remaining,
169 last_signature: self.last_signature,
170 }
171 }
172
173 pub fn observe(&mut self, sample: ContentionSample) -> BravoPolicyDecision {
174 let previous_mode = self.mode;
175 let signature = Self::classify(sample, self.config);
176 self.windows_observed = self.windows_observed.saturating_add(1);
177
178 let mut rollback_triggered = false;
179 match self.mode {
180 BravoBiasMode::Balanced => match signature {
181 ContentionSignature::WriterStarvationRisk => {
182 self.mode = BravoBiasMode::WriterRecovery;
183 self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
184 self.consecutive_read_bias_windows = 0;
185 self.rollbacks = self.rollbacks.saturating_add(1);
186 rollback_triggered = true;
187 }
188 ContentionSignature::ReadDominant | ContentionSignature::MixedContention => {
189 self.mode = BravoBiasMode::ReadBiased;
190 self.consecutive_read_bias_windows = 1;
191 }
192 ContentionSignature::InsufficientSamples | ContentionSignature::WriteDominant => {
193 self.consecutive_read_bias_windows = 0;
194 }
195 },
196 BravoBiasMode::ReadBiased => {
197 self.consecutive_read_bias_windows =
198 self.consecutive_read_bias_windows.saturating_add(1);
199
200 let starvation = signature == ContentionSignature::WriterStarvationRisk;
201 let fairness_budget_exhausted = self.consecutive_read_bias_windows
202 >= self.config.max_consecutive_read_bias_windows.max(1);
203
204 if starvation || fairness_budget_exhausted {
205 self.mode = BravoBiasMode::WriterRecovery;
206 self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
207 self.consecutive_read_bias_windows = 0;
208 rollback_triggered = starvation;
209 if starvation {
210 self.rollbacks = self.rollbacks.saturating_add(1);
211 }
212 } else if matches!(
213 signature,
214 ContentionSignature::InsufficientSamples | ContentionSignature::WriteDominant
215 ) {
216 self.mode = BravoBiasMode::Balanced;
217 self.consecutive_read_bias_windows = 0;
218 }
219 }
220 BravoBiasMode::WriterRecovery => {
221 self.consecutive_read_bias_windows = 0;
222 if signature == ContentionSignature::WriterStarvationRisk {
223 self.writer_recovery_remaining = self.config.writer_recovery_windows.max(1);
224 } else if self.writer_recovery_remaining > 0 {
225 self.writer_recovery_remaining -= 1;
226 }
227 if self.writer_recovery_remaining == 0 {
228 self.mode = BravoBiasMode::Balanced;
229 }
230 }
231 }
232
233 if self.mode != previous_mode {
234 self.transitions = self.transitions.saturating_add(1);
235 }
236 self.last_signature = signature;
237
238 BravoPolicyDecision {
239 previous_mode,
240 next_mode: self.mode,
241 signature,
242 switched: self.mode != previous_mode,
243 rollback_triggered,
244 }
245 }
246
247 #[must_use]
248 pub fn classify(
249 sample: ContentionSample,
250 config: BravoContentionConfig,
251 ) -> ContentionSignature {
252 if sample.total_acquires() < config.min_total_acquires {
253 return ContentionSignature::InsufficientSamples;
254 }
255
256 if sample.write_wait_p95_us >= config.writer_starvation_wait_us
257 || sample.write_timeouts >= config.writer_starvation_timeouts
258 {
259 return ContentionSignature::WriterStarvationRisk;
260 }
261
262 let read_ratio = sample.read_ratio_permille();
263 let read_dominant_floor = config.read_dominant_ratio_permille.min(1_000);
264 if read_ratio >= read_dominant_floor {
265 return ContentionSignature::ReadDominant;
266 }
267
268 let mixed_floor = config.mixed_ratio_floor_permille.min(1_000);
269 let mixed_ceiling = config
270 .mixed_ratio_ceiling_permille
271 .clamp(mixed_floor, 1_000);
272 if read_ratio >= mixed_floor && read_ratio <= mixed_ceiling {
273 return ContentionSignature::MixedContention;
274 }
275
276 ContentionSignature::WriteDominant
277 }
278}
279
impl Default for BravoContentionState {
    /// Builds a policy from `BravoContentionConfig::default()`.
    fn default() -> Self {
        Self::new(BravoContentionConfig::default())
    }
}
285
/// Lets a queued request expose an optional tenant key, which the queue passes to its
/// S3-FIFO state.
pub trait QueueTenant {
    /// Returns the tenant key for this request; the default implementation reports
    /// none.
    #[must_use]
    fn tenant_key(&self) -> Option<&str> {
        None
    }
}
296
// Implements `QueueTenant` with the default (keyless) `tenant_key` for each listed type.
macro_rules! impl_queue_tenant_none {
    ($($ty:ty),+ $(,)?) => {
        $(
            impl QueueTenant for $ty {}
        )+
    };
}

// Primitive and `String` payloads carry no tenant key.
impl_queue_tenant_none!(
    (),
    bool,
    char,
    u8,
    u16,
    u32,
    u64,
    usize,
    i8,
    i16,
    i32,
    i64,
    isize,
    String,
);
321
/// Operating mode of the S3-FIFO admission state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S3FifoMode {
    /// Initial mode: tenant budgets, ghost history and signal tracking are in effect.
    Active,
    /// Fallback mode: the admission hooks return early and always admit.
    ConservativeFifo,
}
328
329#[derive(Debug, Clone, Copy, PartialEq, Eq)]
331pub struct S3FifoConfig {
332 pub tenant_budget: usize,
333 pub ghost_capacity: usize,
334 pub min_signal_samples: u64,
335 pub max_signalless_streak: u64,
336 pub unstable_rejection_streak: u64,
337}
338
339impl S3FifoConfig {
340 #[must_use]
341 pub fn from_capacities(fast_capacity: usize, overflow_capacity: usize) -> Self {
342 let tenant_budget = (overflow_capacity / 2).max(1);
343 let ghost_capacity = fast_capacity
344 .saturating_add(overflow_capacity)
345 .saturating_mul(S3_FIFO_GHOST_CAPACITY_MULTIPLIER)
346 .max(S3_FIFO_GHOST_CAPACITY_MIN);
347 Self {
348 tenant_budget,
349 ghost_capacity,
350 min_signal_samples: S3_FIFO_MIN_SIGNAL_SAMPLES,
351 max_signalless_streak: S3_FIFO_MAX_SIGNALLESS_STREAK,
352 unstable_rejection_streak: S3_FIFO_UNSTABLE_REJECTION_STREAK,
353 }
354 }
355}
356
/// Point-in-time copy of the S3-FIFO admission state's counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3FifoTelemetry {
    /// Current operating mode.
    pub mode: S3FifoMode,
    /// Reason recorded when the state switched to fallback, if it has.
    pub fallback_reason: Option<S3FifoFallbackReason>,
    /// Number of tenant keys currently in the ghost history.
    pub ghost_depth: usize,
    /// Admissions granted because the tenant was found in the ghost history.
    pub ghost_hits_total: u64,
    /// Admissions refused because the tenant exceeded its budget.
    pub fairness_rejected_total: u64,
    /// Requests observed with a tenant key while active.
    pub signal_samples: u64,
    /// Current run of requests observed without a tenant key.
    pub signalless_streak: u64,
    /// Number of switches into fallback mode.
    pub fallback_transitions: u64,
    /// Configured per-tenant backlog budget.
    pub tenant_budget: usize,
    /// Number of tenants with a tracked backlog.
    pub active_tenants: usize,
}
371
/// Mutable S3-FIFO admission state owned by the request queue.
#[derive(Debug, Clone)]
struct S3FifoState {
    // Tunables fixed at construction.
    config: S3FifoConfig,
    // Current operating mode; starts `Active`.
    mode: S3FifoMode,
    // Reason recorded on the switch to fallback.
    fallback_reason: Option<S3FifoFallbackReason>,
    // Ghost history ordered by generation (oldest first) -> tenant key.
    ghost_order: BTreeMap<u64, String>,
    // Reverse index: tenant key -> its generation in `ghost_order`.
    ghost_lookup: BTreeMap<String, u64>,
    // Last generation number handed out to a ghost entry.
    ghost_gen: u64,
    // Per-tenant count of requests currently tracked in the overflow tier.
    tenant_backlog: BTreeMap<String, usize>,
    // Admissions granted via a ghost hit.
    ghost_hits_total: u64,
    // Admissions refused for exceeding the tenant budget.
    fairness_rejected_total: u64,
    // Requests observed with a tenant key.
    signal_samples: u64,
    // Current run of requests observed without a tenant key.
    signalless_streak: u64,
    // Current run of rejections without an intervening admission.
    unstable_rejection_streak: u64,
    // Number of switches into fallback mode.
    fallback_transitions: u64,
}
388
389impl S3FifoState {
390 #[must_use]
391 const fn new(config: S3FifoConfig) -> Self {
392 Self {
393 config,
394 mode: S3FifoMode::Active,
395 fallback_reason: None,
396 ghost_order: BTreeMap::new(),
397 ghost_lookup: BTreeMap::new(),
398 ghost_gen: 0,
399 tenant_backlog: BTreeMap::new(),
400 ghost_hits_total: 0,
401 fairness_rejected_total: 0,
402 signal_samples: 0,
403 signalless_streak: 0,
404 unstable_rejection_streak: 0,
405 fallback_transitions: 0,
406 }
407 }
408
409 fn observe_signal(&mut self, tenant_key: Option<&str>) {
410 if self.mode != S3FifoMode::Active {
411 return;
412 }
413 if tenant_key.is_some() {
414 self.signal_samples = self.signal_samples.saturating_add(1);
415 self.signalless_streak = 0;
416 } else {
417 self.signalless_streak = self.signalless_streak.saturating_add(1);
418 if self.signalless_streak >= self.config.max_signalless_streak
419 && self.signal_samples < self.config.min_signal_samples
420 {
421 self.transition_to_fallback(S3FifoFallbackReason::SignalQualityInsufficient);
422 }
423 }
424 }
425
426 fn allow_main_admission(&mut self, tenant_key: Option<&str>) -> bool {
427 if self.mode != S3FifoMode::Active {
428 return true;
429 }
430 let Some(tenant_key) = tenant_key else {
431 return true;
432 };
433 let backlog = self.tenant_backlog.get(tenant_key).copied().unwrap_or(0);
434 if backlog < self.config.tenant_budget {
435 return true;
436 }
437 if self.consume_ghost_hit(tenant_key) {
438 self.unstable_rejection_streak = 0;
439 return true;
440 }
441
442 self.fairness_rejected_total = self.fairness_rejected_total.saturating_add(1);
443 self.unstable_rejection_streak = self.unstable_rejection_streak.saturating_add(1);
444 self.record_ghost(tenant_key);
445 if self.unstable_rejection_streak >= self.config.unstable_rejection_streak {
446 self.transition_to_fallback(S3FifoFallbackReason::FairnessInstability);
447 }
448 false
449 }
450
451 fn on_main_enqueued(&mut self, tenant_key: Option<&str>) {
452 if self.mode != S3FifoMode::Active {
453 return;
454 }
455 if let Some(tenant_key) = tenant_key {
456 let entry = self
457 .tenant_backlog
458 .entry(tenant_key.to_string())
459 .or_insert(0);
460 *entry = entry.saturating_add(1);
461 }
462 self.unstable_rejection_streak = 0;
463 }
464
465 fn on_main_dequeued(&mut self, tenant_key: Option<&str>) {
466 if self.mode != S3FifoMode::Active {
467 return;
468 }
469 if let Some(tenant_key) = tenant_key {
470 if let Some(backlog) = self.tenant_backlog.get_mut(tenant_key) {
471 *backlog = backlog.saturating_sub(1);
472 if *backlog == 0 {
473 self.tenant_backlog.remove(tenant_key);
474 }
475 }
476 self.record_ghost(tenant_key);
477 }
478 }
479
480 fn on_main_overflow_reject(&mut self, tenant_key: Option<&str>) {
481 if self.mode != S3FifoMode::Active {
482 return;
483 }
484 if let Some(tenant_key) = tenant_key {
485 self.record_ghost(tenant_key);
486 }
487 self.unstable_rejection_streak = self.unstable_rejection_streak.saturating_add(1);
488 if self.unstable_rejection_streak >= self.config.unstable_rejection_streak {
489 self.transition_to_fallback(S3FifoFallbackReason::FairnessInstability);
490 }
491 }
492
493 fn transition_to_fallback(&mut self, reason: S3FifoFallbackReason) {
494 if self.mode == S3FifoMode::ConservativeFifo {
495 return;
496 }
497 self.mode = S3FifoMode::ConservativeFifo;
498 self.fallback_reason = Some(reason);
499 self.fallback_transitions = self.fallback_transitions.saturating_add(1);
500 self.ghost_order.clear();
501 self.ghost_lookup.clear();
502 self.ghost_gen = 0;
503 self.tenant_backlog.clear();
504 }
505
506 fn consume_ghost_hit(&mut self, tenant_key: &str) -> bool {
507 if let Some(generation) = self.ghost_lookup.remove(tenant_key) {
508 self.ghost_order.remove(&generation);
509 self.ghost_hits_total = self.ghost_hits_total.saturating_add(1);
510 true
511 } else {
512 false
513 }
514 }
515
516 fn record_ghost(&mut self, tenant_key: &str) {
517 if tenant_key.is_empty() {
518 return;
519 }
520
521 if self.ghost_gen == u64::MAX {
523 self.ghost_gen = 0;
524 self.ghost_order.clear();
525 self.ghost_lookup.clear();
526 }
527
528 self.ghost_gen = self.ghost_gen.saturating_add(1);
529
530 if let Some(gen_mut) = self.ghost_lookup.get_mut(tenant_key) {
531 let old_gen = *gen_mut;
532 *gen_mut = self.ghost_gen;
533 if let Some(k_reused) = self.ghost_order.remove(&old_gen) {
534 self.ghost_order.insert(self.ghost_gen, k_reused);
535 }
536 } else {
537 let key_owned = tenant_key.to_string();
538 self.ghost_lookup.insert(key_owned.clone(), self.ghost_gen);
539 self.ghost_order.insert(self.ghost_gen, key_owned);
540 }
541
542 while self.ghost_lookup.len() > self.config.ghost_capacity {
543 if let Some((_, popped_key)) = self.ghost_order.pop_first() {
544 self.ghost_lookup.remove(&popped_key);
545 } else {
546 break;
547 }
548 }
549 }
550
551 #[must_use]
552 fn snapshot(&self) -> S3FifoTelemetry {
553 S3FifoTelemetry {
554 mode: self.mode,
555 fallback_reason: self.fallback_reason,
556 ghost_depth: self.ghost_lookup.len(),
557 ghost_hits_total: self.ghost_hits_total,
558 fairness_rejected_total: self.fairness_rejected_total,
559 signal_samples: self.signal_samples,
560 signalless_streak: self.signalless_streak,
561 fallback_transitions: self.fallback_transitions,
562 tenant_budget: self.config.tenant_budget,
563 active_tenants: self.tenant_backlog.len(),
564 }
565 }
566}
567
/// Reclamation strategy used by the hostcall request queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostcallQueueMode {
    /// Popped values are also retired by epoch and released by the reclaimer.
    Ebr,
    /// Retirement and reclamation are skipped entirely.
    SafeFallback,
}

impl HostcallQueueMode {
    /// Reads the mode from the `PI_HOSTCALL_QUEUE_RECLAIMER` environment variable.
    ///
    /// A missing, non-Unicode or unrecognised value yields `SafeFallback`.
    #[must_use]
    pub fn from_env() -> Self {
        match std::env::var("PI_HOSTCALL_QUEUE_RECLAIMER") {
            Ok(raw) => Self::parse(&raw).unwrap_or(Self::SafeFallback),
            Err(_) => Self::SafeFallback,
        }
    }

    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Returns `None` for anything that is not a known alias.
    fn parse(value: &str) -> Option<Self> {
        const EBR_ALIASES: [&str; 3] = ["ebr", "epoch", "epoch-based"];
        const FALLBACK_ALIASES: [&str; 5] =
            ["fallback", "safe-fallback", "off", "disabled", "legacy"];

        let token = value.trim();
        let names = |aliases: &[&str]| {
            aliases
                .iter()
                .any(|alias| token.eq_ignore_ascii_case(alias))
        };

        if names(&EBR_ALIASES) {
            Some(Self::Ebr)
        } else if names(&FALLBACK_ALIASES) {
            Some(Self::SafeFallback)
        } else {
            None
        }
    }
}
596
/// Outcome of `HostcallRequestQueue::push_back`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HostcallQueueEnqueueResult {
    /// Stored in the fast tier; `depth` is the total queue depth afterwards.
    FastPath { depth: usize },
    /// Stored in the overflow tier; depths are measured after the insert.
    OverflowPath { depth: usize, overflow_depth: usize },
    /// Not stored (fairness budget or full overflow tier); depths are unchanged by
    /// the call.
    Rejected { depth: usize, overflow_depth: usize },
}
603
/// Flat telemetry record assembled by `HostcallRequestQueue::snapshot`.
///
/// The `bravo_*` fields mirror the contention policy's telemetry and the `s3fifo_*`
/// fields mirror the S3-FIFO state's telemetry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HostcallQueueTelemetry {
    /// Requests currently in the fast tier.
    pub fast_depth: usize,
    /// Requests currently in the overflow tier.
    pub overflow_depth: usize,
    /// Sum of both tiers.
    pub total_depth: usize,
    /// Largest total depth recorded after a successful enqueue.
    pub max_depth_seen: usize,
    /// Requests ever stored in the overflow tier.
    pub overflow_enqueued_total: u64,
    /// Requests ever rejected (fairness budget or full overflow tier).
    pub overflow_rejected_total: u64,
    /// Configured fast-tier capacity.
    pub fast_capacity: usize,
    /// Configured overflow-tier capacity.
    pub overflow_capacity: usize,
    /// Current reclamation mode.
    pub reclamation_mode: HostcallQueueMode,
    /// Retired values not yet reclaimed.
    pub retired_backlog: usize,
    /// Retired values released by the reclaimer.
    pub reclaimed_total: u64,
    /// Current value of the queue's epoch counter.
    pub current_epoch: u64,
    /// Epochs elapsed since the oldest still-retired value was retired (0 if none).
    pub epoch_lag: u64,
    /// Largest lag recorded while reclamation was blocked by active pins.
    pub max_epoch_lag: u64,
    /// Largest retire-to-reclaim distance, in epochs, seen so far.
    pub reclamation_latency_max_epochs: u64,
    /// Number of switches from EBR to the safe fallback.
    pub fallback_transitions: u64,
    /// Epoch pins currently outstanding.
    pub active_epoch_pins: usize,
    pub bravo_mode: BravoBiasMode,
    pub bravo_transitions: u64,
    pub bravo_rollbacks: u64,
    pub bravo_consecutive_read_bias_windows: u32,
    pub bravo_writer_recovery_remaining: u32,
    pub bravo_last_signature: ContentionSignature,
    pub s3fifo_mode: S3FifoMode,
    pub s3fifo_fallback_reason: Option<S3FifoFallbackReason>,
    pub s3fifo_ghost_depth: usize,
    pub s3fifo_ghost_hits_total: u64,
    pub s3fifo_fairness_rejected_total: u64,
    pub s3fifo_signal_samples: u64,
    pub s3fifo_signalless_streak: u64,
    pub s3fifo_fallback_transitions: u64,
    pub s3fifo_tenant_budget: usize,
    pub s3fifo_active_tenants: usize,
}
640
/// A popped value kept until the reclaimer releases it.
#[derive(Debug)]
struct RetiredNode<T> {
    // Clone of the value handed back to the caller of `pop_front`.
    value: T,
    // Queue epoch at the moment the value was retired.
    retired_epoch: u64,
}
646
/// Guard representing one outstanding epoch pin; dropping it releases the pin.
#[derive(Debug)]
pub struct HostcallEpochPin {
    // Pin counter shared with the queue that issued this guard.
    active_epoch_pins: Arc<AtomicUsize>,
}

impl Drop for HostcallEpochPin {
    /// Decrements the shared pin counter.
    fn drop(&mut self) {
        let Self { active_epoch_pins } = self;
        let pins_before_release = active_epoch_pins.fetch_sub(1, Ordering::SeqCst);
        // A zero here means more pins were released than were ever taken.
        debug_assert!(pins_before_release != 0, "epoch pin underflow");
    }
}
658
/// Two-tier request queue: a bounded fast ring backed by a bounded overflow deque,
/// with optional epoch-based retirement of popped values.
#[derive(Debug)]
pub struct HostcallRequestQueue<T: Clone + QueueTenant> {
    // Fast tier; used only while the overflow tier is empty.
    fast: ArrayQueue<T>,
    // Capacity the fast tier was created with (at least 1).
    fast_capacity: usize,
    // Overflow tier, limited to `overflow_capacity` entries by `push_back`.
    overflow: VecDeque<T>,
    // Requests ever stored in the overflow tier.
    overflow_enqueued_total: u64,
    // Requests ever rejected by `push_back`.
    overflow_rejected_total: u64,
    // Largest total depth recorded after a successful enqueue.
    max_depth_seen: usize,
    // Maximum number of entries allowed in `overflow` (at least 1).
    overflow_capacity: usize,
    // Current reclamation strategy; may switch from EBR to the safe fallback.
    reclamation_mode: HostcallQueueMode,
    // Count of outstanding `HostcallEpochPin` guards, shared with those guards.
    active_epoch_pins: Arc<AtomicUsize>,
    // Counter advanced on every successful enqueue, every pop and `force_reclaim`.
    current_epoch: u64,
    // Values retired in EBR mode, oldest first.
    retired: VecDeque<RetiredNode<T>>,
    // Retired values released so far.
    reclaimed_total: u64,
    // Largest lag recorded while reclamation was blocked by pins.
    max_epoch_lag: u64,
    // Largest retire-to-reclaim distance seen, in epochs.
    reclamation_latency_max_epochs: u64,
    // Number of switches from EBR to the safe fallback.
    fallback_transitions: u64,
    // Retired-backlog size above which a pinned queue abandons EBR.
    safe_fallback_backlog_threshold: usize,
    // Contention policy fed through `observe_contention_window`.
    contention_policy: BravoContentionState,
    // Tenant-fairness admission state for the overflow tier.
    s3fifo: S3FifoState,
}
680
681impl<T: Clone + QueueTenant> HostcallRequestQueue<T> {
682 #[must_use]
683 pub fn with_capacities(fast_capacity: usize, overflow_capacity: usize) -> Self {
684 Self::with_mode(
685 fast_capacity,
686 overflow_capacity,
687 HostcallQueueMode::from_env(),
688 )
689 }
690
691 #[must_use]
692 pub fn with_mode(
693 fast_capacity: usize,
694 overflow_capacity: usize,
695 reclamation_mode: HostcallQueueMode,
696 ) -> Self {
697 let fast_capacity = fast_capacity.max(1);
698 let overflow_capacity = overflow_capacity.max(1);
699 let safe_fallback_backlog_threshold = fast_capacity
700 .saturating_add(overflow_capacity)
701 .saturating_mul(SAFE_FALLBACK_BACKLOG_MULTIPLIER)
702 .max(SAFE_FALLBACK_BACKLOG_MIN);
703 let s3fifo = S3FifoState::new(S3FifoConfig::from_capacities(
704 fast_capacity,
705 overflow_capacity,
706 ));
707 Self {
708 fast: ArrayQueue::new(fast_capacity),
709 fast_capacity,
710 overflow: VecDeque::new(),
711 overflow_enqueued_total: 0,
712 overflow_rejected_total: 0,
713 max_depth_seen: 0,
714 overflow_capacity,
715 reclamation_mode,
716 active_epoch_pins: Arc::new(AtomicUsize::new(0)),
717 current_epoch: 0,
718 retired: VecDeque::new(),
719 reclaimed_total: 0,
720 max_epoch_lag: 0,
721 reclamation_latency_max_epochs: 0,
722 fallback_transitions: 0,
723 safe_fallback_backlog_threshold,
724 contention_policy: BravoContentionState::default(),
725 s3fifo,
726 }
727 }
728
729 #[must_use]
730 pub fn len(&self) -> usize {
731 self.fast.len() + self.overflow.len()
732 }
733
734 #[must_use]
735 pub fn is_empty(&self) -> bool {
736 self.fast.is_empty() && self.overflow.is_empty()
737 }
738
739 #[must_use]
740 pub const fn reclamation_mode(&self) -> HostcallQueueMode {
741 self.reclamation_mode
742 }
743
744 pub fn pin_epoch(&self) -> HostcallEpochPin {
745 self.active_epoch_pins.fetch_add(1, Ordering::SeqCst);
746 HostcallEpochPin {
747 active_epoch_pins: Arc::clone(&self.active_epoch_pins),
748 }
749 }
750
751 pub fn clear(&mut self) {
752 while self.fast.pop().is_some() {}
753 self.overflow.clear();
754 self.overflow_enqueued_total = 0;
755 self.overflow_rejected_total = 0;
756 self.max_depth_seen = 0;
757 self.current_epoch = 0;
758 self.retired.clear();
759 self.reclaimed_total = 0;
760 self.max_epoch_lag = 0;
761 self.reclamation_latency_max_epochs = 0;
762 self.fallback_transitions = 0;
763 self.contention_policy = BravoContentionState::default();
764 self.s3fifo = S3FifoState::new(S3FifoConfig::from_capacities(
765 self.fast_capacity,
766 self.overflow_capacity,
767 ));
768 }
769
770 pub fn push_back(&mut self, request: T) -> HostcallQueueEnqueueResult {
771 self.s3fifo.observe_signal(request.tenant_key());
772 let mut request = request;
773
774 if self.overflow.is_empty() {
776 match self.fast.push(request) {
777 Ok(()) => {
778 self.bump_epoch();
779 self.try_reclaim();
780 let depth = self.len();
781 self.max_depth_seen = self.max_depth_seen.max(depth);
782 tracing::debug!(
783 target: "pi.extensions.hostcall_queue",
784 event = "hostcall_queue.enqueue",
785 reason = "small_tier",
786 depth,
787 overflow_depth = self.overflow.len(),
788 "hostcall admitted to fast tier"
789 );
790 return HostcallQueueEnqueueResult::FastPath { depth };
791 }
792 Err(returned) => request = returned,
793 }
794 }
795
796 let tenant_key = request.tenant_key().map(std::borrow::ToOwned::to_owned);
797
798 if !self.s3fifo.allow_main_admission(tenant_key.as_deref()) {
799 self.overflow_rejected_total = self.overflow_rejected_total.saturating_add(1);
800 tracing::debug!(
801 target: "pi.extensions.hostcall_queue",
802 event = "hostcall_queue.reject",
803 reason = "fairness_budget",
804 depth = self.len(),
805 overflow_depth = self.overflow.len(),
806 s3fifo_mode = ?self.s3fifo.snapshot().mode,
807 "hostcall rejected by S3-FIFO fairness budget"
808 );
809 return HostcallQueueEnqueueResult::Rejected {
810 depth: self.len(),
811 overflow_depth: self.overflow.len(),
812 };
813 }
814
815 if self.overflow.len() < self.overflow_capacity {
816 self.overflow.push_back(request);
817 self.overflow_enqueued_total = self.overflow_enqueued_total.saturating_add(1);
818 self.s3fifo.on_main_enqueued(tenant_key.as_deref());
819 self.bump_epoch();
820 self.try_reclaim();
821 let depth = self.len();
822 let overflow_depth = self.overflow.len();
823 self.max_depth_seen = self.max_depth_seen.max(depth);
824 tracing::debug!(
825 target: "pi.extensions.hostcall_queue",
826 event = "hostcall_queue.enqueue",
827 reason = "main_tier",
828 depth,
829 overflow_depth,
830 "hostcall admitted to overflow/main tier"
831 );
832 return HostcallQueueEnqueueResult::OverflowPath {
833 depth,
834 overflow_depth,
835 };
836 }
837
838 self.overflow_rejected_total = self.overflow_rejected_total.saturating_add(1);
839 self.s3fifo.on_main_overflow_reject(tenant_key.as_deref());
840 tracing::debug!(
841 target: "pi.extensions.hostcall_queue",
842 event = "hostcall_queue.reject",
843 reason = "overflow_capacity",
844 depth = self.len(),
845 overflow_depth = self.overflow.len(),
846 "hostcall rejected because overflow queue reached capacity"
847 );
848 HostcallQueueEnqueueResult::Rejected {
849 depth: self.len(),
850 overflow_depth: self.overflow.len(),
851 }
852 }
853
854 fn pop_front(&mut self) -> Option<T> {
855 let value = if let Some(value) = self.fast.pop() {
856 value
857 } else {
858 let value = self.overflow.pop_front()?;
859 let tenant_key = value.tenant_key().map(std::borrow::ToOwned::to_owned);
860 self.s3fifo.on_main_dequeued(tenant_key.as_deref());
861 value
862 };
863 self.bump_epoch();
864 if self.reclamation_mode == HostcallQueueMode::Ebr {
865 self.retire_for_reclamation(value.clone());
866 }
867 self.try_reclaim();
868 Some(value)
869 }
870
871 pub fn drain_all(&mut self) -> VecDeque<T> {
872 let mut drained = VecDeque::with_capacity(self.len());
873 while let Some(request) = self.pop_front() {
874 drained.push_back(request);
875 }
876 drained
877 }
878
879 pub fn force_reclaim(&mut self) {
881 self.bump_epoch();
882 self.try_reclaim();
883 }
884
885 pub fn force_safe_fallback(&mut self) {
887 self.transition_to_safe_fallback();
888 }
889
890 pub fn observe_contention_window(&mut self, sample: ContentionSample) -> BravoPolicyDecision {
893 self.contention_policy.observe(sample)
894 }
895
896 #[must_use]
897 pub const fn contention_policy_snapshot(&self) -> BravoPolicyTelemetry {
898 self.contention_policy.snapshot()
899 }
900
901 #[must_use]
902 pub fn snapshot(&self) -> HostcallQueueTelemetry {
903 let epoch_lag = self.retired.front().map_or(0, |node| {
904 self.current_epoch.saturating_sub(node.retired_epoch)
905 });
906 let contention = self.contention_policy.snapshot();
907 let s3fifo = self.s3fifo.snapshot();
908
909 HostcallQueueTelemetry {
910 fast_depth: self.fast.len(),
911 overflow_depth: self.overflow.len(),
912 total_depth: self.len(),
913 max_depth_seen: self.max_depth_seen,
914 overflow_enqueued_total: self.overflow_enqueued_total,
915 overflow_rejected_total: self.overflow_rejected_total,
916 fast_capacity: self.fast_capacity,
917 overflow_capacity: self.overflow_capacity,
918 reclamation_mode: self.reclamation_mode,
919 retired_backlog: self.retired.len(),
920 reclaimed_total: self.reclaimed_total,
921 current_epoch: self.current_epoch,
922 epoch_lag,
923 max_epoch_lag: self.max_epoch_lag,
924 reclamation_latency_max_epochs: self.reclamation_latency_max_epochs,
925 fallback_transitions: self.fallback_transitions,
926 active_epoch_pins: self.active_epoch_pins.load(Ordering::SeqCst),
927 bravo_mode: contention.mode,
928 bravo_transitions: contention.transitions,
929 bravo_rollbacks: contention.rollbacks,
930 bravo_consecutive_read_bias_windows: contention.consecutive_read_bias_windows,
931 bravo_writer_recovery_remaining: contention.writer_recovery_remaining,
932 bravo_last_signature: contention.last_signature,
933 s3fifo_mode: s3fifo.mode,
934 s3fifo_fallback_reason: s3fifo.fallback_reason,
935 s3fifo_ghost_depth: s3fifo.ghost_depth,
936 s3fifo_ghost_hits_total: s3fifo.ghost_hits_total,
937 s3fifo_fairness_rejected_total: s3fifo.fairness_rejected_total,
938 s3fifo_signal_samples: s3fifo.signal_samples,
939 s3fifo_signalless_streak: s3fifo.signalless_streak,
940 s3fifo_fallback_transitions: s3fifo.fallback_transitions,
941 s3fifo_tenant_budget: s3fifo.tenant_budget,
942 s3fifo_active_tenants: s3fifo.active_tenants,
943 }
944 }
945
946 const fn bump_epoch(&mut self) {
947 self.current_epoch = self.current_epoch.saturating_add(1);
948 }
949
950 fn retire_for_reclamation(&mut self, value: T) {
951 self.retired.push_back(RetiredNode {
952 value,
953 retired_epoch: self.current_epoch,
954 });
955 }
956
957 fn transition_to_safe_fallback(&mut self) {
958 if self.reclamation_mode == HostcallQueueMode::SafeFallback {
959 return;
960 }
961 self.reclamation_mode = HostcallQueueMode::SafeFallback;
962 self.fallback_transitions = self.fallback_transitions.saturating_add(1);
963 self.retired.clear();
964 }
965
966 fn try_reclaim(&mut self) {
967 if self.reclamation_mode != HostcallQueueMode::Ebr {
968 return;
969 }
970
971 let active = self.active_epoch_pins.load(Ordering::SeqCst);
972 if active > 0 {
973 if let Some(front) = self.retired.front() {
974 let lag = self.current_epoch.saturating_sub(front.retired_epoch);
975 self.max_epoch_lag = self.max_epoch_lag.max(lag);
976 }
977 if self.retired.len() > self.safe_fallback_backlog_threshold {
978 self.transition_to_safe_fallback();
979 }
980 return;
981 }
982
983 while self
984 .retired
985 .front()
986 .is_some_and(|front| front.retired_epoch < self.current_epoch)
987 {
988 if let Some(node) = self.retired.pop_front() {
989 let latency = self.current_epoch.saturating_sub(node.retired_epoch);
990 self.reclamation_latency_max_epochs =
991 self.reclamation_latency_max_epochs.max(latency);
992 self.reclaimed_total = self.reclaimed_total.saturating_add(1);
993 drop(node.value);
994 }
995 }
996 }
997}
998
impl<T: Clone + QueueTenant> Default for HostcallRequestQueue<T> {
    /// Builds a queue with the module's default fast and overflow capacities; the
    /// reclamation mode comes from the environment (see `with_capacities`).
    fn default() -> Self {
        Self::with_capacities(HOSTCALL_FAST_RING_CAPACITY, HOSTCALL_OVERFLOW_CAPACITY)
    }
}
1004
1005#[cfg(test)]
1006mod tests {
1007 use super::*;
1008
1009 fn deterministic_config() -> BravoContentionConfig {
1010 BravoContentionConfig {
1011 min_total_acquires: 10,
1012 read_dominant_ratio_permille: 750,
1013 mixed_ratio_floor_permille: 400,
1014 mixed_ratio_ceiling_permille: 749,
1015 writer_starvation_wait_us: 4_000,
1016 writer_starvation_timeouts: 2,
1017 max_consecutive_read_bias_windows: 3,
1018 writer_recovery_windows: 2,
1019 }
1020 }
1021
1022 fn sample(
1023 reads: u64,
1024 writes: u64,
1025 read_wait_p95_us: u64,
1026 write_wait_p95_us: u64,
1027 write_timeouts: u64,
1028 ) -> ContentionSample {
1029 ContentionSample {
1030 read_acquires: reads,
1031 write_acquires: writes,
1032 read_wait_p95_us,
1033 write_wait_p95_us,
1034 write_timeouts,
1035 }
1036 }
1037
1038 #[derive(Debug, Clone, PartialEq, Eq)]
1039 struct TenantRequest {
1040 tenant: Option<&'static str>,
1041 value: u8,
1042 }
1043
1044 impl QueueTenant for TenantRequest {
1045 fn tenant_key(&self) -> Option<&str> {
1046 self.tenant
1047 }
1048 }
1049
1050 fn drive_signal_quality_fallback(queue: &mut HostcallRequestQueue<TenantRequest>, seed: u8) {
1051 for offset in 0..96_u8 {
1052 let _ = queue.push_back(TenantRequest {
1053 tenant: None,
1054 value: seed.wrapping_add(offset),
1055 });
1056 let _ = queue.drain_all();
1057 }
1058 }
1059
1060 #[test]
1061 fn hostcall_queue_mode_parsing_supports_ebr_and_fallback() {
1062 assert_eq!(
1063 HostcallQueueMode::parse("ebr"),
1064 Some(HostcallQueueMode::Ebr)
1065 );
1066 assert_eq!(
1067 HostcallQueueMode::parse("safe-fallback"),
1068 Some(HostcallQueueMode::SafeFallback)
1069 );
1070 assert_eq!(HostcallQueueMode::parse("nope"), None);
1071 }
1072
1073 #[test]
1074 fn contention_classifier_flags_writer_starvation_deterministically() {
1075 let config = deterministic_config();
1076 let starvation = sample(90, 10, 100, 10_000, 3);
1077 let signature = BravoContentionState::classify(starvation, config);
1078 assert_eq!(signature, ContentionSignature::WriterStarvationRisk);
1079
1080 let read_dominant = sample(90, 10, 100, 300, 0);
1081 let signature = BravoContentionState::classify(read_dominant, config);
1082 assert_eq!(signature, ContentionSignature::ReadDominant);
1083 }
1084
/// Walks the policy through read bias, a starvation rollback, one held
/// recovery window, and the return to balanced mode, then checks telemetry.
#[test]
fn bravo_policy_rolls_back_on_starvation_and_recovers() {
    let mut state = BravoContentionState::new(deterministic_config());

    // Window 1: read-dominant traffic promotes Balanced -> ReadBiased.
    let promoted = state.observe(sample(80, 20, 120, 500, 0));
    assert_eq!(
        (promoted.previous_mode, promoted.next_mode),
        (BravoBiasMode::Balanced, BravoBiasMode::ReadBiased)
    );
    assert_eq!(promoted.signature, ContentionSignature::ReadDominant);
    assert!(promoted.switched);

    // Window 2: writer starvation rolls ReadBiased back into WriterRecovery.
    let rolled_back = state.observe(sample(85, 15, 100, 8_500, 3));
    assert_eq!(
        (rolled_back.previous_mode, rolled_back.next_mode),
        (BravoBiasMode::ReadBiased, BravoBiasMode::WriterRecovery)
    );
    assert_eq!(
        rolled_back.signature,
        ContentionSignature::WriterStarvationRisk
    );
    assert!(rolled_back.rollback_triggered);

    // Window 3: the policy stays in recovery without switching.
    let holding = state.observe(sample(30, 70, 200, 500, 0));
    assert_eq!(holding.next_mode, BravoBiasMode::WriterRecovery);
    assert!(!holding.switched);

    // Window 4: recovery ends and the policy switches back to Balanced.
    let released = state.observe(sample(35, 65, 200, 450, 0));
    assert_eq!(released.next_mode, BravoBiasMode::Balanced);
    assert!(released.switched);

    let summary = state.snapshot();
    assert_eq!(summary.mode, BravoBiasMode::Balanced);
    assert!(summary.rollbacks >= 1);
    assert!(summary.transitions >= 3);
}
1114
/// With a three-window recovery budget, a second starvation window observed
/// while already in `WriterRecovery` leaves the remaining budget at three
/// without counting as a new rollback; three calm windows then drain the
/// budget and return the policy to `Balanced`.
#[test]
fn bravo_writer_recovery_refreshes_on_repeated_starvation_then_exits_cleanly() {
    let mut config = deterministic_config();
    config.writer_recovery_windows = 3;
    let mut state = BravoContentionState::new(config);

    // Enter read bias first so that starvation has something to roll back.
    let _ = state.observe(sample(85, 15, 100, 400, 0));

    let initial_rollback = state.observe(sample(80, 20, 100, 9_000, 3));
    assert_eq!(
        (initial_rollback.previous_mode, initial_rollback.next_mode),
        (BravoBiasMode::ReadBiased, BravoBiasMode::WriterRecovery)
    );
    assert!(initial_rollback.rollback_triggered);
    assert_eq!(
        initial_rollback.signature,
        ContentionSignature::WriterStarvationRisk
    );
    assert_eq!(state.snapshot().writer_recovery_remaining, 3);

    // Starvation again while recovering: no switch, no rollback, budget still 3.
    let refreshed = state.observe(sample(75, 25, 100, 8_500, 2));
    assert_eq!(
        (refreshed.previous_mode, refreshed.next_mode),
        (BravoBiasMode::WriterRecovery, BravoBiasMode::WriterRecovery)
    );
    assert!(!refreshed.switched);
    assert!(!refreshed.rollback_triggered);
    assert_eq!(state.snapshot().writer_recovery_remaining, 3);

    // Two calm windows count the budget down to 2 and then 1.
    for expected_remaining in [2, 1] {
        let _ = state.observe(sample(45, 55, 150, 450, 0));
        assert_eq!(
            state.snapshot().writer_recovery_remaining,
            expected_remaining
        );
    }

    // The third calm window leaves recovery.
    let leaving = state.observe(sample(45, 55, 150, 450, 0));
    assert_eq!(
        (leaving.previous_mode, leaving.next_mode),
        (BravoBiasMode::WriterRecovery, BravoBiasMode::Balanced)
    );
    assert!(leaving.switched);
    assert!(!leaving.rollback_triggered);

    let summary = state.snapshot();
    assert_eq!(summary.mode, BravoBiasMode::Balanced);
    assert_eq!(summary.writer_recovery_remaining, 0);
    assert_eq!(
        summary.last_signature,
        ContentionSignature::MixedContention
    );
    assert_eq!(summary.rollbacks, 1);
    assert!(summary.transitions >= 3);
}
1163
/// With both window limits configured as zero, the policy still grants one
/// read-biased window and one recovery window before returning to `Balanced`.
#[test]
fn bravo_zero_window_config_clamps_to_single_recovery_window() {
    let mut config = deterministic_config();
    config.max_consecutive_read_bias_windows = 0;
    config.writer_recovery_windows = 0;
    let mut state = BravoContentionState::new(config);

    // The first read-dominant window is still allowed to enter read bias.
    let entered_bias = state.observe(sample(85, 15, 100, 300, 0));
    assert_eq!(entered_bias.next_mode, BravoBiasMode::ReadBiased);
    assert_eq!(state.snapshot().consecutive_read_bias_windows, 1);

    // The second one is pushed into recovery without being a rollback.
    let budget_exhausted = state.observe(sample(86, 14, 100, 320, 0));
    assert_eq!(
        (budget_exhausted.previous_mode, budget_exhausted.next_mode),
        (BravoBiasMode::ReadBiased, BravoBiasMode::WriterRecovery)
    );
    assert_eq!(
        budget_exhausted.signature,
        ContentionSignature::ReadDominant
    );
    assert!(!budget_exhausted.rollback_triggered);

    let in_recovery = state.snapshot();
    assert_eq!(in_recovery.writer_recovery_remaining, 1);
    assert_eq!(in_recovery.consecutive_read_bias_windows, 0);

    // A single recovery window is enough to get back to Balanced.
    let back_to_balanced = state.observe(sample(45, 55, 120, 450, 0));
    assert_eq!(
        (back_to_balanced.previous_mode, back_to_balanced.next_mode),
        (BravoBiasMode::WriterRecovery, BravoBiasMode::Balanced)
    );
    assert!(back_to_balanced.switched);
    assert_eq!(state.snapshot().writer_recovery_remaining, 0);
}
1191
/// With `writer_recovery_windows` set to zero, both the initial rollback and
/// a repeated starvation window leave exactly one recovery window remaining.
#[test]
fn bravo_writer_recovery_starvation_refresh_clamps_to_one_when_config_is_zero() {
    let mut config = deterministic_config();
    config.writer_recovery_windows = 0;
    let mut state = BravoContentionState::new(config);

    // Enter read bias, then starve writers to force the rollback.
    let _ = state.observe(sample(82, 18, 100, 280, 0));
    let rollback = state.observe(sample(75, 25, 100, 8_200, 3));
    assert_eq!(
        (rollback.previous_mode, rollback.next_mode),
        (BravoBiasMode::ReadBiased, BravoBiasMode::WriterRecovery)
    );
    assert!(rollback.rollback_triggered);
    assert_eq!(state.snapshot().writer_recovery_remaining, 1);

    // Starvation during recovery keeps the mode and the one-window budget.
    let still_starving = state.observe(sample(70, 30, 100, 8_600, 2));
    assert_eq!(
        (still_starving.previous_mode, still_starving.next_mode),
        (BravoBiasMode::WriterRecovery, BravoBiasMode::WriterRecovery)
    );
    assert!(!still_starving.rollback_triggered);
    assert_eq!(
        still_starving.signature,
        ContentionSignature::WriterStarvationRisk
    );
    assert_eq!(state.snapshot().writer_recovery_remaining, 1);

    // One calm window then exits recovery.
    let leaving = state.observe(sample(48, 52, 140, 420, 0));
    assert_eq!(
        (leaving.previous_mode, leaving.next_mode),
        (BravoBiasMode::WriterRecovery, BravoBiasMode::Balanced)
    );
    assert_eq!(state.snapshot().writer_recovery_remaining, 0);
}
1223
/// With a two-window read-bias budget and a one-window recovery budget, the
/// second read-dominant window moves the policy into `WriterRecovery` without
/// a rollback, and the following window returns it to `Balanced`.
#[test]
fn bravo_policy_enforces_writer_fairness_budget() {
    let mut config = deterministic_config();
    config.max_consecutive_read_bias_windows = 2;
    config.writer_recovery_windows = 1;
    let mut state = BravoContentionState::new(config);

    let _ = state.observe(sample(80, 20, 100, 250, 0));

    let budget_hit = state.observe(sample(85, 15, 100, 260, 0));
    assert_eq!(budget_hit.next_mode, BravoBiasMode::WriterRecovery);
    assert_eq!(budget_hit.signature, ContentionSignature::ReadDominant);
    assert!(!budget_hit.rollback_triggered);

    let after_recovery = state.observe(sample(40, 60, 150, 400, 0));
    assert_eq!(after_recovery.next_mode, BravoBiasMode::Balanced);

    let summary = state.snapshot();
    assert_eq!(summary.mode, BravoBiasMode::Balanced);
    assert_eq!(summary.writer_recovery_remaining, 0);
}
1244
/// A contention window observed through the queue shows up in the queue
/// snapshot's `bravo_*` fields.
#[test]
fn queue_snapshot_exposes_bravo_policy_telemetry() {
    let mut queue = HostcallRequestQueue::<u8>::with_mode(2, 2, HostcallQueueMode::SafeFallback);

    let outcome = queue.observe_contention_window(sample(70, 30, 120, 350, 0));
    assert_eq!(outcome.next_mode, BravoBiasMode::ReadBiased);

    let telemetry = queue.snapshot();
    assert_eq!(telemetry.bravo_mode, BravoBiasMode::ReadBiased);
    assert_eq!(
        telemetry.bravo_last_signature,
        ContentionSignature::MixedContention
    );
    assert!(telemetry.bravo_transitions >= 1);
}
1261
/// Fills a two-slot fast ring, spills a third item to overflow, and checks
/// the depth counters and that draining preserves insertion order.
#[test]
fn queue_spills_to_overflow_with_stable_order() {
    let mut queue = HostcallRequestQueue::with_mode(2, 4, HostcallQueueMode::SafeFallback);

    // The first two items fit in the fast ring.
    for value in [0_u8, 1_u8] {
        let outcome = queue.push_back(value);
        assert!(matches!(
            outcome,
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
    }

    // The third one lands in overflow.
    let spilled = queue.push_back(2_u8);
    assert!(matches!(
        spilled,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    let depths = queue.snapshot();
    assert_eq!(depths.fast_depth, 2);
    assert_eq!(depths.overflow_depth, 1);
    assert_eq!(depths.total_depth, 3);
    assert_eq!(depths.overflow_enqueued_total, 1);

    let order: Vec<_> = queue.drain_all().into_iter().collect();
    assert_eq!(order, vec![0, 1, 2]);
}
1287
/// With one fast slot and one overflow slot, the third push is rejected and
/// the rejection is counted in the snapshot.
#[test]
fn queue_rejects_when_overflow_capacity_reached() {
    let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::SafeFallback);

    let fast = queue.push_back(0_u8);
    assert!(matches!(fast, HostcallQueueEnqueueResult::FastPath { .. }));

    let overflow = queue.push_back(1_u8);
    assert!(matches!(
        overflow,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    let rejected = queue.push_back(2_u8);
    assert!(matches!(
        rejected,
        HostcallQueueEnqueueResult::Rejected { .. }
    ));

    let counters = queue.snapshot();
    assert_eq!(counters.total_depth, 2);
    assert_eq!(counters.overflow_depth, 1);
    assert_eq!(counters.overflow_rejected_total, 1);
}
1309
/// A single tenant is rejected on its third request even though the
/// three-slot overflow queue holds only one item; the snapshot reports a
/// tenant budget of one and attributes the rejection to fairness.
#[test]
fn s3fifo_fairness_budget_rejects_noisy_tenant_before_overflow_full() {
    let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);
    let noisy = |value| TenantRequest {
        tenant: Some("ext.noisy"),
        value,
    };

    let fast = queue.push_back(noisy(0));
    assert!(matches!(fast, HostcallQueueEnqueueResult::FastPath { .. }));

    let overflow = queue.push_back(noisy(1));
    assert!(matches!(
        overflow,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    let over_budget = queue.push_back(noisy(2));
    assert!(matches!(
        over_budget,
        HostcallQueueEnqueueResult::Rejected { .. }
    ));

    let counters = queue.snapshot();
    assert_eq!(counters.s3fifo_mode, S3FifoMode::Active);
    assert_eq!(counters.s3fifo_tenant_budget, 1);
    assert_eq!(counters.s3fifo_fairness_rejected_total, 1);
    assert_eq!(counters.overflow_rejected_total, 1);
}
1342
/// After a tenant has been rejected once and the queue drained, the same
/// tenant's third request is accepted into overflow on the next round, and
/// the snapshot records at least one ghost hit and no new fairness rejection.
#[test]
fn s3fifo_ghost_hits_allow_reentry_after_prior_rejection() {
    let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);
    let ghost = |value| TenantRequest {
        tenant: Some("ext.ghost"),
        value,
    };

    // Round one: fast path, overflow, then a rejection.
    let outcome = queue.push_back(ghost(0));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    let outcome = queue.push_back(ghost(1));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));
    let outcome = queue.push_back(ghost(2));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::Rejected { .. }
    ));

    // Only the two accepted requests come back out.
    let accepted: Vec<_> = queue
        .drain_all()
        .into_iter()
        .map(|request| request.value)
        .collect();
    assert_eq!(accepted, vec![0, 1]);

    // Round two: the third request now reaches overflow instead of bouncing.
    let outcome = queue.push_back(ghost(3));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    let outcome = queue.push_back(ghost(4));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));
    let outcome = queue.push_back(ghost(5));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    let counters = queue.snapshot();
    assert!(counters.s3fifo_ghost_hits_total >= 1);
    assert_eq!(counters.s3fifo_fairness_rejected_total, 1);
}
1404
/// After 96 push-then-drain cycles of plain `u8` values, the queue reports
/// `ConservativeFifo` with `SignalQualityInsufficient` as the fallback reason.
#[test]
fn s3fifo_falls_back_to_conservative_fifo_when_signal_is_insufficient() {
    let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);

    (0..96_u8).for_each(|value| {
        let _ = queue.push_back(value);
        let _ = queue.drain_all();
    });

    let state = queue.snapshot();
    assert_eq!(state.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        state.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert!(state.s3fifo_fallback_transitions >= 1);
}
1422
/// Once repeated pushes into a full 1+1 queue have produced a
/// `FairnessInstability` fallback, further push/drain traffic leaves the
/// mode, reason, transition count, and fairness-rejection total unchanged.
#[test]
fn s3fifo_fairness_fallback_reason_and_transition_count_stay_stable() {
    let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::SafeFallback);

    // Fill both slots.
    let fast = queue.push_back(0_u8);
    assert!(matches!(fast, HostcallQueueEnqueueResult::FastPath { .. }));
    let overflow = queue.push_back(1_u8);
    assert!(matches!(
        overflow,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    // Keep pushing into the full queue without draining.
    (2_u8..40_u8).for_each(|value| {
        let _ = queue.push_back(value);
    });

    let at_fallback = queue.snapshot();
    assert_eq!(at_fallback.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        at_fallback.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::FairnessInstability)
    );
    assert_eq!(at_fallback.s3fifo_fallback_transitions, 1);
    let rejections_at_fallback = at_fallback.s3fifo_fairness_rejected_total;

    // Further traffic, this time drained after every push.
    (40_u8..80_u8).for_each(|value| {
        let _ = queue.push_back(value);
        let _ = queue.drain_all();
    });

    let afterwards = queue.snapshot();
    assert_eq!(afterwards.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        afterwards.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::FairnessInstability)
    );
    assert_eq!(afterwards.s3fifo_fallback_transitions, 1);
    assert_eq!(
        afterwards.s3fifo_fairness_rejected_total,
        rejections_at_fallback
    );
}
1466
/// After a `SignalQualityInsufficient` fallback, 32 rounds of paired pushes
/// from one tenant leave the mode, reason, transition count, and
/// fairness-rejection total unchanged.
#[test]
fn s3fifo_signal_quality_fallback_reason_does_not_flip_under_later_pressure() {
    let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);

    // 96 tenant-less push/drain cycles with values 0..96.
    drive_signal_quality_fallback(&mut queue, 0);

    let at_fallback = queue.snapshot();
    assert_eq!(at_fallback.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        at_fallback.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert_eq!(at_fallback.s3fifo_fallback_transitions, 1);
    let rejections_at_fallback = at_fallback.s3fifo_fairness_rejected_total;

    // Now push two requests per round from a single tenant.
    let noisy = |value| TenantRequest {
        tenant: Some("ext.noisy"),
        value,
    };
    (0_u8..32_u8).for_each(|value| {
        let _ = queue.push_back(noisy(value));
        let _ = queue.push_back(noisy(value.saturating_add(1)));
        let _ = queue.drain_all();
    });

    let afterwards = queue.snapshot();
    assert_eq!(afterwards.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        afterwards.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert_eq!(afterwards.s3fifo_fallback_transitions, 1);
    assert_eq!(
        afterwards.s3fifo_fairness_rejected_total,
        rejections_at_fallback
    );
}
1512
/// While an epoch pin is held, drained items stay in the retired backlog;
/// after the pin is dropped, a forced reclaim empties the backlog and the
/// snapshot records the reclaimed count and a non-zero maximum latency.
#[test]
fn ebr_reclamation_tracks_lag_and_latency() {
    let mut queue = HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::Ebr);
    let epoch_pin = queue.pin_epoch();

    for value in [1_u8, 2_u8] {
        let outcome = queue.push_back(value);
        assert!(matches!(
            outcome,
            HostcallQueueEnqueueResult::FastPath { .. }
        ));
    }
    assert_eq!(queue.drain_all().len(), 2);

    // Pin still held: nothing may be reclaimed yet.
    queue.force_reclaim();
    let while_pinned = queue.snapshot();
    assert_eq!(while_pinned.reclamation_mode, HostcallQueueMode::Ebr);
    assert_eq!(while_pinned.retired_backlog, 2);
    assert_eq!(while_pinned.reclaimed_total, 0);
    assert!(while_pinned.epoch_lag >= 1);

    // Pin released: the backlog can now be reclaimed.
    drop(epoch_pin);
    queue.force_reclaim();
    let after_release = queue.snapshot();
    assert_eq!(after_release.retired_backlog, 0);
    assert!(after_release.reclaimed_total >= 2);
    assert!(after_release.reclamation_latency_max_epochs >= 1);
}
1542
/// In `SafeFallback` mode, a push/drain/reclaim cycle under a held pin leaves
/// both the retired backlog and the reclaimed total at zero.
#[test]
fn safe_fallback_mode_skips_retirement_tracking() {
    let mut queue = HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::SafeFallback);
    // Kept alive until the end of the test.
    let _pin = queue.pin_epoch();

    let outcome = queue.push_back(1_u8);
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    let _ = queue.drain_all();
    queue.force_reclaim();

    let counters = queue.snapshot();
    assert_eq!(counters.reclamation_mode, HostcallQueueMode::SafeFallback);
    assert_eq!(counters.retired_backlog, 0);
    assert_eq!(counters.reclaimed_total, 0);
}
1559
/// With an epoch pin held for the whole test, 64 push/drain/reclaim cycles
/// in `Ebr` mode end with the queue reporting `SafeFallback` and at least one
/// fallback transition.
#[test]
fn ebr_auto_falls_back_when_retired_backlog_runs_away() {
    let mut queue = HostcallRequestQueue::with_mode(1, 1, HostcallQueueMode::Ebr);
    // Kept alive until the end of the test.
    let _pin = queue.pin_epoch();

    for value in 0..64_u8 {
        let outcome = queue.push_back(value);
        assert!(
            !matches!(outcome, HostcallQueueEnqueueResult::Rejected { .. }),
            "queue should accept one item per cycle"
        );
        assert_eq!(queue.drain_all().len(), 1);
        queue.force_reclaim();
    }

    let state = queue.snapshot();
    assert_eq!(state.reclamation_mode, HostcallQueueMode::SafeFallback);
    assert!(state.fallback_transitions >= 1);
}
1582
/// Runs 10 000 push/drain cycles with no pin held, forcing a reclaim on
/// every 64th value, and checks that the queue stays in `Ebr` mode with an
/// empty backlog after a final reclaim.
#[test]
fn ebr_stress_cycle_keeps_retired_backlog_bounded() {
    let mut queue = HostcallRequestQueue::with_mode(4, 8, HostcallQueueMode::Ebr);

    for value in 0..10_000_u32 {
        let _ = queue.push_back(value);
        assert_eq!(queue.drain_all().len(), 1);

        let reclaim_due = value % 64 == 0;
        if reclaim_due {
            queue.force_reclaim();
        }
    }

    queue.force_reclaim();
    let state = queue.snapshot();
    assert_eq!(state.reclamation_mode, HostcallQueueMode::Ebr);
    assert_eq!(state.retired_backlog, 0);
    assert!(state.reclaimed_total >= 10_000);
}
1602
/// `total_acquires` adds reads and writes, is zero for the default sample,
/// and saturates at `u64::MAX` rather than overflowing.
#[test]
fn contention_sample_total_acquires_sums_reads_and_writes() {
    assert_eq!(sample(100, 50, 0, 0, 0).total_acquires(), 150);
    assert_eq!(ContentionSample::default().total_acquires(), 0);

    let overflowing = ContentionSample {
        read_acquires: u64::MAX,
        write_acquires: 1,
        ..Default::default()
    };
    assert_eq!(
        overflowing.total_acquires(),
        u64::MAX,
        "saturating_add on overflow"
    );
}
1620
/// Spot-checks `read_ratio_permille` for all-read, all-write, even, empty,
/// and 75/25 windows.
#[test]
fn contention_sample_read_ratio_permille_values() {
    let cases = [
        // (window, expected reads per thousand acquires)
        (sample(100, 0, 0, 0, 0), 1000),
        (sample(0, 100, 0, 0, 0), 0),
        (sample(50, 50, 0, 0, 0), 500),
        // No acquires at all reports zero.
        (ContentionSample::default(), 0),
        (sample(75, 25, 0, 0, 0), 750),
    ];

    for (window, expected_permille) in cases {
        assert_eq!(window.read_ratio_permille(), expected_permille);
    }
}
1643
/// `mode()` reports `Balanced` for a fresh state and `ReadBiased` after one
/// read-heavy window has been observed.
#[test]
fn bravo_contention_state_mode_accessor() {
    let fresh = BravoContentionState::new(deterministic_config());
    assert_eq!(fresh.mode(), BravoBiasMode::Balanced);

    let mut observed = BravoContentionState::new(deterministic_config());
    let _ = observed.observe(sample(80, 10, 0, 0, 0));
    assert_eq!(observed.mode(), BravoBiasMode::ReadBiased);
}
1654
/// Pins the fields `S3FifoConfig::from_capacities` derives for a 256/2048
/// queue and for the smallest 1/1 queue.
#[test]
fn s3fifo_config_from_capacities_computes_fields() {
    let standard = S3FifoConfig::from_capacities(256, 2048);
    assert_eq!(standard.tenant_budget, 1024);
    assert_eq!(standard.ghost_capacity, 4608);
    assert_eq!(
        (
            standard.min_signal_samples,
            standard.max_signalless_streak,
            standard.unstable_rejection_streak,
        ),
        (32, 64, 16)
    );

    let tiny = S3FifoConfig::from_capacities(1, 1);
    assert_eq!(tiny.tenant_budget, 1);
    assert_eq!(tiny.ghost_capacity, 16);
}
1671
/// A queue built with `with_capacities` starts empty, accepts a push on the
/// fast path, and reports the requested capacities in its snapshot.
#[test]
fn queue_with_capacities_creates_functional_queue() {
    let mut queue = HostcallRequestQueue::<u32>::with_capacities(4, 8);
    assert!(queue.is_empty());
    assert_eq!(queue.len(), 0);

    let outcome = queue.push_back(42);
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    assert!(!queue.is_empty());
    assert_eq!(queue.len(), 1);

    let capacities = queue.snapshot();
    assert_eq!(
        (capacities.fast_capacity, capacities.overflow_capacity),
        (4, 8)
    );
}
1690
/// `clear` empties the queue and zeroes the depth and overflow counters.
#[test]
fn queue_clear_resets_state() {
    let mut queue = HostcallRequestQueue::with_mode(2, 4, HostcallQueueMode::Ebr);
    // Three pushes against a fast capacity of two.
    for value in [1_u8, 2_u8, 3_u8] {
        let _ = queue.push_back(value);
    }
    assert!(!queue.is_empty());

    queue.clear();

    assert!(queue.is_empty());
    assert_eq!(queue.len(), 0);
    let counters = queue.snapshot();
    assert_eq!(counters.max_depth_seen, 0);
    assert_eq!(counters.overflow_enqueued_total, 0);
    assert_eq!(counters.overflow_rejected_total, 0);
}
1707
/// Builds up S3-FIFO state (a fairness rejection, a ghost hit, and a
/// signal-quality fallback) and then checks that `clear` returns every
/// `s3fifo_*` snapshot field to its initial value.
#[test]
fn queue_clear_resets_s3fifo_fallback_and_counters() {
    let mut queue = HostcallRequestQueue::with_mode(1, 3, HostcallQueueMode::SafeFallback);
    let request = |value| TenantRequest {
        tenant: Some("ext.reset"),
        value,
    };

    // Round one ends in a rejection.
    let outcome = queue.push_back(request(0));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    let outcome = queue.push_back(request(1));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));
    let outcome = queue.push_back(request(2));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::Rejected { .. }
    ));

    let _ = queue.drain_all();

    // Round two: the same tenant's third request is accepted this time.
    let outcome = queue.push_back(request(3));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::FastPath { .. }
    ));
    let outcome = queue.push_back(request(4));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));
    let outcome = queue.push_back(request(5));
    assert!(matches!(
        outcome,
        HostcallQueueEnqueueResult::OverflowPath { .. }
    ));

    drive_signal_quality_fallback(&mut queue, 6);

    let dirty = queue.snapshot();
    assert_eq!(dirty.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        dirty.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert_eq!(dirty.s3fifo_fallback_transitions, 1);
    assert!(dirty.s3fifo_ghost_hits_total >= 1);
    assert!(dirty.s3fifo_fairness_rejected_total >= 1);
    assert!(dirty.s3fifo_signal_samples >= 1);

    queue.clear();

    let pristine = queue.snapshot();
    assert_eq!(pristine.s3fifo_mode, S3FifoMode::Active);
    assert_eq!(pristine.s3fifo_fallback_reason, None);
    assert_eq!(pristine.s3fifo_ghost_depth, 0);
    assert_eq!(pristine.s3fifo_ghost_hits_total, 0);
    assert_eq!(pristine.s3fifo_fairness_rejected_total, 0);
    assert_eq!(pristine.s3fifo_signal_samples, 0);
    assert_eq!(pristine.s3fifo_signalless_streak, 0);
    assert_eq!(pristine.s3fifo_fallback_transitions, 0);
    assert_eq!(pristine.s3fifo_active_tenants, 0);
}
1784
/// After `clear`, the same traffic pattern triggers the signal-quality
/// fallback again, and the transition counter restarts from one.
#[test]
fn queue_clear_allows_s3fifo_fallback_to_retrigger_from_clean_state() {
    let mut queue = HostcallRequestQueue::with_mode(1, 2, HostcallQueueMode::SafeFallback);

    drive_signal_quality_fallback(&mut queue, 0);

    let before_clear = queue.snapshot();
    assert_eq!(before_clear.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        before_clear.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert_eq!(before_clear.s3fifo_fallback_transitions, 1);

    queue.clear();

    let reset = queue.snapshot();
    assert_eq!(reset.s3fifo_mode, S3FifoMode::Active);
    assert_eq!(reset.s3fifo_fallback_reason, None);
    assert_eq!(reset.s3fifo_fallback_transitions, 0);

    drive_signal_quality_fallback(&mut queue, 120);

    let second_fallback = queue.snapshot();
    assert_eq!(second_fallback.s3fifo_mode, S3FifoMode::ConservativeFifo);
    assert_eq!(
        second_fallback.s3fifo_fallback_reason,
        Some(S3FifoFallbackReason::SignalQualityInsufficient)
    );
    assert_eq!(second_fallback.s3fifo_fallback_transitions, 1);
}
1816
/// `reclamation_mode()` returns the mode the queue was constructed with.
#[test]
fn queue_reclamation_mode_accessor() {
    let ebr_queue: HostcallRequestQueue<u8> =
        HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::Ebr);
    assert_eq!(ebr_queue.reclamation_mode(), HostcallQueueMode::Ebr);

    let fallback_queue: HostcallRequestQueue<u8> =
        HostcallRequestQueue::with_mode(2, 2, HostcallQueueMode::SafeFallback);
    assert_eq!(
        fallback_queue.reclamation_mode(),
        HostcallQueueMode::SafeFallback
    );
}
1825
/// `force_safe_fallback` switches an `Ebr` queue to `SafeFallback` and counts
/// one transition; calling it a second time does not count another.
#[test]
fn queue_force_safe_fallback_switches_mode() {
    let mut queue = HostcallRequestQueue::<u8>::with_mode(2, 2, HostcallQueueMode::Ebr);
    assert_eq!(queue.reclamation_mode(), HostcallQueueMode::Ebr);

    queue.force_safe_fallback();
    assert_eq!(queue.reclamation_mode(), HostcallQueueMode::SafeFallback);
    let after_first = queue.snapshot();
    assert_eq!(after_first.fallback_transitions, 1);

    // Already in fallback: the counter must stay at one.
    queue.force_safe_fallback();
    let after_second = queue.snapshot();
    assert_eq!(after_second.fallback_transitions, 1);
}
1842
/// The `Default` queue reports the module-level capacity constants.
#[test]
fn queue_default_uses_standard_capacities() {
    let capacities = HostcallRequestQueue::<u8>::default().snapshot();
    assert_eq!(capacities.fast_capacity, HOSTCALL_FAST_RING_CAPACITY);
    assert_eq!(capacities.overflow_capacity, HOSTCALL_OVERFLOW_CAPACITY);
}
1850
1851 mod proptest_bravo {
1854 use super::*;
1855 use proptest::prelude::*;
1856
1857 fn arb_sample() -> impl Strategy<Value = ContentionSample> {
1858 (
1859 0..10_000u64,
1860 0..10_000u64,
1861 0..50_000u64,
1862 0..50_000u64,
1863 0..100u64,
1864 )
1865 .prop_map(|(reads, writes, r_wait, w_wait, w_timeouts)| {
1866 ContentionSample {
1867 read_acquires: reads,
1868 write_acquires: writes,
1869 read_wait_p95_us: r_wait,
1870 write_wait_p95_us: w_wait,
1871 write_timeouts: w_timeouts,
1872 }
1873 })
1874 }
1875
1876 fn arb_config() -> impl Strategy<Value = BravoContentionConfig> {
1877 (
1878 1..200u64,
1879 500..1000u32,
1880 100..500u32,
1881 500..999u32,
1882 1000..20_000u64,
1883 1..10u64,
1884 1..10u32,
1885 1..5u32,
1886 )
1887 .prop_map(
1888 |(
1889 min_acq,
1890 rd_ratio,
1891 mixed_floor,
1892 mixed_ceil,
1893 starve_wait,
1894 starve_to,
1895 max_rb,
1896 wr_windows,
1897 )| {
1898 BravoContentionConfig {
1899 min_total_acquires: min_acq,
1900 read_dominant_ratio_permille: rd_ratio,
1901 mixed_ratio_floor_permille: mixed_floor,
1902 mixed_ratio_ceiling_permille: mixed_ceil.max(mixed_floor),
1903 writer_starvation_wait_us: starve_wait,
1904 writer_starvation_timeouts: starve_to,
1905 max_consecutive_read_bias_windows: max_rb,
1906 writer_recovery_windows: wr_windows,
1907 }
1908 },
1909 )
1910 }
1911
1912 proptest! {
1913 #[test]
1914 fn classify_is_deterministic(
1915 sample in arb_sample(),
1916 cfg in arb_config(),
1917 ) {
1918 let s1 = BravoContentionState::classify(sample, cfg);
1919 let s2 = BravoContentionState::classify(sample, cfg);
1920 assert_eq!(s1, s2, "same inputs must produce same signature");
1921 }
1922
1923 #[test]
1924 fn read_ratio_permille_bounded_0_to_1000(
1925 reads in 0..u64::MAX / 2,
1926 writes in 0..u64::MAX / 2,
1927 ) {
1928 let s = ContentionSample {
1929 read_acquires: reads,
1930 write_acquires: writes,
1931 ..Default::default()
1932 };
1933 let ratio = s.read_ratio_permille();
1934 assert!(ratio <= 1000, "ratio was {ratio}, expected <= 1000");
1935 }
1936
1937 #[test]
1938 fn total_acquires_at_least_each_component(
1939 reads in 0..u64::MAX / 2,
1940 writes in 0..u64::MAX / 2,
1941 ) {
1942 let s = ContentionSample {
1943 read_acquires: reads,
1944 write_acquires: writes,
1945 ..Default::default()
1946 };
1947 let total = s.total_acquires();
1948 assert!(total >= reads, "total must be >= reads");
1949 assert!(total >= writes, "total must be >= writes");
1950 }
1951
1952 #[test]
1953 fn mode_always_valid_after_observation_sequence(
1954 cfg in arb_config(),
1955 samples in prop::collection::vec(arb_sample(), 1..30),
1956 ) {
1957 let mut state = BravoContentionState::new(cfg);
1958 for sample in &samples {
1959 let decision = state.observe(*sample);
1960 assert!(
1961 matches!(
1962 decision.next_mode,
1963 BravoBiasMode::Balanced
1964 | BravoBiasMode::ReadBiased
1965 | BravoBiasMode::WriterRecovery
1966 ),
1967 "mode must be a valid variant"
1968 );
1969 assert_eq!(decision.switched, decision.previous_mode != decision.next_mode);
1970 }
1971 }
1972
1973 #[test]
1974 fn counters_monotonically_nondecreasing(
1975 cfg in arb_config(),
1976 samples in prop::collection::vec(arb_sample(), 1..30),
1977 ) {
1978 let mut state = BravoContentionState::new(cfg);
1979 let mut prev_transitions = 0u64;
1980 let mut prev_rollbacks = 0u64;
1981 let mut prev_windows = 0u64;
1982
1983 for sample in &samples {
1984 let _ = state.observe(*sample);
1985 let snap = state.snapshot();
1986 assert!(snap.transitions >= prev_transitions);
1987 assert!(snap.rollbacks >= prev_rollbacks);
1988 assert!(snap.windows_observed >= prev_windows);
1989 prev_transitions = snap.transitions;
1990 prev_rollbacks = snap.rollbacks;
1991 prev_windows = snap.windows_observed;
1992 }
1993 }
1994
1995 #[test]
1996 fn windows_observed_equals_call_count(
1997 cfg in arb_config(),
1998 samples in prop::collection::vec(arb_sample(), 1..30),
1999 ) {
2000 let mut state = BravoContentionState::new(cfg);
2001 for sample in &samples {
2002 let _ = state.observe(*sample);
2003 }
2004 let snap = state.snapshot();
2005 assert_eq!(
2006 snap.windows_observed,
2007 samples.len() as u64,
2008 "windows_observed must equal number of observe() calls"
2009 );
2010 }
2011
2012 #[test]
2013 fn initial_state_is_balanced_with_zero_counters(
2014 cfg in arb_config(),
2015 ) {
2016 let state = BravoContentionState::new(cfg);
2017 let snap = state.snapshot();
2018 assert_eq!(snap.mode, BravoBiasMode::Balanced);
2019 assert_eq!(snap.transitions, 0);
2020 assert_eq!(snap.rollbacks, 0);
2021 assert_eq!(snap.windows_observed, 0);
2022 assert_eq!(snap.consecutive_read_bias_windows, 0);
2023 assert_eq!(snap.writer_recovery_remaining, 0);
2024 }
2025 }
2026 }
2027
/// Loom model: one thread takes an epoch pin and holds it until signalled,
/// while a second thread waits for the pin, then pushes, drains, and forces a
/// reclaim that must reclaim nothing. After both threads have joined (the pin
/// is dropped by then), a final reclaim must empty the backlog.
#[test]
#[ignore = "loom model checker SIGSEGV: spin-wait loops exhaust state space"]
fn loom_epoch_pin_blocks_reclamation_until_release() {
    use loom::sync::atomic::{AtomicBool, Ordering as LoomOrdering};
    use loom::sync::{Arc, Mutex};
    use loom::thread;

    loom::model(|| {
        let shared_queue = Arc::new(Mutex::new(HostcallRequestQueue::with_mode(
            1,
            2,
            HostcallQueueMode::Ebr,
        )));
        let pinned = Arc::new(AtomicBool::new(false));
        let may_unpin = Arc::new(AtomicBool::new(false));

        let pinner = {
            let shared_queue = Arc::clone(&shared_queue);
            let pinned = Arc::clone(&pinned);
            let may_unpin = Arc::clone(&may_unpin);
            thread::spawn(move || {
                // The lock guard is a temporary, so the mutex is released
                // before this thread starts waiting.
                let pin = shared_queue.lock().expect("lock queue").pin_epoch();
                pinned.store(true, LoomOrdering::SeqCst);
                while !may_unpin.load(LoomOrdering::SeqCst) {
                    thread::yield_now();
                }
                drop(pin);
            })
        };

        let worker = {
            let shared_queue = Arc::clone(&shared_queue);
            let pinned = Arc::clone(&pinned);
            thread::spawn(move || {
                // Do nothing until the pin has been taken.
                while !pinned.load(LoomOrdering::SeqCst) {
                    thread::yield_now();
                }

                let mut guard = shared_queue.lock().expect("lock queue");
                let _ = guard.push_back(1_u8);
                let _ = guard.push_back(2_u8);
                assert_eq!(guard.drain_all().len(), 2);
                guard.force_reclaim();

                let while_pinned = guard.snapshot();
                assert_eq!(while_pinned.reclamation_mode, HostcallQueueMode::Ebr);
                assert!(while_pinned.retired_backlog >= 2);
                assert_eq!(while_pinned.reclaimed_total, 0);
                drop(guard);
            })
        };

        worker.join().expect("worker join");
        may_unpin.store(true, LoomOrdering::SeqCst);
        pinner.join().expect("pin thread join");

        let mut guard = shared_queue.lock().expect("lock queue");
        guard.force_reclaim();
        let after_release = guard.snapshot();
        assert_eq!(after_release.retired_backlog, 0);
        assert!(after_release.reclaimed_total >= 2);
        drop(guard);
    });
}
2088
/// Loom model: two producer threads each push one value through a shared,
/// mutex-guarded queue; after both have joined, draining yields exactly those
/// two values.
#[test]
#[ignore = "loom model checker SIGSEGV: needs cfg(loom) feature gate"]
fn loom_concurrent_enqueue_dequeue_keeps_values_unique() {
    use loom::sync::{Arc, Mutex};
    use loom::thread;

    loom::model(|| {
        let shared_queue = Arc::new(Mutex::new(HostcallRequestQueue::with_mode(
            2,
            2,
            HostcallQueueMode::SafeFallback,
        )));

        let first_producer = {
            let shared_queue = Arc::clone(&shared_queue);
            thread::spawn(move || {
                let mut guard = shared_queue.lock().expect("lock queue");
                let _ = guard.push_back(10_u8);
            })
        };

        let second_producer = {
            let shared_queue = Arc::clone(&shared_queue);
            thread::spawn(move || {
                let mut guard = shared_queue.lock().expect("lock queue");
                let _ = guard.push_back(11_u8);
            })
        };

        first_producer.join().expect("producer_a join");
        second_producer.join().expect("producer_b join");

        let mut guard = shared_queue.lock().expect("lock queue");
        let drained = guard.drain_all();
        drop(guard);

        // The producers may run in either order, so compare sorted values.
        let mut seen: Vec<_> = drained.into_iter().collect();
        seen.sort_unstable();
        assert_eq!(seen, vec![10, 11]);
    });
}
2125}