1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// First global opcode reserved for tenant windows. Tenant `id` owns the window
/// `[TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT, that + OPCODE_RANGE_PER_TENANT)`.
pub const TENANT_OPCODE_BASE: u32 = 1 << 30;

/// Registration is refused once the registry's ID counter reaches this value.
///
/// In practice the `u32` opcode-window arithmetic in registration fails first
/// (IDs of 3071 and above cannot fit a full window below `2^32`).
pub const TENANT_ID_MAX: u32 = 0xFFFF_FFFE;

/// Number of local opcodes in each tenant's window (`2^20`).
pub const OPCODE_RANGE_PER_TENANT: u32 = 0x0010_0000;
64
/// Number of initial polls that only issue a CPU spin hint before parking begins.
const QUIESCE_SPIN_POLLS: u64 = 1 << 6;
/// Park duration used by the first parked poll.
const QUIESCE_MIN_PARK: Duration = Duration::from_nanos(2_000);
/// Upper clamp for any single park.
const QUIESCE_MAX_PARK: Duration = Duration::from_nanos(50_000);
/// Largest doubling exponent applied to `QUIESCE_MIN_PARK`.
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;
69
70#[allow(clippy::unnecessary_min_or_max)]
71fn quiesce_backoff_duration(poll: u64) -> Duration {
72 let parked_poll = poll.checked_sub(QUIESCE_SPIN_POLLS).unwrap_or(0);
73 let shift = u32::try_from(parked_poll.min(QUIESCE_BACKOFF_SHIFT_CAP)).unwrap_or_else(|error| {
74 panic!(
75 "tenant quiesce backoff shift cannot fit u32: {error}. Fix: lower QUIESCE_BACKOFF_SHIFT_CAP."
76 )
77 });
78 let multiplier = 1_u32.checked_shl(shift).unwrap_or_else(|| {
79 panic!("tenant quiesce backoff multiplier overflowed u32. Fix: lower shift cap.")
80 });
81 QUIESCE_MIN_PARK
82 .checked_mul(multiplier)
83 .unwrap_or_else(|| {
84 panic!("tenant quiesce backoff duration overflowed. Fix: lower quiesce park bounds.")
85 })
86 .min(QUIESCE_MAX_PARK)
87}
88
89fn quiesce_idle(poll: u64) {
90 if poll < QUIESCE_SPIN_POLLS {
91 std::hint::spin_loop();
92 } else {
93 std::thread::park_timeout(quiesce_backoff_duration(poll));
94 }
95}
96
97fn tenant_registry_retry_idle(retry: u64) {
98 if retry < QUIESCE_SPIN_POLLS {
99 std::hint::spin_loop();
100 } else {
101 std::thread::park_timeout(quiesce_backoff_duration(retry));
102 }
103}
104
105#[derive(Debug, thiserror::Error)]
107#[non_exhaustive]
108pub enum TenantError {
109 #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
112 RegistryFull {
113 issued: u32,
115 },
116 #[error(
119 "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
120 Fix: caller must stay inside the opcode window returned by `register()`."
121 )]
122 OpcodeOutOfRange {
123 tenant_id: u32,
125 local_opcode: u32,
127 cap: u32,
129 },
130 #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
132 Revoked {
133 tenant_id: u32,
135 },
136 #[error(
138 "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
139 Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
140 )]
141 QuiesceTimeout {
142 tenant_id: u32,
144 outstanding: u64,
146 },
147 #[error(
149 "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
150 Fix: wait for drain progress or register the tenant with a larger bounded backlog."
151 )]
152 Backpressure {
153 tenant_id: u32,
155 outstanding: u64,
157 cap: u64,
159 },
160 #[error("{0}")]
162 Pipeline(#[from] PipelineError),
163}
164
/// Shared per-tenant state behind every [`TenantHandle`] clone.
///
/// Identity and window fields are fixed at registration. The atomics are updated by
/// publishers, drain reporters, `quiesce`, and `unregister`.
struct TenantState {
    // Identity and opcode window.
    /// Registry-assigned tenant ID.
    id: u32,
    /// Caller-supplied label, surfaced through `label()` and `Debug`.
    label: String,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Local opcodes must be strictly below this value.
    opcode_cap: u32,

