1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// First opcode of the tenant-owned opcode space. Tenant `id` owns the window that starts at
/// `TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT`.
pub const TENANT_OPCODE_BASE: u32 = 1 << 30;

/// Ceiling for the registry's id counter: registration fails with `TenantError::RegistryFull`
/// once the counter reaches this value.
///
/// NOTE(review): with the current base and window size, the opcode-window arithmetic in
/// `register_with_backpressure` presumably overflows `u32` for ids above 3070. That is the
/// practical limit, well below this constant.
pub const TENANT_ID_MAX: u32 = 0xFFFF_FFFE;

/// Number of local opcodes each tenant may use; valid local opcodes are
/// `0..OPCODE_RANGE_PER_TENANT`.
pub const OPCODE_RANGE_PER_TENANT: u32 = 0x0010_0000;
64
/// Number of idle polls that busy-spin (`spin_loop`) before waits switch to `park_timeout`.
const QUIESCE_SPIN_POLLS: u64 = 64;
/// First park duration once the spin budget is exhausted.
const QUIESCE_MIN_PARK: Duration = Duration::from_nanos(2_000);
/// Upper bound on any single park.
const QUIESCE_MAX_PARK: Duration = Duration::from_nanos(50_000);
/// Maximum number of doublings applied to `QUIESCE_MIN_PARK` (2^5 = 32x before clamping).
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;
69
70#[allow(clippy::unnecessary_min_or_max)]
71fn quiesce_backoff_duration(poll: u64) -> Duration {
72 let parked_poll = poll.checked_sub(QUIESCE_SPIN_POLLS).unwrap_or(0);
73 let shift = u32::try_from(parked_poll.min(QUIESCE_BACKOFF_SHIFT_CAP)).unwrap_or_else(|error| {
74 panic!(
75 "tenant quiesce backoff shift cannot fit u32: {error}. Fix: lower QUIESCE_BACKOFF_SHIFT_CAP."
76 )
77 });
78 let multiplier = 1_u32.checked_shl(shift).unwrap_or_else(|| {
79 panic!("tenant quiesce backoff multiplier overflowed u32. Fix: lower shift cap.")
80 });
81 QUIESCE_MIN_PARK
82 .checked_mul(multiplier)
83 .unwrap_or_else(|| {
84 panic!("tenant quiesce backoff duration overflowed. Fix: lower quiesce park bounds.")
85 })
86 .min(QUIESCE_MAX_PARK)
87}
88
89fn quiesce_idle(poll: u64) {
90 if poll < QUIESCE_SPIN_POLLS {
91 std::hint::spin_loop();
92 } else {
93 std::thread::park_timeout(quiesce_backoff_duration(poll));
94 }
95}
96
97fn tenant_registry_retry_idle(retry: u64) {
98 if retry < QUIESCE_SPIN_POLLS {
99 std::hint::spin_loop();
100 } else {
101 std::thread::park_timeout(quiesce_backoff_duration(retry));
102 }
103}
104
/// Errors produced by tenant registration, opcode translation, slot publishing and quiesce.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TenantError {
    /// No further tenant can be registered. Either the registry's id counter reached
    /// `TENANT_ID_MAX`, or the next tenant's opcode window could not be placed.
    #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
    RegistryFull {
        /// Value of the registry id counter when the failure was detected. This is the
        /// counter value, not necessarily the number of successful registrations.
        issued: u32,
    },
    /// A tenant-local opcode was outside the tenant's window `[0, cap)`.
    #[error(
        "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
Fix: caller must stay inside the opcode window returned by `register()`."
    )]
    OpcodeOutOfRange {
        /// Tenant that attempted the translation.
        tenant_id: u32,
        /// The rejected local opcode.
        local_opcode: u32,
        /// Exclusive upper bound of the tenant's local opcode window.
        cap: u32,
    },
    /// The tenant was removed via `TenantRegistry::unregister`; this handle can no longer publish.
    #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
    Revoked {
        /// Id of the revoked tenant.
        tenant_id: u32,
    },
    /// `TenantHandle::quiesce` ran out of polls while slots were still outstanding.
    #[error(
        "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
    )]
    QuiesceTimeout {
        /// Tenant being quiesced.
        tenant_id: u32,
        /// Published-minus-drained slot count observed when giving up.
        outstanding: u64,
    },
    /// Publishing would exceed the tenant's `max_outstanding_slots` backlog cap.
    #[error(
        "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
