1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// First global opcode of the tenant opcode space; tenant `id` owns the window
/// starting at `TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT`.
pub const TENANT_OPCODE_BASE: u32 = 0x4000_0000;

/// Registration is refused once the registry's id counter reaches this value.
///
/// NOTE(review): with the current window constants the opcode-window arithmetic
/// in `TenantRegistry::register_with_quotas` overflows `u32` for ids >= 3071, so
/// that — not this bound — is the effective registration limit.
pub const TENANT_ID_MAX: u32 = u32::MAX - 1;

/// Number of opcodes in each tenant's window (2^20); local opcodes must be below this.
pub const OPCODE_RANGE_PER_TENANT: u32 = 1 << 20;

/// Idle polls handled with `spin_loop` before switching to `park_timeout`.
const QUIESCE_SPIN_POLLS: u64 = 64;
/// Park duration used for the first parked poll.
const QUIESCE_MIN_PARK: Duration = Duration::from_micros(2);
/// Upper bound on any single park.
const QUIESCE_MAX_PARK: Duration = Duration::from_micros(50);
/// Maximum number of doublings applied to `QUIESCE_MIN_PARK`.
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;
69
70#[allow(clippy::unnecessary_min_or_max)]
71fn quiesce_backoff_duration(poll: u64) -> Duration {
72 let parked_poll = poll.checked_sub(QUIESCE_SPIN_POLLS).unwrap_or(0);
73 let shift = parked_poll.min(QUIESCE_BACKOFF_SHIFT_CAP) as u32;
74 let multiplier = 1_u32 << shift;
75 QUIESCE_MIN_PARK
76 .checked_mul(multiplier)
77 .unwrap_or(QUIESCE_MAX_PARK)
78 .min(QUIESCE_MAX_PARK)
79}
80
81fn quiesce_idle(poll: u64) {
82 if poll < QUIESCE_SPIN_POLLS {
83 std::hint::spin_loop();
84 } else {
85 std::thread::park_timeout(quiesce_backoff_duration(poll));
86 }
87}
88
89fn tenant_registry_retry_idle(retry: u64) {
90 if retry < QUIESCE_SPIN_POLLS {
91 std::hint::spin_loop();
92 } else {
93 std::thread::park_timeout(quiesce_backoff_duration(retry));
94 }
95}
96
/// Errors returned by tenant registration, opcode translation, publishing,
/// quota accounting, and quiesce.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TenantError {
    /// The registry refused to issue another tenant id or opcode window.
    #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
    RegistryFull {
        /// Value of the registry's id counter when registration was refused.
        issued: u32,
    },
    /// A tenant-local opcode was not below the tenant's window size.
    #[error(
        "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
         Fix: caller must stay inside the opcode window returned by `register()`."
    )]
    OpcodeOutOfRange {
        /// Tenant that supplied the opcode.
        tenant_id: u32,
        /// The rejected tenant-local opcode.
        local_opcode: u32,
        /// Exclusive upper bound for tenant-local opcodes.
        cap: u32,
    },
    /// The operation was attempted through a handle whose tenant was unregistered.
    #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
    Revoked {
        /// Tenant id carried by the stale handle.
        tenant_id: u32,
    },
    /// `quiesce` exhausted its poll budget before the drained count caught up.
    #[error(
        "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
         Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
    )]
    QuiesceTimeout {
        /// Tenant being quiesced.
        tenant_id: u32,
        /// Published-minus-drained slot count observed at timeout.
        outstanding: u64,
    },
    /// Publishing another slot would exceed the tenant's outstanding-slot cap.
    #[error(
        "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
         Fix: wait for drain progress or register the tenant with a larger bounded backlog."
    )]
    Backpressure {
        /// Tenant whose publish was rejected.
        tenant_id: u32,
        /// Published-minus-drained slot count at rejection.
        outstanding: u64,
        /// The tenant's outstanding-slot cap.
        cap: u64,
    },
    /// A staging-byte reservation would exceed the tenant's staging cap.
    #[error(
        "tenant {tenant_id} requested {requested} staging bytes with {used} already reserved, cap {cap}. \
         Fix: release staging reservations after publish/readback progress or register the tenant with a larger bounded staging budget."
    )]
    StagingBackpressure {
        /// Tenant whose reservation was rejected.
        tenant_id: u32,
        /// Bytes requested by the rejected reservation.
        requested: u64,
        /// Bytes reserved when the error was built (re-read, so it may differ
        /// from the value the check used under concurrency).
        used: u64,
        /// The tenant's staging-byte cap.
        cap: u64,
    },
    /// A resident-handle reservation would exceed the tenant's handle cap.
    #[error(
        "tenant {tenant_id} requested {requested} resident handles with {used} already reserved, cap {cap}. \
         Fix: release resident handles when backend ownership ends or register the tenant with a larger bounded resident-handle budget."
    )]
    ResidentHandleBackpressure {
        /// Tenant whose reservation was rejected.
        tenant_id: u32,
        /// Handles requested by the rejected reservation.
        requested: u64,
        /// Handles reserved when the error was built (re-read, so it may differ
        /// from the value the check used under concurrency).
        used: u64,
        /// The tenant's resident-handle cap.
        cap: u64,
    },
    /// A release asked to return more of a resource than is currently reserved.
    #[error(
        "tenant {tenant_id} released {requested} {resource} with only {used} reserved. \
         Fix: pair every tenant resource release with a successful reservation."
    )]
    ResourceUnderflow {
        /// Tenant performing the release.
        tenant_id: u32,
        /// Resource name, e.g. "staging bytes" or "resident handles".
        resource: &'static str,
        /// Amount the caller tried to release.
        requested: u64,
        /// Amount reserved at the time of the failed release.
        used: u64,
    },
    /// An error from the underlying pipeline/megakernel layer.
    #[error("{0}")]
    Pipeline(#[from] PipelineError),
}
201
/// Per-tenant resource limits passed to `TenantRegistry::register_with_quotas`.
///
/// Registration raises any limit of `0` to `1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantQuota {
    /// Maximum number of published-but-not-yet-drained slots.
    pub max_outstanding_slots: u64,
    /// Maximum number of staging bytes reserved at once.
    pub max_staging_bytes: u64,
    /// Maximum number of resident handles reserved at once.
    pub max_resident_handles: u64,
}