    // Backlog accounting.
    /// Cap on `published_count - drained_count` enforced when publishing.
    max_outstanding_slots: u64,
    /// Slots reserved for publish (rolled back when a ring write fails).
    published_count: AtomicU64,
    /// Slots reported drained via `note_drained`.
    drained_count: AtomicU64,

    // Quiesce telemetry.
    /// Number of recorded `quiesce` calls.
    quiesce_calls: AtomicU64,
    /// Number of recorded `quiesce` calls that timed out.
    quiesce_timeouts: AtomicU64,
    /// Cumulative nanoseconds spent in recorded `quiesce` calls.
    quiesce_wait_ns: AtomicU64,

    // Lifecycle.
    /// Non-zero once the tenant has been unregistered; checked before each publish.
    revoked: AtomicU32,
}
189
/// Cheaply clonable handle to one registered tenant.
///
/// All clones share one `TenantState` through an `Arc`, so counters updated through one
/// clone are visible through every other clone. A handle stays usable after
/// `TenantRegistry::unregister`, but publishing through it then fails with
/// [`TenantError::Revoked`].
#[derive(Clone)]
pub struct TenantHandle {
    /// Shared tenant state (identity, opcode window, counters, revocation flag).
    state: Arc<TenantState>,
}
197
/// Snapshot of one tenant's backlog accounting and quiesce telemetry, as returned by
/// `TenantHandle::runtime_counters` and `TenantRegistry::runtime_counters`.
///
/// Each field is filled from a separate atomic load, so the snapshot is not taken
/// atomically as a whole.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Registry-assigned tenant ID.
    pub tenant_id: u32,
    /// Value of the tenant's published-slot counter.
    pub published_count: u64,
    /// Value of the tenant's drained-slot counter (advanced by `note_drained`).
    pub drained_count: u64,
    /// `published_count - drained_count`.
    pub outstanding_slots: u64,
    /// Backpressure cap configured at registration.
    pub max_outstanding_slots: u64,
    /// Number of recorded `quiesce` calls (successful or timed out).
    pub quiesce_calls: u64,
    /// Number of recorded `quiesce` calls that timed out.
    pub quiesce_timeouts: u64,
    /// Cumulative nanoseconds spent in recorded `quiesce` calls.
    pub quiesce_wait_ns: u64,
}
218
219impl std::fmt::Debug for TenantHandle {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("TenantHandle")
222 .field("id", &self.state.id)
223 .field("label", &self.state.label)
224 .field("base_opcode", &self.state.base_opcode)
225 .field(
226 "published_count",
227 &self.state.published_count.load(Ordering::Relaxed),
228 )
229 .field("max_outstanding_slots", &self.state.max_outstanding_slots)
230 .field(
231 "drained_count",
232 &self.state.drained_count.load(Ordering::Relaxed),
233 )
234 .field(
235 "revoked",
236 &(self.state.revoked.load(Ordering::Acquire) != 0),
237 )
238 .finish()
239 }
240}
241
242impl TenantHandle {
243 #[must_use]
245 pub fn id(&self) -> u32 {
246 self.state.id
247 }
248
249 #[must_use]
251 pub fn label(&self) -> &str {
252 &self.state.label
253 }
254
255 #[must_use]
257 pub fn base_opcode(&self) -> u32 {
258 self.state.base_opcode
259 }
260
261 pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
269 if local >= self.state.opcode_cap {
270 return Err(TenantError::OpcodeOutOfRange {
271 tenant_id: self.id(),
272 local_opcode: local,
273 cap: self.state.opcode_cap,
274 });
275 }
276 let global = self.state.base_opcode + local;
277 if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
278 return Err(TenantError::Pipeline(PipelineError::Backend(format!(
279 "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
280 ))));
281 }
282 Ok(global)
283 }
284
285 pub fn publish_slot(
297 &self,
298 ring_bytes: &mut [u8],
299 slot_idx: u32,
300 local_opcode: u32,
301 args: &[u32],
302 ) -> Result<(), TenantError> {
303 if self.state.revoked.load(Ordering::Acquire) != 0 {
304 return Err(TenantError::Revoked {
305 tenant_id: self.state.id,
306 });
307 }
308 let global = self.global_opcode(local_opcode)?;
309 self.reserve_publish_slot()?;
310 if let Err(error) =
311 Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
312 {
313 checked_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
314 return Err(error.into());
315 }
316 Ok(())
317 }
318
319 fn reserve_publish_slot(&self) -> Result<(), TenantError> {
320 let cap = self.state.max_outstanding_slots;
321 vyre_driver::accounting::checked_atomic_update_u64_with_order(
322 &self.state.published_count,
323 Ordering::Acquire,
324 Ordering::AcqRel,
325 Ordering::Acquire,
326 |published| {
327 let drained = self.state.drained_count.load(Ordering::Acquire);
328 let outstanding =
329 vyre_driver::accounting::checked_sub_u64_lazy(published, drained, || {
330 TenantError::Pipeline(PipelineError::QueueFull {
331 queue: "tenant",
332 fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
333 })
334 })?;
335 if outstanding >= cap {
336 return Err(TenantError::Backpressure {
337 tenant_id: self.state.id,
338 outstanding,
339 cap,
340 });
341 }
342 vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
343 TenantError::Pipeline(PipelineError::QueueFull {
344 queue: "tenant",
345 fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
346 })
347 })
348 },
349 |_, _| Ok(()),
350 )?;
351 Ok(())
352 }
353
354 #[must_use]
356 pub fn published_count(&self) -> u64 {
357 self.state.published_count.load(Ordering::Relaxed)
358 }
359
360 #[must_use]
363 pub fn drained_count(&self) -> u64 {
364 self.state.drained_count.load(Ordering::Relaxed)
365 }
366
367 #[must_use]
369 pub fn max_outstanding_slots(&self) -> u64 {
370 self.state.max_outstanding_slots
371 }
372
373 #[must_use]
375 pub fn runtime_counters(&self) -> TenantRuntimeCounters {
376 let published_count = self.state.published_count.load(Ordering::Acquire);
377 let drained_count = self.state.drained_count.load(Ordering::Acquire);
378 TenantRuntimeCounters {
379 tenant_id: self.state.id,
380 published_count,
381 drained_count,
382 outstanding_slots: vyre_driver::accounting::checked_sub_u64_lazy(
383 published_count,
384 drained_count,
385 || "tenant drained_count exceeded published_count. Fix: rebuild tenant accounting state.",
386 )
387 .unwrap_or_else(|message| panic!("{message}")),
388 max_outstanding_slots: self.state.max_outstanding_slots,
389 quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
390 quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
391 quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
392 }
393 }
394
395 pub fn note_drained(&self, count: u64) {
399 checked_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
400 }
401
402 pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
411 let started = Instant::now();
412 for poll in 0..max_spins {
413 let pub_count = self.state.published_count.load(Ordering::Acquire);
414 let drained = self.state.drained_count.load(Ordering::Acquire);
415 if drained >= pub_count {
416 self.record_quiesce(started, false);
417 return Ok(());
418 }
419 quiesce_idle(poll);
420 }
421 let pub_count = self.state.published_count.load(Ordering::Acquire);
422 let drained = self.state.drained_count.load(Ordering::Acquire);
423 self.record_quiesce(started, true);
424 Err(TenantError::QuiesceTimeout {
425 tenant_id: self.state.id,
426 outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
427 TenantError::Pipeline(PipelineError::QueueFull {
428 queue: "tenant",
429 fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
430 })
431 })?,
432 })
433 }
434
435 fn record_quiesce(&self, started: Instant, timed_out: bool) {
436 checked_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
437 if timed_out {
438 checked_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
439 }
440 let elapsed_ns = u64::try_from(started.elapsed().as_nanos()).unwrap_or_else(|error| {
441 panic!(
442 "tenant quiesce elapsed nanoseconds cannot fit u64: {error}. Fix: quiesce with a bounded timeout."
443 )
444 });
445 checked_atomic_add_u64(
446 &self.state.quiesce_wait_ns,
447 elapsed_ns,
448 "tenant quiesce_wait_ns",
449 );
450 }
451}
452
/// Concurrent registry that hands out tenant IDs and non-overlapping opcode windows.
pub struct TenantRegistry {
    /// Live tenants keyed by tenant ID; entries are removed by `unregister`.
    tenants: DashMap<u32, TenantHandle>,
    /// Counter used to allocate tenant IDs. It only moves forward, so IDs are never reused.
    next_id: AtomicU32,
}
459
460impl Default for TenantRegistry {
461 fn default() -> Self {
462 Self {
463 tenants: DashMap::new(),
464 next_id: AtomicU32::new(0),
465 }
466 }
467}
468
/// Caller-owned buffers reused by `TenantRegistry::select_concurrent_tenants_into`,
/// so that repeated selections avoid fresh allocations.
#[derive(Default, Debug)]
pub struct TenantSelectionScratch {
    active_ids: Vec<u32>,
    selected_indices: Vec<usize>,
}