Fix: wait for drain progress or register the tenant with a larger bounded backlog."
    )]
    Backpressure {
        /// Tenant whose backlog is full.
        tenant_id: u32,
        /// Published-minus-drained slot count at the time of the rejected publish.
        outstanding: u64,
        /// The tenant's configured backlog cap.
        cap: u64,
    },
    /// Error from the underlying pipeline / megakernel layer. Also used for accounting
    /// invariant violations and counter overflows in this module.
    #[error("{0}")]
    Pipeline(#[from] PipelineError),
}
164
/// Shared per-tenant state behind every clone of a `TenantHandle`.
struct TenantState {
    /// Registry-assigned tenant id; the registry never issues 0.
    id: u32,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Size of the tenant's local opcode window (local opcodes must be `< opcode_cap`).
    opcode_cap: u32,
    /// Slots reserved for publishing. A reservation is rolled back if the ring write fails.
    published_count: AtomicU64,
    /// Backlog cap on published-minus-drained slots (registration clamps it to at least 1).
    max_outstanding_slots: u64,
    /// Slots reported as consumed via `TenantHandle::note_drained`.
    drained_count: AtomicU64,
    /// Number of completed `quiesce` calls, whether they succeeded or timed out.
    quiesce_calls: AtomicU64,
    /// Number of `quiesce` calls that ended in `TenantError::QuiesceTimeout`.
    quiesce_timeouts: AtomicU64,
    /// Cumulative wall-clock nanoseconds spent inside `quiesce`.
    quiesce_wait_ns: AtomicU64,
    /// Nonzero once the tenant has been unregistered; publishing is then rejected.
    revoked: AtomicU32,
    /// Human-readable label supplied at registration.
    label: String,
}
189
/// Cheaply clonable handle to one registered tenant; every clone shares the same
/// `TenantState` through an `Arc`.
///
/// After `TenantRegistry::unregister` the handle still answers read-only queries, but
/// `publish_slot` rejects it with `TenantError::Revoked`.
#[derive(Clone)]
pub struct TenantHandle {
    state: Arc<TenantState>,
}
197
/// Point-in-time snapshot of one tenant's accounting and quiesce statistics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Tenant the snapshot belongs to.
    pub tenant_id: u32,
    /// Slots reserved for publishing, net of reservations rolled back after failed ring writes.
    pub published_count: u64,
    /// Slots reported as drained via `note_drained`.
    pub drained_count: u64,
    /// `published_count - drained_count` at snapshot time.
    pub outstanding_slots: u64,
    /// Backlog cap configured at registration.
    pub max_outstanding_slots: u64,
    /// Completed `quiesce` calls.
    pub quiesce_calls: u64,
    /// `quiesce` calls that timed out.
    pub quiesce_timeouts: u64,
    /// Cumulative nanoseconds spent in `quiesce`.
    pub quiesce_wait_ns: u64,
}
218
219impl std::fmt::Debug for TenantHandle {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("TenantHandle")
222 .field("id", &self.state.id)
223 .field("label", &self.state.label)
224 .field("base_opcode", &self.state.base_opcode)
225 .field(
226 "published_count",
227 &self.state.published_count.load(Ordering::Relaxed),
228 )
229 .field("max_outstanding_slots", &self.state.max_outstanding_slots)
230 .field(
231 "drained_count",
232 &self.state.drained_count.load(Ordering::Relaxed),
233 )
234 .field(
235 "revoked",
236 &(self.state.revoked.load(Ordering::Acquire) != 0),
237 )
238 .finish()
239 }
240}
241
242impl TenantHandle {
243 #[must_use]
245 pub fn id(&self) -> u32 {
246 self.state.id
247 }
248
249 #[must_use]
251 pub fn label(&self) -> &str {
252 &self.state.label
253 }
254
255 #[must_use]
257 pub fn base_opcode(&self) -> u32 {
258 self.state.base_opcode
259 }
260
261 pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
269 if local >= self.state.opcode_cap {
270 return Err(TenantError::OpcodeOutOfRange {
271 tenant_id: self.id(),
272 local_opcode: local,
273 cap: self.state.opcode_cap,
274 });
275 }
276 let global = self.state.base_opcode + local;
277 if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
278 return Err(TenantError::Pipeline(PipelineError::Backend(format!(
279 "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
280 ))));
281 }
282 Ok(global)
283 }
284
285 pub fn publish_slot(
297 &self,
298 ring_bytes: &mut [u8],
299 slot_idx: u32,
300 local_opcode: u32,
301 args: &[u32],
302 ) -> Result<(), TenantError> {
303 if self.state.revoked.load(Ordering::Acquire) != 0 {
304 return Err(TenantError::Revoked {
305 tenant_id: self.state.id,
306 });
307 }
308 let global = self.global_opcode(local_opcode)?;
309 self.reserve_publish_slot()?;
310 if let Err(error) =
311 Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
312 {
313 checked_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
314 return Err(error.into());
315 }
316 Ok(())
317 }
318
319 fn reserve_publish_slot(&self) -> Result<(), TenantError> {
320 let cap = self.state.max_outstanding_slots;
321 vyre_driver::accounting::checked_atomic_update_u64_with_order(
322 &self.state.published_count,
323 Ordering::Acquire,
324 Ordering::AcqRel,
325 Ordering::Acquire,
326 |published| {
327 let drained = self.state.drained_count.load(Ordering::Acquire);
328 let outstanding = vyre_driver::accounting::checked_sub_u64_lazy(
329 published,
330 drained,
331 || {
332 TenantError::Pipeline(PipelineError::QueueFull {
333 queue: "tenant",
334 fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
335 })
336 },
337 )?;
338 if outstanding >= cap {
339 return Err(TenantError::Backpressure {
340 tenant_id: self.state.id,
341 outstanding,
342 cap,
343 });
344 }
345 vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
346 TenantError::Pipeline(PipelineError::QueueFull {
347 queue: "tenant",
348 fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
349 })
350 })
351 },
352 |_, _| Ok(()),
353 )?;
354 Ok(())
355 }
356
357 #[must_use]
359 pub fn published_count(&self) -> u64 {
360 self.state.published_count.load(Ordering::Relaxed)
361 }
362
363 #[must_use]
366 pub fn drained_count(&self) -> u64 {
367 self.state.drained_count.load(Ordering::Relaxed)
368 }
369
370 #[must_use]
372 pub fn max_outstanding_slots(&self) -> u64 {
373 self.state.max_outstanding_slots
374 }
375
376 #[must_use]
378 pub fn runtime_counters(&self) -> TenantRuntimeCounters {
379 let published_count = self.state.published_count.load(Ordering::Acquire);
380 let drained_count = self.state.drained_count.load(Ordering::Acquire);
381 TenantRuntimeCounters {
382 tenant_id: self.state.id,
383 published_count,
384 drained_count,
385 outstanding_slots: vyre_driver::accounting::checked_sub_u64_lazy(
386 published_count,
387 drained_count,
388 || "tenant drained_count exceeded published_count. Fix: rebuild tenant accounting state.",
389 )
390 .unwrap_or_else(|message| panic!("{message}")),
391 max_outstanding_slots: self.state.max_outstanding_slots,
392 quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
393 quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
394 quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
395 }
396 }
397
398 pub fn note_drained(&self, count: u64) {
402 checked_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
403 }
404
405 pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
414 let started = Instant::now();
415 for poll in 0..max_spins {
416 let pub_count = self.state.published_count.load(Ordering::Acquire);
417 let drained = self.state.drained_count.load(Ordering::Acquire);
418 if drained >= pub_count {
419 self.record_quiesce(started, false);
420 return Ok(());
421 }
422 quiesce_idle(poll);
423 }
424 let pub_count = self.state.published_count.load(Ordering::Acquire);
425 let drained = self.state.drained_count.load(Ordering::Acquire);
426 self.record_quiesce(started, true);
427 Err(TenantError::QuiesceTimeout {
428 tenant_id: self.state.id,
429 outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
430 TenantError::Pipeline(PipelineError::QueueFull {
431 queue: "tenant",
432 fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
433 })
434 })?,
435 })
436 }
437
438 fn record_quiesce(&self, started: Instant, timed_out: bool) {
439 checked_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
440 if timed_out {
441 checked_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
442 }
443 let elapsed_ns = u64::try_from(started.elapsed().as_nanos()).unwrap_or_else(|error| {
444 panic!(
445 "tenant quiesce elapsed nanoseconds cannot fit u64: {error}. Fix: quiesce with a bounded timeout."
446 )
447 });
448 checked_atomic_add_u64(
449 &self.state.quiesce_wait_ns,
450 elapsed_ns,
451 "tenant quiesce_wait_ns",
452 );
453 }
454}
455
/// Concurrent registry of live tenants, keyed by tenant id.
pub struct TenantRegistry {
    /// Live tenants; entries are removed by `unregister`.
    tenants: DashMap<u32, TenantHandle>,
    /// Id allocation counter. Registration only ever advances it, so ids are never reused.
    next_id: AtomicU32,
}
462
463impl Default for TenantRegistry {
464 fn default() -> Self {
465 Self {
466 tenants: DashMap::new(),
467 next_id: AtomicU32::new(0),
468 }
469 }
470}
471
/// Caller-owned scratch buffers for `TenantRegistry::select_concurrent_tenants_into`.
/// Reusing one across calls avoids per-call allocations.
#[derive(Debug, Default)]
pub struct TenantSelectionScratch {
    /// Sorted snapshot of active tenant ids for the current selection.
    active_ids: Vec<u32>,
    /// Indices into `active_ids` chosen by the greedy conflict-free selection.
    selected_indices: Vec<usize>,
}
478
479impl TenantSelectionScratch {
480 #[must_use]
482 pub const fn new() -> Self {
483 Self {
484 active_ids: Vec::new(),
485 selected_indices: Vec::new(),
486 }
487 }
488}
489
490fn checked_atomic_add_u64(counter: &AtomicU64, value: u64, label: &'static str) {
491 vyre_driver::accounting::checked_atomic_add_u64_with_order(
492 counter,
493 value,
494 Ordering::Acquire,
495 Ordering::AcqRel,
496 Ordering::Acquire,
497 |_, _| {
498 format!("{label} overflowed u64. Fix: quiesce or recreate the tenant accounting state.")
499 },
500 )
501 .unwrap_or_else(|message| panic!("{message}"));
502}
503
504fn checked_atomic_sub_u64(counter: &AtomicU64, value: u64, label: &'static str) {
505 vyre_driver::accounting::checked_atomic_sub_u64_with_order(
506 counter,
507 value,
508 Ordering::Acquire,
509 Ordering::AcqRel,
510 Ordering::Acquire,
511 |_, _| format!("{label} underflowed u64. Fix: rebuild tenant accounting state."),
512 )
513 .unwrap_or_else(|message| panic!("{message}"));
514}
515
516impl TenantRegistry {
517 #[must_use]
519 pub fn new() -> Self {
520 Self::default()
521 }
522
523 pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
532 self.register_with_backpressure(label, u64::MAX)
533 }
534
535 pub fn register_with_backpressure(
542 &self,
543 label: impl Into<String>,
544 max_outstanding_slots: u64,
545 ) -> Result<TenantHandle, TenantError> {
546 let mut registration_retries = 0u64;
547 let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
548 &self.next_id,
549 Ordering::Relaxed,
550 Ordering::SeqCst,
551 Ordering::Relaxed,
552 |current| {
553 if current >= TENANT_ID_MAX {
554 return Err(TenantError::RegistryFull { issued: current });
555 }
556 let id = current.max(1);
557 id.checked_add(1)
558 .ok_or(TenantError::RegistryFull { issued: current })
559 },
560 |_, _| {
561 tenant_registry_retry_idle(registration_retries);
562 registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
563 registration_retries,
564 1,
565 || {
566 TenantError::Pipeline(PipelineError::QueueFull {
567 queue: "tenant",
568 fix: "tenant registration retry counter overflowed u64; retry registration later",
569 })
570 },
571 )?;
572 Ok(())
573 },
574 )?;
575 let id = issued.max(1);
576
577 let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
578 id,
579 OPCODE_RANGE_PER_TENANT,
580 TenantError::RegistryFull { issued },
581 )?;
582 let base_opcode = vyre_driver::accounting::checked_add_u32_value(
583 TENANT_OPCODE_BASE,
584 tenant_offset,
585 TenantError::RegistryFull { issued },
586 )?;
587 let top_opcode = vyre_driver::accounting::checked_add_u32_value(
588 base_opcode,
589 OPCODE_RANGE_PER_TENANT,
590 TenantError::RegistryFull { issued },
591 )?;
592 if top_opcode == SHUTDOWN {
593 return Err(TenantError::RegistryFull { issued });
594 }
595 let handle = TenantHandle {
596 state: Arc::new(TenantState {
597 id,
598 base_opcode,
599 opcode_cap: OPCODE_RANGE_PER_TENANT,
600 published_count: AtomicU64::new(0),
601 max_outstanding_slots: max_outstanding_slots.max(1),
602 drained_count: AtomicU64::new(0),
603 quiesce_calls: AtomicU64::new(0),
604 quiesce_timeouts: AtomicU64::new(0),
605 quiesce_wait_ns: AtomicU64::new(0),
606 revoked: AtomicU32::new(0),
607 label: label.into(),
608 }),
609 };
610 self.tenants.insert(id, handle.clone());
611 Ok(handle)
612 }
613
614 pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
619 let (_, handle) = self.tenants.remove(&tenant_id)?;
620 handle.state.revoked.store(1, Ordering::Release);
621 Some(handle)
622 }
623
624 #[must_use]
626 pub fn active_tenants(&self) -> Vec<TenantHandle> {
627 let mut out = Vec::with_capacity(self.tenants.len());
628 out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
629 out.sort_by_key(TenantHandle::id);
630 out
631 }
632
633 pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
635 out.clear();
636 out.reserve(self.tenants.len());
637 self.tenants
638 .iter()
639 .for_each(|entry| out.push(entry.value().clone()));
640 out.sort_by_key(TenantHandle::id);
641 }
642
643 #[must_use]
646 pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
647 self.tenants
648 .get(&tenant_id)
649 .map(|entry| entry.value().clone())
650 }
651
652 #[must_use]
654 pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
655 let mut out = Vec::with_capacity(self.tenants.len());
656 self.tenants
657 .iter()
658 .map(|entry| entry.value().runtime_counters())
659 .for_each(|counters| out.push(counters));
660 out.sort_by_key(|counters| counters.tenant_id);
661 out
662 }
663
664 pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
666 out.clear();
667 out.reserve(self.tenants.len());
668 self.tenants
669 .iter()
670 .map(|entry| entry.value().runtime_counters())
671 .for_each(|counters| out.push(counters));
672 out.sort_by_key(|counters| counters.tenant_id);
673 }
674
675 #[must_use]
684 pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
685 let mut out = Vec::new();
686 let mut scratch = TenantSelectionScratch::new();
687 self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
688 out
689 }
690
691 pub fn select_concurrent_tenants_into(
693 &self,
694 conflict_adj: &[u32],
695 out: &mut Vec<u32>,
696 scratch: &mut TenantSelectionScratch,
697 ) {
698 out.clear();
699 scratch.active_ids.clear();
700 scratch.active_ids.reserve(self.tenants.len());
701 self.tenants
702 .iter()
703 .map(|entry| entry.value().id())
704 .for_each(|id| scratch.active_ids.push(id));
705 scratch.active_ids.sort_unstable();
706 let n = scratch.active_ids.len();
707 if n == 0 {
708 return;
709 }
710 if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
711 != Some(conflict_adj.len())
712 {
713 out.reserve(n);
716 out.extend(scratch.active_ids.iter().copied());
717 return;
718 }
719 if conflict_adj.iter().all(|conflict| *conflict == 0) {
720 out.reserve(n);
721 out.extend(scratch.active_ids.iter().copied());
722 return;
723 }
724 scratch.selected_indices.clear();
725 scratch.selected_indices.reserve(n);
726 'candidate: for candidate_idx in 0..n {
727 for &selected_idx in &scratch.selected_indices {
728 if conflict_adj[candidate_idx * n + selected_idx] != 0
729 || conflict_adj[selected_idx * n + candidate_idx] != 0
730 {
731 continue 'candidate;
732 }
733 }
734 scratch.selected_indices.push(candidate_idx);
735 }
736 out.reserve(scratch.selected_indices.len());
737 for &index in &scratch.selected_indices {
738 if let Some(&id) = scratch.active_ids.get(index) {
739 out.push(id);
740 }
741 }
742 }
743}
744
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn two_tenants_get_distinct_id_and_opcode_ranges() {
        let reg = TenantRegistry::new();
        let a = reg
            .register("scanner-a")
            .expect("Fix: register a; restore this invariant before continuing.");
        let b = reg
            .register("scanner-b")
            .expect("Fix: register b; restore this invariant before continuing.");
        assert_ne!(a.id(), b.id());
        assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
        assert_eq!(a.label(), "scanner-a");
        assert_eq!(b.label(), "scanner-b");
    }

    #[test]
    fn global_opcode_rejects_out_of_range_local() {
        let reg = TenantRegistry::new();
        let t = reg.register("soleno").unwrap();
        let err = t
            .global_opcode(OPCODE_RANGE_PER_TENANT)
            .expect_err("oversized local opcode must reject");
        assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));

        let ok = t
            .global_opcode(42)
            .expect("Fix: 42 < cap; restore this invariant before continuing.");
        assert_eq!(ok, t.base_opcode() + 42);
    }

    #[test]
    fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
        let reg = TenantRegistry::new();
        let t = reg.register("warpscan").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 7, &[1, 2, 3])
            .expect("Fix: publish; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 1);

        let tenant_off = super::super::megakernel::protocol::TENANT_WORD as usize * 4;
        let opcode_off = super::super::megakernel::protocol::OPCODE_WORD as usize * 4;
        let stored_tenant =
            u32::from_le_bytes(ring[tenant_off..tenant_off + 4].try_into().unwrap());
        let stored_opcode =
            u32::from_le_bytes(ring[opcode_off..opcode_off + 4].try_into().unwrap());
        assert_eq!(stored_tenant, t.id());
        assert_eq!(stored_opcode, t.base_opcode() + 7);
    }

    #[test]
    fn unregister_blocks_future_publishes() {
        let reg = TenantRegistry::new();
        let t = reg.register("vein").unwrap();
        let tenant_id = t.id();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
            .expect("Fix: first publish ok; restore this invariant before continuing.");
        reg.unregister(tenant_id)
            .expect("Fix: unregister; restore this invariant before continuing.");
        let err = t
            .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
            .expect_err("publish after unregister must reject");
        assert!(matches!(err, TenantError::Revoked { .. }));
        assert!(reg.lookup(tenant_id).is_none());
    }

    #[test]
    fn quiesce_returns_when_drained_catches_up() {
        let reg = TenantRegistry::new();
        let t = reg.register("t1").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
        assert_eq!(t.published_count(), 2);
        t.note_drained(2);
        t.quiesce(1)
            .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
        let counters = t.runtime_counters();
        assert_eq!(counters.published_count, 2);
        assert_eq!(counters.drained_count, 2);
        assert_eq!(counters.outstanding_slots, 0);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 0);
    }

    #[test]
    fn quiesce_times_out_when_drain_stalled() {
        let reg = TenantRegistry::new();
        let t = reg.register("t2").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
        let err = t.quiesce(4).expect_err("stalled quiesce must time out");
        assert!(matches!(
            err,
            TenantError::QuiesceTimeout { outstanding: 1, .. }
        ));
        let counters = t.runtime_counters();
        assert_eq!(counters.outstanding_slots, 1);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 1);
    }

    #[test]
    fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 2).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
        let err = t
            .publish_slot(&mut ring, 2, 0, &[3])
            .expect_err("third outstanding publish must hit tenant backpressure");
        assert!(matches!(
            err,
            TenantError::Backpressure {
                outstanding: 2,
                cap: 2,
                ..
            }
        ));
        assert_eq!(t.published_count(), 2);
        let counters = t.runtime_counters();
        assert_eq!(counters.max_outstanding_slots, 2);
        assert_eq!(counters.outstanding_slots, 2);
    }

    #[test]
    fn tenant_backpressure_reopens_after_drain_progress() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 1).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        assert!(matches!(
            t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
            TenantError::Backpressure { .. }
        ));
        t.note_drained(1);
        t.publish_slot(&mut ring, 1, 0, &[2])
            .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 2);
        assert_eq!(t.runtime_counters().outstanding_slots, 1);
    }

    #[test]
    fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
        for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
            tenant_registry_retry_idle(retry);
        }
        assert_eq!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
            QUIESCE_MIN_PARK
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn quiesce_backoff_is_bounded_and_monotonic() {
        // Polls up to QUIESCE_SPIN_POLLS all map to the minimum park, so also sample past the
        // spin budget (through the shift cap and beyond) to exercise the growth phase.
        let samples: Vec<_> = [0_u64, 1, 2, 8, 64]
            .iter()
            .copied()
            .chain((1..=QUIESCE_BACKOFF_SHIFT_CAP + 2).map(|step| QUIESCE_SPIN_POLLS + step))
            .map(quiesce_backoff_duration)
            .collect();
        assert_eq!(samples[0], QUIESCE_MIN_PARK);
        for pair in samples.windows(2) {
            assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
            assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
        }
        assert!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS + 1) > QUIESCE_MIN_PARK,
            "quiesce backoff must grow once parking starts"
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn active_tenants_tracks_registrations() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(active.contains(&a.id()));
        assert!(active.contains(&b.id()));
        reg.unregister(a.id());
        let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(!after.contains(&a.id()));
        assert!(after.contains(&b.id()));
        let counters: Vec<u32> = reg
            .runtime_counters()
            .iter()
            .map(|tenant| tenant.tenant_id)
            .collect();
        assert_eq!(counters, vec![b.id()]);
    }

    #[test]
    fn tenant_snapshots_reuse_caller_storage() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let mut active = Vec::with_capacity(2);
        let mut counters = Vec::with_capacity(2);

        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);
        let active_ptr = active.as_ptr();
        let counters_ptr = counters.as_ptr();
        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);

        assert_eq!(active.as_ptr(), active_ptr);
        assert_eq!(counters.as_ptr(), counters_ptr);
        assert!(active.iter().any(|tenant| tenant.id() == a.id()));
        assert!(active.iter().any(|tenant| tenant.id() == b.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == a.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == b.id()));
    }

    #[test]
    fn concurrent_tenant_selection_reuses_scratch_and_output() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let at = |row: usize, col: usize| row * n + col;
        let mut conflicts = vec![0_u32; n * n];
        // a and b conflict in both directions; c conflicts with nobody.
        conflicts[at(0, 1)] = 1;
        conflicts[at(1, 0)] = 1;
        let mut out = Vec::with_capacity(3);
        let mut scratch = TenantSelectionScratch::new();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
        let out_ptr = out.as_ptr();
        let active_ids_ptr = scratch.active_ids.as_ptr();
        let selected_ptr = scratch.selected_indices.as_ptr();
        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out.as_ptr(), out_ptr);
        assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
        assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
        assert!(out.contains(&a.id()) || out.contains(&b.id()));
        assert!(!(out.contains(&a.id()) && out.contains(&b.id())));
        assert!(out.contains(&c.id()));
    }

    #[test]
    fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let mut out = Vec::with_capacity(8);
        let mut scratch = TenantSelectionScratch::new();
        let conflicts = vec![0_u32; 9];
        let out_ptr = out.as_ptr();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out, vec![a.id(), b.id(), c.id()]);
        assert_eq!(
            out.as_ptr(),
            out_ptr,
            "all-zero conflict fast path must reuse caller-owned output storage"
        );
        assert!(
            scratch.selected_indices.is_empty(),
            "all-zero conflict fast path must not populate pairwise selection scratch"
        );
    }

    #[test]
    fn concurrent_tenant_selection_respects_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let at = |row: usize, col: usize| row * n + col;
        let mut conflicts = vec![0_u32; n * n];
        // a and b conflict in both directions; c conflicts with nobody.
        conflicts[at(0, 1)] = 1;
        conflicts[at(1, 0)] = 1;

        let selected = reg.select_concurrent_tenants(&conflicts);

        assert!(selected.contains(&a.id()) || selected.contains(&b.id()));
        assert!(!(selected.contains(&a.id()) && selected.contains(&b.id())));
        assert!(selected.contains(&c.id()));
    }

    #[test]
    fn concurrent_registration_assigns_unique_ids() {
        use std::thread;
        let reg = Arc::new(TenantRegistry::new());
        let mut handles = Vec::new();
        for i in 0..32 {
            let reg = Arc::clone(&reg);
            handles.push(thread::spawn(move || {
                reg.register(format!("t{i}")).unwrap().id()
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let mut sorted = ids.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
    }
}