impl TenantQuota {
    /// A quota whose every limit is `u64::MAX`, i.e. effectively unlimited.
    #[must_use]
    pub const fn unbounded() -> Self {
        Self::bounded(u64::MAX, u64::MAX, u64::MAX)
    }

    /// A quota with the three given limits, stored verbatim.
    #[must_use]
    pub const fn bounded(
        max_outstanding_slots: u64,
        max_staging_bytes: u64,
        max_resident_handles: u64,
    ) -> Self {
        TenantQuota {
            max_outstanding_slots,
            max_staging_bytes,
            max_resident_handles,
        }
    }
}
240
/// State shared by every clone of a [`TenantHandle`] (held behind an `Arc`).
struct TenantState {
    /// Registry-assigned tenant id.
    id: u32,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Window size; tenant-local opcodes must be strictly below this.
    opcode_cap: u32,
    /// Slots reserved for publishing (decremented again if the ring write fails).
    published_count: AtomicU64,
    /// Cap on `published_count - drained_count` checked before each publish.
    max_outstanding_slots: u64,
    /// Staging bytes currently reserved.
    staging_bytes: AtomicU64,
    /// Cap on `staging_bytes`.
    max_staging_bytes: u64,
    /// Resident handles currently reserved.
    resident_handles: AtomicU64,
    /// Cap on `resident_handles`.
    max_resident_handles: u64,
    /// Slots the caller has reported as drained via `note_drained` (saturating).
    drained_count: AtomicU64,
    /// Number of recorded `quiesce` calls (successful or timed out).
    quiesce_calls: AtomicU64,
    /// Number of `quiesce` calls that timed out.
    quiesce_timeouts: AtomicU64,
    /// Total wall-clock nanoseconds spent in `quiesce` (saturating).
    quiesce_wait_ns: AtomicU64,
    /// Non-zero once the tenant has been unregistered.
    revoked: AtomicU32,
    /// Caller-supplied label.
    label: String,
}
273
/// Handle to a registered tenant.
///
/// Cloning is cheap: all clones share the same `Arc<TenantState>`, so counters,
/// quotas, and revocation are observed consistently through every clone.
#[derive(Clone)]
pub struct TenantHandle {
    state: Arc<TenantState>,
}
281
/// Snapshot of a tenant's publish/drain/quiesce counters.
///
/// Each field is loaded separately, so the snapshot is not atomic across fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Tenant the snapshot belongs to.
    pub tenant_id: u32,
    /// Slots reserved for publishing.
    pub published_count: u64,
    /// Slots reported drained.
    pub drained_count: u64,
    /// `published_count` minus `drained_count`, saturating at zero.
    pub outstanding_slots: u64,
    /// The tenant's outstanding-slot cap.
    pub max_outstanding_slots: u64,
    /// Number of recorded `quiesce` calls.
    pub quiesce_calls: u64,
    /// Number of `quiesce` calls that timed out.
    pub quiesce_timeouts: u64,
    /// Total nanoseconds spent in `quiesce`.
    pub quiesce_wait_ns: u64,
}
302
/// Snapshot of a tenant's staging-byte and resident-handle reservations.
///
/// Each field is loaded separately, so the snapshot is not atomic across fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantQuotaCounters {
    /// Tenant the snapshot belongs to.
    pub tenant_id: u32,
    /// Staging bytes currently reserved.
    pub staging_bytes: u64,
    /// The tenant's staging-byte cap.
    pub max_staging_bytes: u64,
    /// Resident handles currently reserved.
    pub resident_handles: u64,
    /// The tenant's resident-handle cap.
    pub max_resident_handles: u64,
}
317
318impl std::fmt::Debug for TenantHandle {
319 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320 f.debug_struct("TenantHandle")
321 .field("id", &self.state.id)
322 .field("label", &self.state.label)
323 .field("base_opcode", &self.state.base_opcode)
324 .field(
325 "published_count",
326 &self.state.published_count.load(Ordering::Relaxed),
327 )
328 .field("max_outstanding_slots", &self.state.max_outstanding_slots)
329 .field(
330 "staging_bytes",
331 &self.state.staging_bytes.load(Ordering::Relaxed),
332 )
333 .field("max_staging_bytes", &self.state.max_staging_bytes)
334 .field(
335 "resident_handles",
336 &self.state.resident_handles.load(Ordering::Relaxed),
337 )
338 .field("max_resident_handles", &self.state.max_resident_handles)
339 .field(
340 "drained_count",
341 &self.state.drained_count.load(Ordering::Relaxed),
342 )
343 .field(
344 "revoked",
345 &(self.state.revoked.load(Ordering::Acquire) != 0),
346 )
347 .finish()
348 }
349}
350
351impl TenantHandle {
352 #[must_use]
354 pub fn id(&self) -> u32 {
355 self.state.id
356 }
357
358 #[must_use]
360 pub fn label(&self) -> &str {
361 &self.state.label
362 }
363
364 #[must_use]
366 pub fn base_opcode(&self) -> u32 {
367 self.state.base_opcode
368 }
369
370 pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
378 self.ensure_not_revoked()?;
379 if local >= self.state.opcode_cap {
380 return Err(TenantError::OpcodeOutOfRange {
381 tenant_id: self.id(),
382 local_opcode: local,
383 cap: self.state.opcode_cap,
384 });
385 }
386 let global = self.state.base_opcode + local;
387 if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
388 return Err(TenantError::Pipeline(PipelineError::Backend(format!(
389 "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
390 ))));
391 }
392 Ok(global)
393 }
394
395 pub fn publish_slot(
407 &self,
408 ring_bytes: &mut [u8],
409 slot_idx: u32,
410 local_opcode: u32,
411 args: &[u32],
412 ) -> Result<(), TenantError> {
413 self.ensure_not_revoked()?;
414 let global = self.global_opcode(local_opcode)?;
415 self.reserve_publish_slot()?;
416 if let Err(error) =
417 Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
418 {
419 saturating_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
420 return Err(error.into());
421 }
422 Ok(())
423 }
424
425 fn ensure_not_revoked(&self) -> Result<(), TenantError> {
426 if self.state.revoked.load(Ordering::Acquire) != 0 {
427 return Err(TenantError::Revoked {
428 tenant_id: self.state.id,
429 });
430 }
431 Ok(())
432 }
433
434 fn reserve_publish_slot(&self) -> Result<(), TenantError> {
435 let cap = self.state.max_outstanding_slots;
436 vyre_driver::accounting::checked_atomic_update_u64_with_order(
437 &self.state.published_count,
438 Ordering::Acquire,
439 Ordering::AcqRel,
440 Ordering::Acquire,
441 |published| {
442 let drained = self.state.drained_count.load(Ordering::Acquire);
443 let outstanding = vyre_driver::accounting::checked_sub_u64_lazy(
444 published,
445 drained,
446 || {
447 TenantError::Pipeline(PipelineError::QueueFull {
448 queue: "tenant",
449 fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
450 })
451 },
452 )?;
453 if outstanding >= cap {
454 return Err(TenantError::Backpressure {
455 tenant_id: self.state.id,
456 outstanding,
457 cap,
458 });
459 }
460 vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
461 TenantError::Pipeline(PipelineError::QueueFull {
462 queue: "tenant",
463 fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
464 })
465 })
466 },
467 |_, _| Ok(()),
468 )?;
469 Ok(())
470 }
471
472 #[must_use]
474 pub fn published_count(&self) -> u64 {
475 self.state.published_count.load(Ordering::Relaxed)
476 }
477
478 #[must_use]
481 pub fn drained_count(&self) -> u64 {
482 self.state.drained_count.load(Ordering::Relaxed)
483 }
484
485 #[must_use]
487 pub fn max_outstanding_slots(&self) -> u64 {
488 self.state.max_outstanding_slots
489 }
490
491 pub fn reserve_staging_bytes(&self, byte_count: u64) -> Result<(), TenantError> {
493 self.ensure_not_revoked()?;
494 reserve_resource_quota(
495 &self.state.staging_bytes,
496 byte_count,
497 self.state.max_staging_bytes,
498 || {
499 TenantError::StagingBackpressure {
500 tenant_id: self.state.id,
501 requested: byte_count,
502 used: self.state.staging_bytes.load(Ordering::Acquire),
503 cap: self.state.max_staging_bytes,
504 }
505 },
506 "tenant staging byte reservation overflowed u64; release staging reservations or recreate the tenant before reserving more bytes",
507 )
508 }
509
510 pub fn release_staging_bytes(&self, byte_count: u64) -> Result<(), TenantError> {
512 release_resource_quota(
513 &self.state.staging_bytes,
514 byte_count,
515 self.state.id,
516 "staging bytes",
517 )
518 }
519
520 pub fn reserve_resident_handles(&self, handle_count: u64) -> Result<(), TenantError> {
522 self.ensure_not_revoked()?;
523 reserve_resource_quota(
524 &self.state.resident_handles,
525 handle_count,
526 self.state.max_resident_handles,
527 || {
528 TenantError::ResidentHandleBackpressure {
529 tenant_id: self.state.id,
530 requested: handle_count,
531 used: self.state.resident_handles.load(Ordering::Acquire),
532 cap: self.state.max_resident_handles,
533 }
534 },
535 "tenant resident handle reservation overflowed u64; release resident handles or recreate the tenant before reserving more handles",
536 )
537 }
538
539 pub fn release_resident_handles(&self, handle_count: u64) -> Result<(), TenantError> {
541 release_resource_quota(
542 &self.state.resident_handles,
543 handle_count,
544 self.state.id,
545 "resident handles",
546 )
547 }
548
549 #[must_use]
551 pub fn quota_counters(&self) -> TenantQuotaCounters {
552 TenantQuotaCounters {
553 tenant_id: self.state.id,
554 staging_bytes: self.state.staging_bytes.load(Ordering::Acquire),
555 max_staging_bytes: self.state.max_staging_bytes,
556 resident_handles: self.state.resident_handles.load(Ordering::Acquire),
557 max_resident_handles: self.state.max_resident_handles,
558 }
559 }
560
561 fn release_all_resource_reservations(&self) {
562 self.state.staging_bytes.store(0, Ordering::Release);
563 self.state.resident_handles.store(0, Ordering::Release);
564 }
565
566 #[must_use]
568 pub fn runtime_counters(&self) -> TenantRuntimeCounters {
569 let published_count = self.state.published_count.load(Ordering::Acquire);
570 let drained_count = self.state.drained_count.load(Ordering::Acquire);
571 TenantRuntimeCounters {
572 tenant_id: self.state.id,
573 published_count,
574 drained_count,
575 outstanding_slots: published_count.saturating_sub(drained_count),
576 max_outstanding_slots: self.state.max_outstanding_slots,
577 quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
578 quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
579 quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
580 }
581 }
582
583 pub fn note_drained(&self, count: u64) {
587 saturating_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
588 }
589
590 pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
599 let started = Instant::now();
600 for poll in 0..max_spins {
601 let pub_count = self.state.published_count.load(Ordering::Acquire);
602 let drained = self.state.drained_count.load(Ordering::Acquire);
603 if drained >= pub_count {
604 self.record_quiesce(started, false);
605 return Ok(());
606 }
607 quiesce_idle(poll);
608 }
609 let pub_count = self.state.published_count.load(Ordering::Acquire);
610 let drained = self.state.drained_count.load(Ordering::Acquire);
611 self.record_quiesce(started, true);
612 Err(TenantError::QuiesceTimeout {
613 tenant_id: self.state.id,
614 outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
615 TenantError::Pipeline(PipelineError::QueueFull {
616 queue: "tenant",
617 fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
618 })
619 })?,
620 })
621 }
622
623 fn record_quiesce(&self, started: Instant, timed_out: bool) {
624 saturating_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
625 if timed_out {
626 saturating_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
627 }
628 let elapsed_ns = match u64::try_from(started.elapsed().as_nanos()) {
629 Ok(elapsed_ns) => elapsed_ns,
630 Err(_) => u64::MAX,
631 };
632 saturating_atomic_add_u64(
633 &self.state.quiesce_wait_ns,
634 elapsed_ns,
635 "tenant quiesce_wait_ns",
636 );
637 }
638}
639
/// Registry of live tenants keyed by tenant id; all operations take `&self`.
pub struct TenantRegistry {
    /// Live tenants; `unregister` removes entries.
    tenants: DashMap<u32, TenantHandle>,
    /// Monotonic id counter advanced by every registration attempt.
    next_id: AtomicU32,
}
646
647impl Default for TenantRegistry {
648 fn default() -> Self {
649 Self {
650 tenants: DashMap::new(),
651 next_id: AtomicU32::new(0),
652 }
653 }
654}
655
/// Caller-owned scratch buffers reused by
/// `TenantRegistry::select_concurrent_tenants_into` to avoid per-call allocation.
#[derive(Debug, Default)]
pub struct TenantSelectionScratch {
    active_ids: Vec<u32>,
    selected_indices: Vec<usize>,
}