impl TenantSelectionScratch {
    /// Creates empty scratch buffers; nothing is allocated until first use.
    #[must_use]
    pub const fn new() -> Self {
        let active_ids = Vec::new();
        let selected_indices = Vec::new();
        Self {
            active_ids,
            selected_indices,
        }
    }
}
486
487fn checked_atomic_add_u64(counter: &AtomicU64, value: u64, label: &'static str) {
488 vyre_driver::accounting::checked_atomic_add_u64_with_order(
489 counter,
490 value,
491 Ordering::Acquire,
492 Ordering::AcqRel,
493 Ordering::Acquire,
494 |_, _| {
495 format!("{label} overflowed u64. Fix: quiesce or recreate the tenant accounting state.")
496 },
497 )
498 .unwrap_or_else(|message| panic!("{message}"));
499}
500
501fn checked_atomic_sub_u64(counter: &AtomicU64, value: u64, label: &'static str) {
502 vyre_driver::accounting::checked_atomic_sub_u64_with_order(
503 counter,
504 value,
505 Ordering::Acquire,
506 Ordering::AcqRel,
507 Ordering::Acquire,
508 |_, _| {
509 format!("{label} underflowed u64. Fix: rebuild tenant accounting state.")
510 },
511 )
512 .unwrap_or_else(|message| panic!("{message}"));
513}
514
515impl TenantRegistry {
516 #[must_use]
518 pub fn new() -> Self {
519 Self::default()
520 }
521
522 pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
531 self.register_with_backpressure(label, u64::MAX)
532 }
533
534 pub fn register_with_backpressure(
541 &self,
542 label: impl Into<String>,
543 max_outstanding_slots: u64,
544 ) -> Result<TenantHandle, TenantError> {
545 let mut registration_retries = 0u64;
546 let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
547 &self.next_id,
548 Ordering::Relaxed,
549 Ordering::SeqCst,
550 Ordering::Relaxed,
551 |current| {
552 if current >= TENANT_ID_MAX {
553 return Err(TenantError::RegistryFull { issued: current });
554 }
555 let id = current.max(1);
556 id.checked_add(1)
557 .ok_or(TenantError::RegistryFull { issued: current })
558 },
559 |_, _| {
560 tenant_registry_retry_idle(registration_retries);
561 registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
562 registration_retries,
563 1,
564 || {
565 TenantError::Pipeline(PipelineError::QueueFull {
566 queue: "tenant",
567 fix: "tenant registration retry counter overflowed u64; retry registration later",
568 })
569 },
570 )?;
571 Ok(())
572 }
573 )?;
574 let id = issued.max(1);
575
576 let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
577 id,
578 OPCODE_RANGE_PER_TENANT,
579 TenantError::RegistryFull { issued },
580 )?;
581 let base_opcode = vyre_driver::accounting::checked_add_u32_value(
582 TENANT_OPCODE_BASE,
583 tenant_offset,
584 TenantError::RegistryFull { issued },
585 )?;
586 let top_opcode = vyre_driver::accounting::checked_add_u32_value(
587 base_opcode,
588 OPCODE_RANGE_PER_TENANT,
589 TenantError::RegistryFull { issued },
590 )?;
591 if top_opcode == SHUTDOWN {
592 return Err(TenantError::RegistryFull { issued });
593 }
594 let handle = TenantHandle {
595 state: Arc::new(TenantState {
596 id,
597 base_opcode,
598 opcode_cap: OPCODE_RANGE_PER_TENANT,
599 published_count: AtomicU64::new(0),
600 max_outstanding_slots: max_outstanding_slots.max(1),
601 drained_count: AtomicU64::new(0),
602 quiesce_calls: AtomicU64::new(0),
603 quiesce_timeouts: AtomicU64::new(0),
604 quiesce_wait_ns: AtomicU64::new(0),
605 revoked: AtomicU32::new(0),
606 label: label.into(),
607 }),
608 };
609 self.tenants.insert(id, handle.clone());
610 Ok(handle)
611 }
612
613 pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
618 let (_, handle) = self.tenants.remove(&tenant_id)?;
619 handle.state.revoked.store(1, Ordering::Release);
620 Some(handle)
621 }
622
623 #[must_use]
625 pub fn active_tenants(&self) -> Vec<TenantHandle> {
626 let mut out = Vec::with_capacity(self.tenants.len());
627 out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
628 out.sort_by_key(TenantHandle::id);
629 out
630 }
631
632 pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
634 out.clear();
635 out.reserve(self.tenants.len());
636 self.tenants
637 .iter()
638 .for_each(|entry| out.push(entry.value().clone()));
639 out.sort_by_key(TenantHandle::id);
640 }
641
642 #[must_use]
645 pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
646 self.tenants
647 .get(&tenant_id)
648 .map(|entry| entry.value().clone())
649 }
650
651 #[must_use]
653 pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
654 let mut out = Vec::with_capacity(self.tenants.len());
655 self.tenants
656 .iter()
657 .map(|entry| entry.value().runtime_counters())
658 .for_each(|counters| out.push(counters));
659 out.sort_by_key(|counters| counters.tenant_id);
660 out
661 }
662
663 pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
665 out.clear();
666 out.reserve(self.tenants.len());
667 self.tenants
668 .iter()
669 .map(|entry| entry.value().runtime_counters())
670 .for_each(|counters| out.push(counters));
671 out.sort_by_key(|counters| counters.tenant_id);
672 }
673
674 #[must_use]
683 pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
684 let mut out = Vec::new();
685 let mut scratch = TenantSelectionScratch::new();
686 self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
687 out
688 }
689
690 pub fn select_concurrent_tenants_into(
692 &self,
693 conflict_adj: &[u32],
694 out: &mut Vec<u32>,
695 scratch: &mut TenantSelectionScratch,
696 ) {
697 out.clear();
698 scratch.active_ids.clear();
699 scratch.active_ids.reserve(self.tenants.len());
700 self.tenants
701 .iter()
702 .map(|entry| entry.value().id())
703 .for_each(|id| scratch.active_ids.push(id));
704 scratch.active_ids.sort_unstable();
705 let n = scratch.active_ids.len();
706 if n == 0 {
707 return;
708 }
709 if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
710 != Some(conflict_adj.len())
711 {
712 out.reserve(n);
715 out.extend(scratch.active_ids.iter().copied());
716 return;
717 }
718 if conflict_adj.iter().all(|conflict| *conflict == 0) {
719 out.reserve(n);
720 out.extend(scratch.active_ids.iter().copied());
721 return;
722 }
723 scratch.selected_indices.clear();
724 scratch.selected_indices.reserve(n);
725 'candidate: for candidate_idx in 0..n {
726 for &selected_idx in &scratch.selected_indices {
727 if conflict_adj[candidate_idx * n + selected_idx] != 0
728 || conflict_adj[selected_idx * n + candidate_idx] != 0
729 {
730 continue 'candidate;
731 }
732 }
733 scratch.selected_indices.push(candidate_idx);
734 }
735 out.reserve(scratch.selected_indices.len());
736 for &index in &scratch.selected_indices {
737 if let Some(&id) = scratch.active_ids.get(index) {
738 out.push(id);
739 }
740 }
741 }
742}
743
#[cfg(test)]
mod tests {
    // Unit tests for tenant registration, opcode windows, publishing, backpressure,
    // quiesce, snapshots and conflict-free tenant selection.
    use super::*;

    #[test]
    fn two_tenants_get_distinct_id_and_opcode_ranges() {
        let reg = TenantRegistry::new();
        let a = reg
            .register("scanner-a")
            .expect("Fix: register a; restore this invariant before continuing.");
        let b = reg
            .register("scanner-b")
            .expect("Fix: register b; restore this invariant before continuing.");
        assert_ne!(a.id(), b.id());
        assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
        assert_eq!(a.label(), "scanner-a");
        assert_eq!(b.label(), "scanner-b");
    }

    #[test]
    fn global_opcode_rejects_out_of_range_local() {
        let reg = TenantRegistry::new();
        let t = reg.register("soleno").unwrap();
        let err = t
            .global_opcode(OPCODE_RANGE_PER_TENANT)
            .expect_err("oversized local opcode must reject");
        assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));

        let ok = t
            .global_opcode(42)
            .expect("Fix: 42 < cap; restore this invariant before continuing.");
        assert_eq!(ok, t.base_opcode() + 42);
    }

    #[test]
    fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
        let reg = TenantRegistry::new();
        let t = reg.register("warpscan").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 7, &[1, 2, 3])
            .expect("Fix: publish; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 1);

        let tenant_off = super::super::megakernel::protocol::TENANT_WORD as usize * 4;
        let opcode_off = super::super::megakernel::protocol::OPCODE_WORD as usize * 4;
        let stored_tenant =
            u32::from_le_bytes(ring[tenant_off..tenant_off + 4].try_into().unwrap());
        let stored_opcode =
            u32::from_le_bytes(ring[opcode_off..opcode_off + 4].try_into().unwrap());
        assert_eq!(stored_tenant, t.id());
        assert_eq!(stored_opcode, t.base_opcode() + 7);
    }

    #[test]
    fn unregister_blocks_future_publishes() {
        let reg = TenantRegistry::new();
        let t = reg.register("vein").unwrap();
        let tenant_id = t.id();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
            .expect("Fix: first publish ok; restore this invariant before continuing.");
        reg.unregister(tenant_id)
            .expect("Fix: unregister; restore this invariant before continuing.");
        let err = t
            .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
            .expect_err("publish after unregister must reject");
        assert!(matches!(err, TenantError::Revoked { .. }));
        assert!(reg.lookup(tenant_id).is_none());
    }

    #[test]
    fn quiesce_returns_when_drained_catches_up() {
        let reg = TenantRegistry::new();
        let t = reg.register("t1").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
        assert_eq!(t.published_count(), 2);
        t.note_drained(2);
        t.quiesce(1)
            .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
        let counters = t.runtime_counters();
        assert_eq!(counters.published_count, 2);
        assert_eq!(counters.drained_count, 2);
        assert_eq!(counters.outstanding_slots, 0);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 0);
    }

    #[test]
    fn quiesce_times_out_when_drain_stalled() {
        let reg = TenantRegistry::new();
        let t = reg.register("t2").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
        let err = t.quiesce(4).expect_err("stalled quiesce must time out");
        assert!(matches!(
            err,
            TenantError::QuiesceTimeout { outstanding: 1, .. }
        ));
        let counters = t.runtime_counters();
        assert_eq!(counters.outstanding_slots, 1);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 1);
    }

    #[test]
    fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 2).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
        let err = t
            .publish_slot(&mut ring, 2, 0, &[3])
            .expect_err("third outstanding publish must hit tenant backpressure");
        assert!(matches!(
            err,
            TenantError::Backpressure {
                outstanding: 2,
                cap: 2,
                ..
            }
        ));
        assert_eq!(t.published_count(), 2);
        let counters = t.runtime_counters();
        assert_eq!(counters.max_outstanding_slots, 2);
        assert_eq!(counters.outstanding_slots, 2);
    }

    #[test]
    fn tenant_backpressure_reopens_after_drain_progress() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 1).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        assert!(matches!(
            t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
            TenantError::Backpressure { .. }
        ));
        t.note_drained(1);
        t.publish_slot(&mut ring, 1, 0, &[2])
            .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 2);
        assert_eq!(t.runtime_counters().outstanding_slots, 1);
    }

    #[test]
    fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
        for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
            tenant_registry_retry_idle(retry);
        }
        assert_eq!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
            QUIESCE_MIN_PARK
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn quiesce_backoff_is_bounded_and_monotonic() {
        let samples = [
            quiesce_backoff_duration(0),
            quiesce_backoff_duration(1),
            quiesce_backoff_duration(2),
            quiesce_backoff_duration(8),
            quiesce_backoff_duration(64),
        ];
        assert_eq!(samples[0], QUIESCE_MIN_PARK);
        for pair in samples.windows(2) {
            assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
            assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
        }
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn active_tenants_tracks_registrations() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(active.contains(&a.id()));
        assert!(active.contains(&b.id()));
        reg.unregister(a.id());
        let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(!after.contains(&a.id()));
        assert!(after.contains(&b.id()));
        let counters: Vec<u32> = reg
            .runtime_counters()
            .iter()
            .map(|tenant| tenant.tenant_id)
            .collect();
        assert_eq!(counters, vec![b.id()]);
    }

    #[test]
    fn tenant_snapshots_reuse_caller_storage() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let mut active = Vec::with_capacity(2);
        let mut counters = Vec::with_capacity(2);

        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);
        let active_ptr = active.as_ptr();
        let counters_ptr = counters.as_ptr();
        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);

        assert_eq!(active.as_ptr(), active_ptr);
        assert_eq!(counters.as_ptr(), counters_ptr);
        assert!(active.iter().any(|tenant| tenant.id() == a.id()));
        assert!(active.iter().any(|tenant| tenant.id() == b.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == a.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == b.id()));
    }

    #[test]
    fn concurrent_tenant_selection_reuses_scratch_and_output() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let mut conflicts = vec![0_u32; n * n];
        conflicts[0 * n + 1] = 1;
        conflicts[1 * n + 0] = 1;
        let mut out = Vec::with_capacity(3);
        let mut scratch = TenantSelectionScratch::new();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
        let out_ptr = out.as_ptr();
        let active_ids_ptr = scratch.active_ids.as_ptr();
        let selected_ptr = scratch.selected_indices.as_ptr();
        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out.as_ptr(), out_ptr);
        assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
        assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
        assert!(out.contains(&a.id()) || out.contains(&b.id()));
        assert!(!(out.contains(&a.id()) && out.contains(&b.id())));
        assert!(out.contains(&c.id()));
    }

    #[test]
    fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let mut out = Vec::with_capacity(8);
        let mut scratch = TenantSelectionScratch::new();
        let conflicts = vec![0_u32; 9];
        let out_ptr = out.as_ptr();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out, vec![a.id(), b.id(), c.id()]);
        assert_eq!(
            out.as_ptr(),
            out_ptr,
            "all-zero conflict fast path must reuse caller-owned output storage"
        );
        assert!(
            scratch.selected_indices.is_empty(),
            "all-zero conflict fast path must not populate pairwise selection scratch"
        );
    }

    #[test]
    fn concurrent_tenant_selection_respects_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let mut conflicts = vec![0_u32; n * n];
        conflicts[0 * n + 1] = 1;
        conflicts[1 * n + 0] = 1;

        let selected = reg.select_concurrent_tenants(&conflicts);

        assert!(selected.contains(&a.id()) || selected.contains(&b.id()));
        assert!(!(selected.contains(&a.id()) && selected.contains(&b.id())));
        assert!(selected.contains(&c.id()));
    }

    #[test]
    fn concurrent_registration_assigns_unique_ids() {
        use std::thread;
        let reg = Arc::new(TenantRegistry::new());
        let mut handles = Vec::new();
        for i in 0..32 {
            let reg = Arc::clone(&reg);
            handles.push(thread::spawn(move || {
                reg.register(format!("t{i}")).unwrap().id()
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let mut sorted = ids.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
    }
}
1061