impl TenantSelectionScratch {
    /// Creates empty scratch buffers; usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        let active_ids = Vec::new();
        let selected_indices = Vec::new();
        TenantSelectionScratch {
            active_ids,
            selected_indices,
        }
    }
}
673
/// Atomically adds `value` to `counter`, saturating at `u64::MAX`.
///
/// `_label` is accepted for call-site documentation and is not used.
fn saturating_atomic_add_u64(counter: &AtomicU64, value: u64, _label: &'static str) {
    // `fetch_update` runs the same acquire-load / AcqRel compare-exchange-weak
    // loop; the closure always yields `Some`, so the result is always `Ok`.
    let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        Some(current.saturating_add(value))
    });
}
684
/// Atomically subtracts `value` from `counter`, saturating at zero.
///
/// `_label` is accepted for call-site documentation and is not used.
fn saturating_atomic_sub_u64(counter: &AtomicU64, value: u64, _label: &'static str) {
    // Same CAS-weak loop and orderings as a hand-written loop; the closure
    // never returns `None`, so the update always succeeds.
    let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        Some(current.saturating_sub(value))
    });
}
695
696fn reserve_resource_quota(
697 counter: &AtomicU64,
698 value: u64,
699 cap: u64,
700 backpressure: impl Fn() -> TenantError,
701 overflow_fix: &'static str,
702) -> Result<(), TenantError> {
703 vyre_driver::accounting::checked_atomic_update_u64_with_order(
704 counter,
705 Ordering::Acquire,
706 Ordering::AcqRel,
707 Ordering::Acquire,
708 |used| {
709 let next = vyre_driver::accounting::checked_add_u64_lazy(used, value, || {
710 TenantError::Pipeline(PipelineError::QueueFull {
711 queue: "tenant resource quota",
712 fix: overflow_fix,
713 })
714 })?;
715 if next > cap {
716 return Err(backpressure());
717 }
718 Ok(next)
719 },
720 |_, _| Ok(()),
721 )?;
722 Ok(())
723}
724
725fn release_resource_quota(
726 counter: &AtomicU64,
727 value: u64,
728 tenant_id: u32,
729 resource: &'static str,
730) -> Result<(), TenantError> {
731 vyre_driver::accounting::checked_atomic_update_u64_with_order(
732 counter,
733 Ordering::Acquire,
734 Ordering::AcqRel,
735 Ordering::Acquire,
736 |used| {
737 used.checked_sub(value)
738 .ok_or(TenantError::ResourceUnderflow {
739 tenant_id,
740 resource,
741 requested: value,
742 used,
743 })
744 },
745 |_, _| Ok(()),
746 )?;
747 Ok(())
748}
749
750impl TenantRegistry {
751 #[must_use]
753 pub fn new() -> Self {
754 Self::default()
755 }
756
757 pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
766 self.register_with_backpressure(label, u64::MAX)
767 }
768
769 pub fn register_with_backpressure(
776 &self,
777 label: impl Into<String>,
778 max_outstanding_slots: u64,
779 ) -> Result<TenantHandle, TenantError> {
780 self.register_with_quotas(
781 label,
782 TenantQuota {
783 max_outstanding_slots,
784 ..TenantQuota::unbounded()
785 },
786 )
787 }
788
789 pub fn register_with_quotas(
797 &self,
798 label: impl Into<String>,
799 quota: TenantQuota,
800 ) -> Result<TenantHandle, TenantError> {
801 let mut registration_retries = 0u64;
802 let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
803 &self.next_id,
804 Ordering::Relaxed,
805 Ordering::SeqCst,
806 Ordering::Relaxed,
807 |current| {
808 if current >= TENANT_ID_MAX {
809 return Err(TenantError::RegistryFull { issued: current });
810 }
811 let id = current.max(1);
812 id.checked_add(1)
813 .ok_or(TenantError::RegistryFull { issued: current })
814 },
815 |_, _| {
816 tenant_registry_retry_idle(registration_retries);
817 registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
818 registration_retries,
819 1,
820 || {
821 TenantError::Pipeline(PipelineError::QueueFull {
822 queue: "tenant",
823 fix: "tenant registration retry counter overflowed u64; retry registration later",
824 })
825 },
826 )?;
827 Ok(())
828 },
829 )?;
830 let id = issued.max(1);
831
832 let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
833 id,
834 OPCODE_RANGE_PER_TENANT,
835 TenantError::RegistryFull { issued },
836 )?;
837 let base_opcode = vyre_driver::accounting::checked_add_u32_value(
838 TENANT_OPCODE_BASE,
839 tenant_offset,
840 TenantError::RegistryFull { issued },
841 )?;
842 let top_opcode = vyre_driver::accounting::checked_add_u32_value(
843 base_opcode,
844 OPCODE_RANGE_PER_TENANT,
845 TenantError::RegistryFull { issued },
846 )?;
847 if top_opcode == SHUTDOWN {
848 return Err(TenantError::RegistryFull { issued });
849 }
850 let handle = TenantHandle {
851 state: Arc::new(TenantState {
852 id,
853 base_opcode,
854 opcode_cap: OPCODE_RANGE_PER_TENANT,
855 published_count: AtomicU64::new(0),
856 max_outstanding_slots: quota.max_outstanding_slots.max(1),
857 staging_bytes: AtomicU64::new(0),
858 max_staging_bytes: quota.max_staging_bytes.max(1),
859 resident_handles: AtomicU64::new(0),
860 max_resident_handles: quota.max_resident_handles.max(1),
861 drained_count: AtomicU64::new(0),
862 quiesce_calls: AtomicU64::new(0),
863 quiesce_timeouts: AtomicU64::new(0),
864 quiesce_wait_ns: AtomicU64::new(0),
865 revoked: AtomicU32::new(0),
866 label: label.into(),
867 }),
868 };
869 self.tenants.insert(id, handle.clone());
870 Ok(handle)
871 }
872
873 pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
878 let (_, handle) = self.tenants.remove(&tenant_id)?;
879 handle.state.revoked.store(1, Ordering::Release);
880 handle.release_all_resource_reservations();
881 Some(handle)
882 }
883
884 #[must_use]
886 pub fn active_tenants(&self) -> Vec<TenantHandle> {
887 let mut out = Vec::with_capacity(self.tenants.len());
888 out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
889 out.sort_by_key(TenantHandle::id);
890 out
891 }
892
893 pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
895 out.clear();
896 out.reserve(self.tenants.len());
897 self.tenants
898 .iter()
899 .for_each(|entry| out.push(entry.value().clone()));
900 out.sort_by_key(TenantHandle::id);
901 }
902
903 #[must_use]
906 pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
907 self.tenants
908 .get(&tenant_id)
909 .map(|entry| entry.value().clone())
910 }
911
912 #[must_use]
914 pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
915 let mut out = Vec::with_capacity(self.tenants.len());
916 self.tenants
917 .iter()
918 .map(|entry| entry.value().runtime_counters())
919 .for_each(|counters| out.push(counters));
920 out.sort_by_key(|counters| counters.tenant_id);
921 out
922 }
923
924 pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
926 out.clear();
927 out.reserve(self.tenants.len());
928 self.tenants
929 .iter()
930 .map(|entry| entry.value().runtime_counters())
931 .for_each(|counters| out.push(counters));
932 out.sort_by_key(|counters| counters.tenant_id);
933 }
934
935 #[must_use]
944 pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
945 let mut out = Vec::new();
946 let mut scratch = TenantSelectionScratch::new();
947 self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
948 out
949 }
950
951 pub fn select_concurrent_tenants_into(
953 &self,
954 conflict_adj: &[u32],
955 out: &mut Vec<u32>,
956 scratch: &mut TenantSelectionScratch,
957 ) {
958 out.clear();
959 scratch.active_ids.clear();
960 scratch.active_ids.reserve(self.tenants.len());
961 self.tenants
962 .iter()
963 .map(|entry| entry.value().id())
964 .for_each(|id| scratch.active_ids.push(id));
965 scratch.active_ids.sort_unstable();
966 let n = scratch.active_ids.len();
967 if n == 0 {
968 return;
969 }
970 if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
971 != Some(conflict_adj.len())
972 {
973 out.reserve(n);
976 out.extend(scratch.active_ids.iter().copied());
977 return;
978 }
979 if conflict_adj.iter().all(|conflict| *conflict == 0) {
980 out.reserve(n);
981 out.extend(scratch.active_ids.iter().copied());
982 return;
983 }
984 scratch.selected_indices.clear();
985 scratch.selected_indices.reserve(n);
986 'candidate: for candidate_idx in 0..n {
987 for &selected_idx in &scratch.selected_indices {
988 if conflict_adj[candidate_idx * n + selected_idx] != 0
989 || conflict_adj[selected_idx * n + candidate_idx] != 0
990 {
991 continue 'candidate;
992 }
993 }
994 scratch.selected_indices.push(candidate_idx);
995 }
996 out.reserve(scratch.selected_indices.len());
997 for &index in &scratch.selected_indices {
998 if let Some(&id) = scratch.active_ids.get(index) {
999 out.push(id);
1000 }
1001 }
1002 }
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007 use super::*;
1008
1009 #[test]
1010 fn two_tenants_get_distinct_id_and_opcode_ranges() {
1011 let reg = TenantRegistry::new();
1012 let a = reg
1013 .register("scanner-a")
1014 .expect("Fix: register a; restore this invariant before continuing.");
1015 let b = reg
1016 .register("scanner-b")
1017 .expect("Fix: register b; restore this invariant before continuing.");
1018 assert_ne!(a.id(), b.id());
1019 assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
1020 assert_eq!(a.label(), "scanner-a");
1021 assert_eq!(b.label(), "scanner-b");
1022 }
1023
1024 #[test]
1025 fn global_opcode_rejects_out_of_range_local() {
1026 let reg = TenantRegistry::new();
1027 let t = reg.register("soleno").unwrap();
1028 let err = t
1029 .global_opcode(OPCODE_RANGE_PER_TENANT)
1030 .expect_err("oversized local opcode must reject");
1031 assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));
1032
1033 let ok = t
1034 .global_opcode(42)
1035 .expect("Fix: 42 < cap; restore this invariant before continuing.");
1036 assert_eq!(ok, t.base_opcode() + 42);
1037 }
1038
1039 #[test]
1040 fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
1041 let reg = TenantRegistry::new();
1042 let t = reg.register("warpscan").unwrap();
1043 let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();
1044
1045 t.publish_slot(
1046 &mut ring,
1047 0,
1048 7,
1049 &[1, 2, 3],
1050 )
1051 .expect("Fix: publish; restore this invariant before continuing.");
1052 assert_eq!(t.published_count(), 1);
1053
1054 let tenant_off = super::super::megakernel::protocol::TENANT_WORD as usize * 4;
1056 let opcode_off = super::super::megakernel::protocol::OPCODE_WORD as usize * 4;
1057 let stored_tenant =
1058 u32::from_le_bytes(ring[tenant_off..tenant_off + 4].try_into().unwrap());
1059 let stored_opcode =
1060 u32::from_le_bytes(ring[opcode_off..opcode_off + 4].try_into().unwrap());
1061 assert_eq!(stored_tenant, t.id());
1062 assert_eq!(stored_opcode, t.base_opcode() + 7);
1063 }
1064
1065 #[test]
1066 fn unregister_blocks_future_publishes() {
1067 let reg = TenantRegistry::new();
1068 let t = reg.register("vein").unwrap();
1069 let tenant_id = t.id();
1070 let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
1071 t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
1072 .expect("Fix: first publish ok; restore this invariant before continuing.");
1073 reg.unregister(tenant_id)
1074 .expect("Fix: unregister; restore this invariant before continuing.");
1075 let err = t
1076 .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
1077 .expect_err("publish after unregister must reject");
1078 assert!(matches!(err, TenantError::Revoked { .. }));
1079 assert!(reg.lookup(tenant_id).is_none());
1080 }
1081
1082 #[test]
1083 fn quiesce_returns_when_drained_catches_up() {
1084 let reg = TenantRegistry::new();
1085 let t = reg.register("t1").unwrap();
1086 let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
1087 t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
1088 t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
1089 assert_eq!(t.published_count(), 2);
1090 t.note_drained(2);
1091 t.quiesce(1)
1092 .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
1093 let counters = t.runtime_counters();
1094 assert_eq!(counters.published_count, 2);
1095 assert_eq!(counters.drained_count, 2);
1096 assert_eq!(counters.outstanding_slots, 0);
1097 assert_eq!(counters.quiesce_calls, 1);
1098 assert_eq!(counters.quiesce_timeouts, 0);
1099 }
1100
1101 #[test]
1102 fn quiesce_times_out_when_drain_stalled() {
1103 let reg = TenantRegistry::new();
1104 let t = reg.register("t2").unwrap();
1105 let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
1106 t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
1107 let err = t.quiesce(4).expect_err("stalled quiesce must time out");
1109 assert!(matches!(
1110 err,
1111 TenantError::QuiesceTimeout { outstanding: 1, .. }
1112 ));
1113 let counters = t.runtime_counters();
1114 assert_eq!(counters.outstanding_slots, 1);
1115 assert_eq!(counters.quiesce_calls, 1);
1116 assert_eq!(counters.quiesce_timeouts, 1);
1117 }
1118
1119 #[test]
1120 fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
1121 let reg = TenantRegistry::new();
1122 let t = reg.register_with_backpressure("bounded", 2).unwrap();
1123 let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();
1124
1125 t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
1126 t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
1127 let err = t
1128 .publish_slot(&mut ring, 2, 0, &[3])
1129 .expect_err("third outstanding publish must hit tenant backpressure");
1130 assert!(matches!(
1131 err,
1132 TenantError::Backpressure {
1133 outstanding: 2,
1134 cap: 2,
1135 ..
1136 }
1137 ));
1138 assert_eq!(t.published_count(), 2);
1139 let counters = t.runtime_counters();
1140 assert_eq!(counters.max_outstanding_slots, 2);
1141 assert_eq!(counters.outstanding_slots, 2);
1142 }
1143
1144 #[test]
1145 fn tenant_backpressure_reopens_after_drain_progress() {
1146 let reg = TenantRegistry::new();
1147 let t = reg.register_with_backpressure("bounded", 1).unwrap();
1148 let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
1149
1150 t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
1151 assert!(matches!(
1152 t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
1153 TenantError::Backpressure { .. }
1154 ));
1155 t.note_drained(1);
1156 t.publish_slot(&mut ring, 1, 0, &[2])
1157 .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
1158 assert_eq!(t.published_count(), 2);
1159 assert_eq!(t.runtime_counters().outstanding_slots, 1);
1160 }
1161
1162 #[test]
1163 fn tenant_resource_quotas_reject_overcommit_and_cleanup_on_unregister() {
1164 let reg = TenantRegistry::new();
1165 let t = reg
1166 .register_with_quotas("quota", TenantQuota::bounded(2, 16, 1))
1167 .unwrap();
1168
1169 t.reserve_staging_bytes(8).unwrap();
1170 let staging_error = t
1171 .reserve_staging_bytes(9)
1172 .expect_err("staging byte quota must reject overcommit");
1173 assert!(matches!(
1174 staging_error,
1175 TenantError::StagingBackpressure {
1176 requested: 9,
1177 cap: 16,
1178 ..
1179 }
1180 ));
1181 assert_eq!(t.quota_counters().staging_bytes, 8);
1182
1183 t.release_staging_bytes(4).unwrap();
1184 t.reserve_staging_bytes(12).unwrap();
1185 assert_eq!(t.quota_counters().staging_bytes, 16);
1186 let underflow = t
1187 .release_staging_bytes(17)
1188 .expect_err("staging release must reject underflow");
1189 assert!(matches!(
1190 underflow,
1191 TenantError::ResourceUnderflow {
1192 resource: "staging bytes",
1193 requested: 17,
1194 used: 16,
1195 ..
1196 }
1197 ));
1198
1199 t.reserve_resident_handles(1).unwrap();
1200 let handle_error = t
1201 .reserve_resident_handles(1)
1202 .expect_err("resident handle quota must reject overcommit");
1203 assert!(matches!(
1204 handle_error,
1205 TenantError::ResidentHandleBackpressure {
1206 requested: 1,
1207 cap: 1,
1208 ..
1209 }
1210 ));
1211 assert_eq!(t.quota_counters().resident_handles, 1);
1212
1213 let removed = reg.unregister(t.id()).unwrap();
1214 assert_eq!(removed.quota_counters().staging_bytes, 0);
1215 assert_eq!(removed.quota_counters().resident_handles, 0);
1216 assert!(matches!(
1217 t.reserve_staging_bytes(1).unwrap_err(),
1218 TenantError::Revoked { .. }
1219 ));
1220 assert!(matches!(
1221 t.reserve_resident_handles(1).unwrap_err(),
1222 TenantError::Revoked { .. }
1223 ));
1224 }
1225
1226 #[test]
1227 fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
1228 for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
1229 tenant_registry_retry_idle(retry);
1230 }
1231 assert_eq!(
1232 quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
1233 QUIESCE_MIN_PARK
1234 );
1235 assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
1236 }
1237
1238 #[test]
1239 fn quiesce_backoff_is_bounded_and_monotonic() {
1240 let samples = [
1241 quiesce_backoff_duration(0),
1242 quiesce_backoff_duration(1),
1243 quiesce_backoff_duration(2),
1244 quiesce_backoff_duration(8),
1245 quiesce_backoff_duration(64),
1246 ];
1247 assert_eq!(samples[0], QUIESCE_MIN_PARK);
1248 for pair in samples.windows(2) {
1249 assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
1250 assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
1251 }
1252 assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
1253 }
1254
#[test]
fn active_tenants_tracks_registrations() {
    let reg = TenantRegistry::new();
    let a = reg.register("a").unwrap();
    let b = reg.register("b").unwrap();
    let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
    assert!(active.contains(&a.id()));
    assert!(active.contains(&b.id()));

    // Check the unregister result instead of discarding it. Otherwise a failed
    // unregister would only show up indirectly through the membership checks below.
    reg.unregister(a.id())
        .expect("unregistering a live tenant must succeed");

    let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
    assert!(!after.contains(&a.id()));
    assert!(after.contains(&b.id()));
    let counters: Vec<u32> = reg
        .runtime_counters()
        .iter()
        .map(|tenant| tenant.tenant_id)
        .collect();
    assert_eq!(counters, vec![b.id()]);
}
1274
#[test]
fn tenant_snapshots_reuse_caller_storage() {
    let reg = TenantRegistry::new();
    let a = reg.register("a").unwrap();
    let b = reg.register("b").unwrap();
    let mut active = Vec::with_capacity(2);
    let mut counters = Vec::with_capacity(2);

    // The first fill establishes the buffers; the second must refill them in place.
    reg.active_tenants_into(&mut active);
    reg.runtime_counters_into(&mut counters);
    let (active_ptr, counters_ptr) = (active.as_ptr(), counters.as_ptr());
    reg.active_tenants_into(&mut active);
    reg.runtime_counters_into(&mut counters);

    assert_eq!(active.as_ptr(), active_ptr);
    assert_eq!(counters.as_ptr(), counters_ptr);
    // Both registered tenants must appear in each refilled snapshot.
    for id in [a.id(), b.id()] {
        assert!(active.iter().any(|tenant| tenant.id() == id));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == id));
    }
}
1297
#[test]
fn concurrent_tenant_selection_reuses_scratch_and_output() {
    let reg = TenantRegistry::new();
    let a = reg.register("a").unwrap();
    let b = reg.register("b").unwrap();
    let c = reg.register("c").unwrap();

    // Flattened n x n conflict matrix addressed as `row * n + col`. Positions
    // presumably follow active-tenant order (a, b, c for these fresh
    // registrations). Mark a <-> b as conflicting. A named index helper avoids
    // `0 * n` / `1 * n + 0`, which trip clippy's `erasing_op` / `identity_op`.
    let n = 3;
    let cell = |row: usize, col: usize| row * n + col;
    let mut conflicts = vec![0_u32; n * n];
    conflicts[cell(0, 1)] = 1;
    conflicts[cell(1, 0)] = 1;

    let mut out = Vec::with_capacity(3);
    let mut scratch = TenantSelectionScratch::new();

    // The first call sizes the buffers; the second must reuse every one of them.
    reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
    let out_ptr = out.as_ptr();
    let active_ids_ptr = scratch.active_ids.as_ptr();
    let selected_ptr = scratch.selected_indices.as_ptr();
    reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

    assert_eq!(out.as_ptr(), out_ptr);
    assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
    assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
    // Exactly one of the conflicting pair is chosen; the unconflicted tenant always is.
    assert_ne!(
        out.contains(&a.id()),
        out.contains(&b.id()),
        "exactly one of the conflicting tenants must be selected"
    );
    assert!(out.contains(&c.id()));
}
1324
#[test]
fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
    let reg = TenantRegistry::new();
    let a = reg.register("a").unwrap();
    let b = reg.register("b").unwrap();
    let c = reg.register("c").unwrap();

    // A fully zero 3x3 conflict matrix: no tenant conflicts with any other.
    let tenant_count = 3;
    let conflicts = vec![0_u32; tenant_count * tenant_count];
    let mut scratch = TenantSelectionScratch::new();
    let mut out = Vec::with_capacity(8);
    let original_out_ptr = out.as_ptr();

    reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

    let expected = vec![a.id(), b.id(), c.id()];
    assert_eq!(out, expected);
    assert_eq!(
        out.as_ptr(),
        original_out_ptr,
        "all-zero conflict fast path must reuse caller-owned output storage"
    );
    assert!(
        scratch.selected_indices.is_empty(),
        "all-zero conflict fast path must not populate pairwise selection scratch"
    );
}
1349
#[test]
fn concurrent_tenant_selection_respects_conflicts() {
    let reg = TenantRegistry::new();
    let a = reg.register("a").unwrap();
    let b = reg.register("b").unwrap();
    let c = reg.register("c").unwrap();

    // Flattened n x n conflict matrix addressed as `row * n + col`. Positions
    // presumably follow active-tenant order (a, b, c for these fresh
    // registrations). Mark a <-> b as conflicting. A named index helper avoids
    // `0 * n` / `1 * n + 0`, which trip clippy's `erasing_op` / `identity_op`.
    let n = 3;
    let cell = |row: usize, col: usize| row * n + col;
    let mut conflicts = vec![0_u32; n * n];
    conflicts[cell(0, 1)] = 1;
    conflicts[cell(1, 0)] = 1;

    let selected = reg.select_concurrent_tenants(&conflicts);

    // Exactly one of the conflicting pair is chosen; the unconflicted tenant always is.
    assert_ne!(
        selected.contains(&a.id()),
        selected.contains(&b.id()),
        "exactly one of the conflicting tenants must be selected"
    );
    assert!(selected.contains(&c.id()));
}
1367
/// Registering from many threads at once must hand out one distinct id per call.
#[test]
fn concurrent_registration_assigns_unique_ids() {
    use std::thread;

    const THREADS: usize = 32;
    let reg = Arc::new(TenantRegistry::new());
    let handles: Vec<_> = (0..THREADS)
        .map(|i| {
            // Each worker gets its own strong reference to the shared registry.
            let reg = Arc::clone(&reg);
            thread::spawn(move || reg.register(format!("t{i}")).unwrap().id())
        })
        .collect();
    let ids: Vec<u32> = handles
        .into_iter()
        .map(|h| h.join().expect("registration thread panicked"))
        .collect();
    assert_eq!(ids.len(), THREADS);

    let mut sorted = ids.clone();
    sorted.sort_unstable();
    sorted.dedup();
    assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
}
1385}