Skip to main content

vyre_runtime/
tenant.rs

1//! Multi-tenant megakernel multiplexing.
2//!
3//! A single persistent megakernel per GPU can service many producer
4//! tools without each one paying the dispatch-setup cost. The
5//! `tenant_id` field already lives in the ring-slot protocol
6//! (`protocol::TENANT_WORD`); this module owns the host-side
7//! bookkeeping that hands each producer a stable id, reserves an
8//! opcode-range per producer, and gates publish operations against a
9//! per-tenant mask so one producer cannot accidentally drive another
10//! producer's opcodes.
11//!
12//! ## Tenants and opcodes
13//!
14//! Every tenant owns an opcode range `[base, base + cap)` where the
15//! whole range sits inside the user-extension space reserved by
16//! `vyre_runtime::megakernel::protocol::opcode` (≥ `0x4000_0000`).
17//! When [`TenantRegistry::register`] returns a [`TenantHandle`],
18//! callers publish into slot args `[rule_local_opcode, ...]` and
19//! the registry maps that to `(tenant_base + rule_local_opcode)`
20//! before writing into the ring. A tenant that tries to publish an
21//! opcode outside its own range fails with a structured error.
22//!
23//! ## Draining
24//!
//! Unregistering a tenant revokes future publishes but does NOT
//! revoke in-flight slots — the GPU is still going to execute any
//! slot it already CAS-claimed. Callers that need hard draining
//! drive [`TenantHandle::quiesce`], which polls the tenant's
//! host-side published/drained counters (advanced through
//! [`TenantHandle::note_drained`] as the host pump observes
//! DONE_COUNT) until every slot the tenant published has been
//! acknowledged.
31//!
32//! ## Daemon surface
33//!
34//! The registry is the reusable piece. A full `MegakernelDaemon`
35//! (listening on a Unix socket, vending handles over RPC) is a thin
36//! wrapper that we can ship alongside the runtime  -  the registry
37//! here already handles the interesting concurrency.
38
39use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// Origin of the tenant opcode windows. Sits inside the
/// user-extension range reserved by the megakernel protocol so fused
/// rule documents compose with tenant allocation without colliding
/// with built-in opcodes.
///
/// Tenant `id` owns the window starting at
/// `TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT`. Registration
/// clamps ids to at least 1, so the window that starts exactly at
/// this base is never handed out.
pub const TENANT_OPCODE_BASE: u32 = 0x4000_0000;
54
/// Upper bound on the tenant-id space. `tenant_id == TENANT_ID_MAX`
/// is reserved as an invalid / revoked sentinel; registration fails
/// with [`TenantError::RegistryFull`] once the id counter reaches
/// this value.
pub const TENANT_ID_MAX: u32 = u32::MAX - 1;
58
/// Size of the opcode window reserved per tenant: `1 << 20`
/// (1,048,576) opcodes — well over any realistic rule count per
/// producer. At most 3072 such windows fit between
/// [`TENANT_OPCODE_BASE`] and `u32::MAX`, and registration never
/// issues id 0, so the number of simultaneous tenants is slightly
/// below that.
pub const OPCODE_RANGE_PER_TENANT: u32 = 1 << 20;
64
/// Number of initial polls that busy-spin before a waiter starts parking.
const QUIESCE_SPIN_POLLS: u64 = 64;
/// Park interval used for the first parked poll.
const QUIESCE_MIN_PARK: Duration = Duration::from_micros(2);
/// Ceiling on any single park interval.
const QUIESCE_MAX_PARK: Duration = Duration::from_micros(50);
/// Largest left shift applied to `QUIESCE_MIN_PARK`, i.e. at most a 32x multiplier.
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;

/// Park interval for the zero-based poll index `poll`.
///
/// Polls below `QUIESCE_SPIN_POLLS` map to `QUIESCE_MIN_PARK`. After
/// that the interval doubles with every poll until the shift reaches
/// `QUIESCE_BACKOFF_SHIFT_CAP`, and the result is clamped to
/// `QUIESCE_MAX_PARK`.
// NOTE(review): nothing in this body appears to trigger
// `unnecessary_min_or_max`; confirm against the pinned clippy before
// dropping the allow.
#[allow(clippy::unnecessary_min_or_max)]
fn quiesce_backoff_duration(poll: u64) -> Duration {
    let parked_poll = poll.saturating_sub(QUIESCE_SPIN_POLLS);
    // Clamped to QUIESCE_BACKOFF_SHIFT_CAP, so the cast cannot truncate
    // and the shift below cannot overflow a u32.
    let shift = parked_poll.min(QUIESCE_BACKOFF_SHIFT_CAP) as u32;
    let multiplier = 1_u32 << shift;
    QUIESCE_MIN_PARK
        .checked_mul(multiplier)
        .unwrap_or(QUIESCE_MAX_PARK)
        .min(QUIESCE_MAX_PARK)
}
80
81fn quiesce_idle(poll: u64) {
82    if poll < QUIESCE_SPIN_POLLS {
83        std::hint::spin_loop();
84    } else {
85        std::thread::park_timeout(quiesce_backoff_duration(poll));
86    }
87}
88
89fn tenant_registry_retry_idle(retry: u64) {
90    if retry < QUIESCE_SPIN_POLLS {
91        std::hint::spin_loop();
92    } else {
93        std::thread::park_timeout(quiesce_backoff_duration(retry));
94    }
95}
96
/// Errors surfaced by the tenant registry and by [`TenantHandle`]
/// operations. Every message carries a `Fix:` hint for the caller.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TenantError {
    /// The registry ran out of tenant ids or opcode windows. Recycle
    /// tenants or shrink the opcode range reserved per tenant.
    #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
    RegistryFull {
        /// Number of tenants already issued when exhaustion hit.
        issued: u32,
    },
    /// Tried to publish an opcode outside the tenant's reserved
    /// range. Almost always a caller bug.
    #[error(
        "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
         Fix: caller must stay inside the opcode window returned by `register()`."
    )]
    OpcodeOutOfRange {
        /// Tenant id that tripped.
        tenant_id: u32,
        /// Local opcode the caller supplied.
        local_opcode: u32,
        /// Cap on the tenant's local opcode range.
        cap: u32,
    },
    /// Tenant was unregistered concurrently; its handle is stale.
    #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
    Revoked {
        /// Tenant id that was revoked.
        tenant_id: u32,
    },
    /// Quiesce timed out with inflight slots still outstanding.
    #[error(
        "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
         Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
    )]
    QuiesceTimeout {
        /// Tenant id whose quiesce tripped.
        tenant_id: u32,
        /// Number of slots still inflight at timeout.
        outstanding: u64,
    },
    /// Tenant has reached its configured outstanding-slot cap.
    #[error(
        "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
         Fix: wait for drain progress or register the tenant with a larger bounded backlog."
    )]
    Backpressure {
        /// Tenant id whose backlog is full.
        tenant_id: u32,
        /// Current host-visible outstanding slots.
        outstanding: u64,
        /// Configured outstanding-slot cap.
        cap: u64,
    },
    /// Tenant has reached its configured staging-byte cap.
    #[error(
        "tenant {tenant_id} requested {requested} staging bytes with {used} already reserved, cap {cap}. \
         Fix: release staging reservations after publish/readback progress or register the tenant with a larger bounded staging budget."
    )]
    StagingBackpressure {
        /// Tenant id whose staging byte budget is full.
        tenant_id: u32,
        /// New bytes requested.
        requested: u64,
        /// Current reserved staging bytes.
        used: u64,
        /// Configured staging byte cap.
        cap: u64,
    },
    /// Tenant has reached its configured resident-handle cap.
    #[error(
        "tenant {tenant_id} requested {requested} resident handles with {used} already reserved, cap {cap}. \
         Fix: release resident handles when backend ownership ends or register the tenant with a larger bounded resident-handle budget."
    )]
    ResidentHandleBackpressure {
        /// Tenant id whose resident handle budget is full.
        tenant_id: u32,
        /// New handles requested.
        requested: u64,
        /// Current reserved resident handles.
        used: u64,
        /// Configured resident handle cap.
        cap: u64,
    },
    /// Tenant resource accounting would underflow.
    #[error(
        "tenant {tenant_id} released {requested} {resource} with only {used} reserved. \
         Fix: pair every tenant resource release with a successful reservation."
    )]
    ResourceUnderflow {
        /// Tenant id whose counter would underflow.
        tenant_id: u32,
        /// Resource counter being released.
        resource: &'static str,
        /// Release count requested.
        requested: u64,
        /// Current reserved count.
        used: u64,
    },
    /// Pipeline-level error: either bubbled up from
    /// `Megakernel::publish_slot` or raised by this module when its
    /// own accounting or opcode validation fails.
    #[error("{0}")]
    Pipeline(#[from] PipelineError),
}
201
/// Resource limits applied to a single tenant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantQuota {
    /// Cap on host-visible ring slots the tenant may have outstanding.
    pub max_outstanding_slots: u64,
    /// Cap on staging bytes the tenant may reserve for pending work.
    pub max_staging_bytes: u64,
    /// Cap on resident handles the tenant may hold at the same time.
    pub max_resident_handles: u64,
}

impl TenantQuota {
    /// Quota with every limit set to `u64::MAX`, kept for the legacy
    /// registration API. Registration still normalizes each field to
    /// at least one unit.
    #[must_use]
    pub const fn unbounded() -> Self {
        Self::bounded(u64::MAX, u64::MAX, u64::MAX)
    }

    /// Quota with an explicit limit for each resource.
    #[must_use]
    pub const fn bounded(
        max_outstanding_slots: u64,
        max_staging_bytes: u64,
        max_resident_handles: u64,
    ) -> Self {
        Self {
            max_resident_handles,
            max_staging_bytes,
            max_outstanding_slots,
        }
    }
}
240
/// One tenant's accounting state. Lives inside an `Arc` so handles
/// stay valid after the registry borrow drops.
struct TenantState {
    /// Tenant id written into ring slots.
    id: u32,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Exclusive upper bound on tenant-local opcodes.
    opcode_cap: u32,
    /// Number of slots this tenant has published. Incremented when a
    /// publish reserves a slot and decremented again when the ring
    /// write fails, so it only counts publishes that went through.
    published_count: AtomicU64,
    /// Maximum host-visible slots this tenant may keep outstanding.
    max_outstanding_slots: u64,
    /// Number of staging bytes currently reserved by this tenant.
    staging_bytes: AtomicU64,
    /// Maximum staging bytes this tenant may reserve.
    max_staging_bytes: u64,
    /// Number of resident handles currently reserved by this tenant.
    resident_handles: AtomicU64,
    /// Maximum resident handles this tenant may reserve.
    max_resident_handles: u64,
    /// Number of slots the GPU has reported DONE for this tenant.
    /// Advanced by [`TenantHandle::note_drained`].
    drained_count: AtomicU64,
    /// Number of quiesce calls completed or timed out for this tenant.
    quiesce_calls: AtomicU64,
    /// Number of quiesce calls that timed out before the tenant drained.
    quiesce_timeouts: AtomicU64,
    /// Cumulative host-observed drain wait across quiesce calls.
    quiesce_wait_ns: AtomicU64,
    /// Set to 1 on `unregister`; publishes reject afterwards. Any
    /// non-zero value is treated as revoked.
    revoked: AtomicU32,
    /// Stable label for diagnostics (for example, `"scanner-a"`, `"scanner-b"`).
    label: String,
}
273
/// Stable handle returned by [`TenantRegistry::register`]. Clones
/// share the same underlying state, so multiple producer threads
/// inside one tenant can publish through their own handles.
#[derive(Clone)]
pub struct TenantHandle {
    /// Shared accounting state; cloning the handle only bumps the
    /// `Arc` reference count.
    state: Arc<TenantState>,
}
281
/// Host-visible tenant runtime counters, as captured by
/// [`TenantHandle::runtime_counters`]. The fields are read one after
/// another, so the snapshot is not atomic across fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Tenant id.
    pub tenant_id: u32,
    /// Number of slots ever published by this tenant.
    pub published_count: u64,
    /// Number of slots observed drained for this tenant.
    pub drained_count: u64,
    /// Current host-visible backlog (`published_count - drained_count`,
    /// saturating at zero).
    pub outstanding_slots: u64,
    /// Configured outstanding-slot cap for this tenant.
    pub max_outstanding_slots: u64,
    /// Number of quiesce calls recorded for this tenant.
    pub quiesce_calls: u64,
    /// Number of quiesce calls that timed out.
    pub quiesce_timeouts: u64,
    /// Cumulative nanoseconds spent waiting for this tenant to drain.
    pub quiesce_wait_ns: u64,
}
302
/// Host-visible tenant quota counters, as captured by
/// [`TenantHandle::quota_counters`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantQuotaCounters {
    /// Tenant id.
    pub tenant_id: u32,
    /// Current reserved staging bytes.
    pub staging_bytes: u64,
    /// Configured staging byte cap.
    pub max_staging_bytes: u64,
    /// Current reserved resident handle count.
    pub resident_handles: u64,
    /// Configured resident handle cap.
    pub max_resident_handles: u64,
}
317
318impl std::fmt::Debug for TenantHandle {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        f.debug_struct("TenantHandle")
321            .field("id", &self.state.id)
322            .field("label", &self.state.label)
323            .field("base_opcode", &self.state.base_opcode)
324            .field(
325                "published_count",
326                &self.state.published_count.load(Ordering::Relaxed),
327            )
328            .field("max_outstanding_slots", &self.state.max_outstanding_slots)
329            .field(
330                "staging_bytes",
331                &self.state.staging_bytes.load(Ordering::Relaxed),
332            )
333            .field("max_staging_bytes", &self.state.max_staging_bytes)
334            .field(
335                "resident_handles",
336                &self.state.resident_handles.load(Ordering::Relaxed),
337            )
338            .field("max_resident_handles", &self.state.max_resident_handles)
339            .field(
340                "drained_count",
341                &self.state.drained_count.load(Ordering::Relaxed),
342            )
343            .field(
344                "revoked",
345                &(self.state.revoked.load(Ordering::Acquire) != 0),
346            )
347            .finish()
348    }
349}
350
351impl TenantHandle {
352    /// Stable tenant id; maps onto the ring-slot `TENANT_WORD`.
353    #[must_use]
354    pub fn id(&self) -> u32 {
355        self.state.id
356    }
357
358    /// Human-readable label supplied at registration time.
359    #[must_use]
360    pub fn label(&self) -> &str {
361        &self.state.label
362    }
363
364    /// First opcode this tenant owns.
365    #[must_use]
366    pub fn base_opcode(&self) -> u32 {
367        self.state.base_opcode
368    }
369
370    /// Convert a tenant-local opcode to the global opcode used in
371    /// the ring slot. Caller enforces `local < opcode_cap()`.
372    ///
373    /// # Errors
374    ///
375    /// Returns [`TenantError::OpcodeOutOfRange`] when the local
376    /// value is outside the reserved window.
377    pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
378        self.ensure_not_revoked()?;
379        if local >= self.state.opcode_cap {
380            return Err(TenantError::OpcodeOutOfRange {
381                tenant_id: self.id(),
382                local_opcode: local,
383                cap: self.state.opcode_cap,
384            });
385        }
386        let global = self.state.base_opcode + local;
387        if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
388            return Err(TenantError::Pipeline(PipelineError::Backend(format!(
389                "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
390            ))));
391        }
392        Ok(global)
393    }
394
395    /// Publish a slot into the tenant's ring with a tenant-local
396    /// opcode. Convenience wrapper that composes
397    /// [`Megakernel::publish_slot`] with tenant bookkeeping.
398    ///
399    /// # Errors
400    ///
401    /// - [`TenantError::Revoked`] if the tenant was unregistered.
402    /// - [`TenantError::OpcodeOutOfRange`] if `local_opcode` is
403    ///   outside the tenant's window.
404    /// - [`TenantError::Pipeline`] when the underlying
405    ///   `publish_slot` rejects (e.g., slot still in-flight).
406    pub fn publish_slot(
407        &self,
408        ring_bytes: &mut [u8],
409        slot_idx: u32,
410        local_opcode: u32,
411        args: &[u32],
412    ) -> Result<(), TenantError> {
413        self.ensure_not_revoked()?;
414        let global = self.global_opcode(local_opcode)?;
415        self.reserve_publish_slot()?;
416        if let Err(error) =
417            Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
418        {
419            saturating_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
420            return Err(error.into());
421        }
422        Ok(())
423    }
424
425    fn ensure_not_revoked(&self) -> Result<(), TenantError> {
426        if self.state.revoked.load(Ordering::Acquire) != 0 {
427            return Err(TenantError::Revoked {
428                tenant_id: self.state.id,
429            });
430        }
431        Ok(())
432    }
433
434    fn reserve_publish_slot(&self) -> Result<(), TenantError> {
435        let cap = self.state.max_outstanding_slots;
436        vyre_driver::accounting::checked_atomic_update_u64_with_order(
437            &self.state.published_count,
438            Ordering::Acquire,
439            Ordering::AcqRel,
440            Ordering::Acquire,
441            |published| {
442                let drained = self.state.drained_count.load(Ordering::Acquire);
443                let outstanding = vyre_driver::accounting::checked_sub_u64_lazy(
444                    published,
445                    drained,
446                    || {
447                        TenantError::Pipeline(PipelineError::QueueFull {
448                            queue: "tenant",
449                            fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
450                        })
451                    },
452                )?;
453                if outstanding >= cap {
454                    return Err(TenantError::Backpressure {
455                        tenant_id: self.state.id,
456                        outstanding,
457                        cap,
458                    });
459                }
460                vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
461                    TenantError::Pipeline(PipelineError::QueueFull {
462                        queue: "tenant",
463                        fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
464                    })
465                })
466            },
467            |_, _| Ok(()),
468        )?;
469        Ok(())
470    }
471
472    /// Number of slots this tenant has ever published.
473    #[must_use]
474    pub fn published_count(&self) -> u64 {
475        self.state.published_count.load(Ordering::Relaxed)
476    }
477
478    /// Number of slots this tenant has observed drained (via
479    /// [`note_drained`](Self::note_drained)).
480    #[must_use]
481    pub fn drained_count(&self) -> u64 {
482        self.state.drained_count.load(Ordering::Relaxed)
483    }
484
485    /// Maximum host-visible slots this tenant may keep outstanding.
486    #[must_use]
487    pub fn max_outstanding_slots(&self) -> u64 {
488        self.state.max_outstanding_slots
489    }
490
491    /// Reserve staging bytes against this tenant's quota.
492    pub fn reserve_staging_bytes(&self, byte_count: u64) -> Result<(), TenantError> {
493        self.ensure_not_revoked()?;
494        reserve_resource_quota(
495            &self.state.staging_bytes,
496            byte_count,
497            self.state.max_staging_bytes,
498            || {
499                TenantError::StagingBackpressure {
500                    tenant_id: self.state.id,
501                    requested: byte_count,
502                    used: self.state.staging_bytes.load(Ordering::Acquire),
503                    cap: self.state.max_staging_bytes,
504                }
505            },
506            "tenant staging byte reservation overflowed u64; release staging reservations or recreate the tenant before reserving more bytes",
507        )
508    }
509
510    /// Release staging bytes previously reserved by this tenant.
511    pub fn release_staging_bytes(&self, byte_count: u64) -> Result<(), TenantError> {
512        release_resource_quota(
513            &self.state.staging_bytes,
514            byte_count,
515            self.state.id,
516            "staging bytes",
517        )
518    }
519
520    /// Reserve resident handles against this tenant's quota.
521    pub fn reserve_resident_handles(&self, handle_count: u64) -> Result<(), TenantError> {
522        self.ensure_not_revoked()?;
523        reserve_resource_quota(
524            &self.state.resident_handles,
525            handle_count,
526            self.state.max_resident_handles,
527            || {
528                TenantError::ResidentHandleBackpressure {
529                    tenant_id: self.state.id,
530                    requested: handle_count,
531                    used: self.state.resident_handles.load(Ordering::Acquire),
532                    cap: self.state.max_resident_handles,
533                }
534            },
535            "tenant resident handle reservation overflowed u64; release resident handles or recreate the tenant before reserving more handles",
536        )
537    }
538
539    /// Release resident handles previously reserved by this tenant.
540    pub fn release_resident_handles(&self, handle_count: u64) -> Result<(), TenantError> {
541        release_resource_quota(
542            &self.state.resident_handles,
543            handle_count,
544            self.state.id,
545            "resident handles",
546        )
547    }
548
549    /// Snapshot quota counters for this tenant.
550    #[must_use]
551    pub fn quota_counters(&self) -> TenantQuotaCounters {
552        TenantQuotaCounters {
553            tenant_id: self.state.id,
554            staging_bytes: self.state.staging_bytes.load(Ordering::Acquire),
555            max_staging_bytes: self.state.max_staging_bytes,
556            resident_handles: self.state.resident_handles.load(Ordering::Acquire),
557            max_resident_handles: self.state.max_resident_handles,
558        }
559    }
560
561    fn release_all_resource_reservations(&self) {
562        self.state.staging_bytes.store(0, Ordering::Release);
563        self.state.resident_handles.store(0, Ordering::Release);
564    }
565
566    /// Snapshot host-visible runtime counters for this tenant.
567    #[must_use]
568    pub fn runtime_counters(&self) -> TenantRuntimeCounters {
569        let published_count = self.state.published_count.load(Ordering::Acquire);
570        let drained_count = self.state.drained_count.load(Ordering::Acquire);
571        TenantRuntimeCounters {
572            tenant_id: self.state.id,
573            published_count,
574            drained_count,
575            outstanding_slots: published_count.saturating_sub(drained_count),
576            max_outstanding_slots: self.state.max_outstanding_slots,
577            quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
578            quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
579            quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
580        }
581    }
582
583    /// Mark `count` slots as drained. The host pump that observes
584    /// DONE_COUNT calls this when it sees the global counter
585    /// advance past the tenant's last-published cursor.
586    pub fn note_drained(&self, count: u64) {
587        saturating_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
588    }
589
590    /// Block-style quiesce: bounded backoff until every published
591    /// slot has been drained or `max_spins` polls elapse.
592    ///
593    /// # Errors
594    ///
595    /// Returns [`TenantError::QuiesceTimeout`] when `max_spins`
596    /// iterations pass without full drain. The outstanding count
597    /// at timeout is included for diagnostics.
598    pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
599        let started = Instant::now();
600        for poll in 0..max_spins {
601            let pub_count = self.state.published_count.load(Ordering::Acquire);
602            let drained = self.state.drained_count.load(Ordering::Acquire);
603            if drained >= pub_count {
604                self.record_quiesce(started, false);
605                return Ok(());
606            }
607            quiesce_idle(poll);
608        }
609        let pub_count = self.state.published_count.load(Ordering::Acquire);
610        let drained = self.state.drained_count.load(Ordering::Acquire);
611        self.record_quiesce(started, true);
612        Err(TenantError::QuiesceTimeout {
613            tenant_id: self.state.id,
614            outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
615                TenantError::Pipeline(PipelineError::QueueFull {
616                    queue: "tenant",
617                    fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
618                })
619            })?,
620        })
621    }
622
623    fn record_quiesce(&self, started: Instant, timed_out: bool) {
624        saturating_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
625        if timed_out {
626            saturating_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
627        }
628        let elapsed_ns = match u64::try_from(started.elapsed().as_nanos()) {
629            Ok(elapsed_ns) => elapsed_ns,
630            Err(_) => u64::MAX,
631        };
632        saturating_atomic_add_u64(
633            &self.state.quiesce_wait_ns,
634            elapsed_ns,
635            "tenant quiesce_wait_ns",
636        );
637    }
638}
639
/// Thread-safe tenant registry. One per megakernel instance.
pub struct TenantRegistry {
    /// Registered tenants keyed by tenant id.
    tenants: DashMap<u32, TenantHandle>,
    /// Counter that registration advances to issue tenant ids.
    next_id: AtomicU32,
}
646
647impl Default for TenantRegistry {
648    fn default() -> Self {
649        Self {
650            tenants: DashMap::new(),
651            next_id: AtomicU32::new(0),
652        }
653    }
654}
655
/// Scratch buffers owned by the caller and reused across repeated
/// concurrent-tenant selection passes.
#[derive(Debug, Default)]
pub struct TenantSelectionScratch {
    // NOTE(review): presumably the ids of tenants considered active in
    // the current pass — confirm against the selection code.
    active_ids: Vec<u32>,
    // NOTE(review): presumably indices of the chosen tenants — confirm
    // against the selection code.
    selected_indices: Vec<usize>,
}

impl TenantSelectionScratch {
    /// Create scratch with both buffers empty and unallocated.
    #[must_use]
    pub const fn new() -> Self {
        let active_ids = Vec::new();
        let selected_indices = Vec::new();
        Self {
            active_ids,
            selected_indices,
        }
    }
}
673
/// Atomically add `value` to `counter`, clamping at `u64::MAX`
/// instead of wrapping.
///
/// `_label` names the counter at the call site and is currently
/// unused.
fn saturating_atomic_add_u64(counter: &AtomicU64, value: u64, _label: &'static str) {
    // The closure always returns `Some`, so `fetch_update` cannot fail;
    // the previous value it returns is not needed.
    let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        Some(current.saturating_add(value))
    });
}
684
/// Atomically subtract `value` from `counter`, clamping at zero
/// instead of wrapping.
///
/// `_label` names the counter at the call site and is currently
/// unused.
fn saturating_atomic_sub_u64(counter: &AtomicU64, value: u64, _label: &'static str) {
    // The closure always returns `Some`, so `fetch_update` cannot fail;
    // the previous value it returns is not needed.
    let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        Some(current.saturating_sub(value))
    });
}
695
696fn reserve_resource_quota(
697    counter: &AtomicU64,
698    value: u64,
699    cap: u64,
700    backpressure: impl Fn() -> TenantError,
701    overflow_fix: &'static str,
702) -> Result<(), TenantError> {
703    vyre_driver::accounting::checked_atomic_update_u64_with_order(
704        counter,
705        Ordering::Acquire,
706        Ordering::AcqRel,
707        Ordering::Acquire,
708        |used| {
709            let next = vyre_driver::accounting::checked_add_u64_lazy(used, value, || {
710                TenantError::Pipeline(PipelineError::QueueFull {
711                    queue: "tenant resource quota",
712                    fix: overflow_fix,
713                })
714            })?;
715            if next > cap {
716                return Err(backpressure());
717            }
718            Ok(next)
719        },
720        |_, _| Ok(()),
721    )?;
722    Ok(())
723}
724
725fn release_resource_quota(
726    counter: &AtomicU64,
727    value: u64,
728    tenant_id: u32,
729    resource: &'static str,
730) -> Result<(), TenantError> {
731    vyre_driver::accounting::checked_atomic_update_u64_with_order(
732        counter,
733        Ordering::Acquire,
734        Ordering::AcqRel,
735        Ordering::Acquire,
736        |used| {
737            used.checked_sub(value)
738                .ok_or(TenantError::ResourceUnderflow {
739                    tenant_id,
740                    resource,
741                    requested: value,
742                    used,
743                })
744        },
745        |_, _| Ok(()),
746    )?;
747    Ok(())
748}
749
750impl TenantRegistry {
751    /// Fresh registry with no tenants.
752    #[must_use]
753    pub fn new() -> Self {
754        Self::default()
755    }
756
757    /// Register a new tenant with the given diagnostic label.
758    /// Returns a handle whose opcode range is reserved until
759    /// [`unregister`](Self::unregister) is called.
760    ///
761    /// # Errors
762    ///
763    /// Returns [`TenantError::RegistryFull`] when the tenant id or
764    /// opcode space is exhausted.
765    pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
766        self.register_with_backpressure(label, u64::MAX)
767    }
768
769    /// Register a new tenant with a bounded outstanding-slot budget.
770    ///
771    /// # Errors
772    ///
773    /// Returns [`TenantError::RegistryFull`] when the tenant id or opcode space
774    /// is exhausted.
775    pub fn register_with_backpressure(
776        &self,
777        label: impl Into<String>,
778        max_outstanding_slots: u64,
779    ) -> Result<TenantHandle, TenantError> {
780        self.register_with_quotas(
781            label,
782            TenantQuota {
783                max_outstanding_slots,
784                ..TenantQuota::unbounded()
785            },
786        )
787    }
788
789    /// Register a tenant with explicit ring-slot, staging-byte, and
790    /// resident-handle quotas.
791    ///
792    /// # Errors
793    ///
794    /// Returns [`TenantError::RegistryFull`] when the tenant id or opcode space
795    /// is exhausted.
796    pub fn register_with_quotas(
797        &self,
798        label: impl Into<String>,
799        quota: TenantQuota,
800    ) -> Result<TenantHandle, TenantError> {
801        let mut registration_retries = 0u64;
802        let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
803            &self.next_id,
804            Ordering::Relaxed,
805            Ordering::SeqCst,
806            Ordering::Relaxed,
807            |current| {
808                if current >= TENANT_ID_MAX {
809                    return Err(TenantError::RegistryFull { issued: current });
810                }
811                let id = current.max(1);
812                id.checked_add(1)
813                    .ok_or(TenantError::RegistryFull { issued: current })
814            },
815            |_, _| {
816                tenant_registry_retry_idle(registration_retries);
817                registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
818                    registration_retries,
819                    1,
820                    || {
821                        TenantError::Pipeline(PipelineError::QueueFull {
822                            queue: "tenant",
823                            fix: "tenant registration retry counter overflowed u64; retry registration later",
824                        })
825                    },
826                )?;
827                Ok(())
828            },
829        )?;
830        let id = issued.max(1);
831
832        let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
833            id,
834            OPCODE_RANGE_PER_TENANT,
835            TenantError::RegistryFull { issued },
836        )?;
837        let base_opcode = vyre_driver::accounting::checked_add_u32_value(
838            TENANT_OPCODE_BASE,
839            tenant_offset,
840            TenantError::RegistryFull { issued },
841        )?;
842        let top_opcode = vyre_driver::accounting::checked_add_u32_value(
843            base_opcode,
844            OPCODE_RANGE_PER_TENANT,
845            TenantError::RegistryFull { issued },
846        )?;
847        if top_opcode == SHUTDOWN {
848            return Err(TenantError::RegistryFull { issued });
849        }
850        let handle = TenantHandle {
851            state: Arc::new(TenantState {
852                id,
853                base_opcode,
854                opcode_cap: OPCODE_RANGE_PER_TENANT,
855                published_count: AtomicU64::new(0),
856                max_outstanding_slots: quota.max_outstanding_slots.max(1),
857                staging_bytes: AtomicU64::new(0),
858                max_staging_bytes: quota.max_staging_bytes.max(1),
859                resident_handles: AtomicU64::new(0),
860                max_resident_handles: quota.max_resident_handles.max(1),
861                drained_count: AtomicU64::new(0),
862                quiesce_calls: AtomicU64::new(0),
863                quiesce_timeouts: AtomicU64::new(0),
864                quiesce_wait_ns: AtomicU64::new(0),
865                revoked: AtomicU32::new(0),
866                label: label.into(),
867            }),
868        };
869        self.tenants.insert(id, handle.clone());
870        Ok(handle)
871    }
872
873    /// Unregister a tenant. Future publishes on the handle fail
874    /// with [`TenantError::Revoked`]. In-flight slots already on
875    /// the GPU still execute  -  the host is responsible for
876    /// quiescing before unregister if it needs that guarantee.
877    pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
878        let (_, handle) = self.tenants.remove(&tenant_id)?;
879        handle.state.revoked.store(1, Ordering::Release);
880        handle.release_all_resource_reservations();
881        Some(handle)
882    }
883
884    /// Snapshot of active tenants for observability / diagnostics.
885    #[must_use]
886    pub fn active_tenants(&self) -> Vec<TenantHandle> {
887        let mut out = Vec::with_capacity(self.tenants.len());
888        out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
889        out.sort_by_key(TenantHandle::id);
890        out
891    }
892
893    /// Snapshot active tenants into caller-owned storage.
894    pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
895        out.clear();
896        out.reserve(self.tenants.len());
897        self.tenants
898            .iter()
899            .for_each(|entry| out.push(entry.value().clone()));
900        out.sort_by_key(TenantHandle::id);
901    }
902
903    /// Look up a tenant by id. Returns `None` if the id was
904    /// unregistered.
905    #[must_use]
906    pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
907        self.tenants
908            .get(&tenant_id)
909            .map(|entry| entry.value().clone())
910    }
911
912    /// Snapshot runtime counters for every active tenant.
913    #[must_use]
914    pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
915        let mut out = Vec::with_capacity(self.tenants.len());
916        self.tenants
917            .iter()
918            .map(|entry| entry.value().runtime_counters())
919            .for_each(|counters| out.push(counters));
920        out.sort_by_key(|counters| counters.tenant_id);
921        out
922    }
923
924    /// Snapshot runtime counters into caller-owned storage.
925    pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
926        out.clear();
927        out.reserve(self.tenants.len());
928        self.tenants
929            .iter()
930            .map(|entry| entry.value().runtime_counters())
931            .for_each(|counters| out.push(counters));
932        out.sort_by_key(|counters| counters.tenant_id);
933    }
934
935    /// Select a maximal independent subset of tenants for a fair
936    /// schedule slot.
937    ///
938    /// `conflict_adj[i*n+j] != 0` means tenants `i` and `j` cannot
939    /// share the same dispatch slot (e.g., both pinned to the same
940    /// queue, or both holding mutually-exclusive opcode locks). The
941    /// Returns a Vec of tenant ids in selection order. Empty if no
942    /// tenants are active.
943    #[must_use]
944    pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
945        let mut out = Vec::new();
946        let mut scratch = TenantSelectionScratch::new();
947        self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
948        out
949    }
950
951    /// Select a maximal independent tenant subset into caller-owned storage.
952    pub fn select_concurrent_tenants_into(
953        &self,
954        conflict_adj: &[u32],
955        out: &mut Vec<u32>,
956        scratch: &mut TenantSelectionScratch,
957    ) {
958        out.clear();
959        scratch.active_ids.clear();
960        scratch.active_ids.reserve(self.tenants.len());
961        self.tenants
962            .iter()
963            .map(|entry| entry.value().id())
964            .for_each(|id| scratch.active_ids.push(id));
965        scratch.active_ids.sort_unstable();
966        let n = scratch.active_ids.len();
967        if n == 0 {
968            return;
969        }
970        if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
971            != Some(conflict_adj.len())
972        {
973            // Degenerate: caller didn't supply a matching adjacency.
974            // Default to all-tenants-can-run (no conflicts).
975            out.reserve(n);
976            out.extend(scratch.active_ids.iter().copied());
977            return;
978        }
979        if conflict_adj.iter().all(|conflict| *conflict == 0) {
980            out.reserve(n);
981            out.extend(scratch.active_ids.iter().copied());
982            return;
983        }
984        scratch.selected_indices.clear();
985        scratch.selected_indices.reserve(n);
986        'candidate: for candidate_idx in 0..n {
987            for &selected_idx in &scratch.selected_indices {
988                if conflict_adj[candidate_idx * n + selected_idx] != 0
989                    || conflict_adj[selected_idx * n + candidate_idx] != 0
990                {
991                    continue 'candidate;
992                }
993            }
994            scratch.selected_indices.push(candidate_idx);
995        }
996        out.reserve(scratch.selected_indices.len());
997        for &index in &scratch.selected_indices {
998            if let Some(&id) = scratch.active_ids.get(index) {
999                out.push(id);
1000            }
1001        }
1002    }
1003}
1004
#[cfg(test)]
mod tests {
    //! Unit tests for tenant registration, publishing, quotas, quiescing,
    //! registry snapshots, and concurrent-tenant selection.

    use super::*;

    /// Mark the tenants at sorted indices `i` and `j` as mutually
    /// conflicting in a row-major `n x n` adjacency matrix.
    fn mark_conflict(conflicts: &mut [u32], n: usize, i: usize, j: usize) {
        conflicts[i * n + j] = 1;
        conflicts[j * n + i] = 1;
    }

    /// Two registrations get different ids and non-overlapping opcode ranges.
    #[test]
    fn two_tenants_get_distinct_id_and_opcode_ranges() {
        let reg = TenantRegistry::new();
        let a = reg
            .register("scanner-a")
            .expect("Fix: register a; restore this invariant before continuing.");
        let b = reg
            .register("scanner-b")
            .expect("Fix: register b; restore this invariant before continuing.");
        assert_ne!(a.id(), b.id());
        assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
        assert_eq!(a.label(), "scanner-a");
        assert_eq!(b.label(), "scanner-b");
    }

    /// A local opcode equal to the per-tenant range is rejected; a small one
    /// maps to `base_opcode + local`.
    #[test]
    fn global_opcode_rejects_out_of_range_local() {
        let reg = TenantRegistry::new();
        let t = reg.register("soleno").unwrap();
        let err = t
            .global_opcode(OPCODE_RANGE_PER_TENANT)
            .expect_err("oversized local opcode must reject");
        assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));

        let ok = t
            .global_opcode(42)
            .expect("Fix: 42 < cap; restore this invariant before continuing.");
        assert_eq!(ok, t.base_opcode() + 42);
    }

    /// Publishing writes the tenant id and global opcode into the slot and
    /// increments the published counter.
    #[test]
    fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
        let reg = TenantRegistry::new();
        let t = reg.register("warpscan").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(
            &mut ring,
            /* slot = */ 0,
            /* local = */ 7,
            &[1, 2, 3],
        )
        .expect("Fix: publish; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 1);

        // Slot 0 should carry tenant=t.id(), opcode=t.base_opcode()+7.
        let tenant_off = super::super::megakernel::protocol::TENANT_WORD as usize * 4;
        let opcode_off = super::super::megakernel::protocol::OPCODE_WORD as usize * 4;
        let stored_tenant =
            u32::from_le_bytes(ring[tenant_off..tenant_off + 4].try_into().unwrap());
        let stored_opcode =
            u32::from_le_bytes(ring[opcode_off..opcode_off + 4].try_into().unwrap());
        assert_eq!(stored_tenant, t.id());
        assert_eq!(stored_opcode, t.base_opcode() + 7);
    }

    /// After unregistering, publishes fail with `Revoked` and lookup misses.
    #[test]
    fn unregister_blocks_future_publishes() {
        let reg = TenantRegistry::new();
        let t = reg.register("vein").unwrap();
        let tenant_id = t.id();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
            .expect("Fix: first publish ok; restore this invariant before continuing.");
        reg.unregister(tenant_id)
            .expect("Fix: unregister; restore this invariant before continuing.");
        let err = t
            .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
            .expect_err("publish after unregister must reject");
        assert!(matches!(err, TenantError::Revoked { .. }));
        assert!(reg.lookup(tenant_id).is_none());
    }

    /// Quiesce succeeds once the drained count matches the published count.
    #[test]
    fn quiesce_returns_when_drained_catches_up() {
        let reg = TenantRegistry::new();
        let t = reg.register("t1").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
        assert_eq!(t.published_count(), 2);
        t.note_drained(2);
        t.quiesce(1)
            .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
        let counters = t.runtime_counters();
        assert_eq!(counters.published_count, 2);
        assert_eq!(counters.drained_count, 2);
        assert_eq!(counters.outstanding_slots, 0);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 0);
    }

    /// Quiesce reports a timeout when nothing is ever drained.
    #[test]
    fn quiesce_times_out_when_drain_stalled() {
        let reg = TenantRegistry::new();
        let t = reg.register("t2").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
        // Never note_drained → quiesce must time out.
        let err = t.quiesce(4).expect_err("stalled quiesce must time out");
        assert!(matches!(
            err,
            TenantError::QuiesceTimeout { outstanding: 1, .. }
        ));
        let counters = t.runtime_counters();
        assert_eq!(counters.outstanding_slots, 1);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 1);
    }

    /// A tenant with a slot budget of 2 rejects the third outstanding publish.
    #[test]
    fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 2).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
        let err = t
            .publish_slot(&mut ring, 2, 0, &[3])
            .expect_err("third outstanding publish must hit tenant backpressure");
        assert!(matches!(
            err,
            TenantError::Backpressure {
                outstanding: 2,
                cap: 2,
                ..
            }
        ));
        assert_eq!(t.published_count(), 2);
        let counters = t.runtime_counters();
        assert_eq!(counters.max_outstanding_slots, 2);
        assert_eq!(counters.outstanding_slots, 2);
    }

    /// Drain progress frees budget so a previously rejected publish succeeds.
    #[test]
    fn tenant_backpressure_reopens_after_drain_progress() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 1).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        assert!(matches!(
            t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
            TenantError::Backpressure { .. }
        ));
        t.note_drained(1);
        t.publish_slot(&mut ring, 1, 0, &[2])
            .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 2);
        assert_eq!(t.runtime_counters().outstanding_slots, 1);
    }

    /// Staging-byte and resident-handle quotas reject overcommit and
    /// underflow, and unregistering releases every reservation.
    #[test]
    fn tenant_resource_quotas_reject_overcommit_and_cleanup_on_unregister() {
        let reg = TenantRegistry::new();
        let t = reg
            .register_with_quotas("quota", TenantQuota::bounded(2, 16, 1))
            .unwrap();

        t.reserve_staging_bytes(8).unwrap();
        let staging_error = t
            .reserve_staging_bytes(9)
            .expect_err("staging byte quota must reject overcommit");
        assert!(matches!(
            staging_error,
            TenantError::StagingBackpressure {
                requested: 9,
                cap: 16,
                ..
            }
        ));
        assert_eq!(t.quota_counters().staging_bytes, 8);

        t.release_staging_bytes(4).unwrap();
        t.reserve_staging_bytes(12).unwrap();
        assert_eq!(t.quota_counters().staging_bytes, 16);
        let underflow = t
            .release_staging_bytes(17)
            .expect_err("staging release must reject underflow");
        assert!(matches!(
            underflow,
            TenantError::ResourceUnderflow {
                resource: "staging bytes",
                requested: 17,
                used: 16,
                ..
            }
        ));

        t.reserve_resident_handles(1).unwrap();
        let handle_error = t
            .reserve_resident_handles(1)
            .expect_err("resident handle quota must reject overcommit");
        assert!(matches!(
            handle_error,
            TenantError::ResidentHandleBackpressure {
                requested: 1,
                cap: 1,
                ..
            }
        ));
        assert_eq!(t.quota_counters().resident_handles, 1);

        let removed = reg.unregister(t.id()).unwrap();
        assert_eq!(removed.quota_counters().staging_bytes, 0);
        assert_eq!(removed.quota_counters().resident_handles, 0);
        assert!(matches!(
            t.reserve_staging_bytes(1).unwrap_err(),
            TenantError::Revoked { .. }
        ));
        assert!(matches!(
            t.reserve_resident_handles(1).unwrap_err(),
            TenantError::Revoked { .. }
        ));
    }

    /// The registration retry idle helper accepts retry counts around the
    /// spin threshold, and the backoff duration stays within its bounds.
    #[test]
    fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
        for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
            tenant_registry_retry_idle(retry);
        }
        assert_eq!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
            QUIESCE_MIN_PARK
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    /// Backoff starts at the minimum park, never shrinks, and is capped.
    #[test]
    fn quiesce_backoff_is_bounded_and_monotonic() {
        let samples = [
            quiesce_backoff_duration(0),
            quiesce_backoff_duration(1),
            quiesce_backoff_duration(2),
            quiesce_backoff_duration(8),
            quiesce_backoff_duration(64),
        ];
        assert_eq!(samples[0], QUIESCE_MIN_PARK);
        for pair in samples.windows(2) {
            assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
            assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
        }
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    /// Snapshots include registered tenants and drop unregistered ones.
    #[test]
    fn active_tenants_tracks_registrations() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(active.contains(&a.id()));
        assert!(active.contains(&b.id()));
        // The tenant was just registered, so unregistering must find it.
        assert!(reg.unregister(a.id()).is_some());
        let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(!after.contains(&a.id()));
        assert!(after.contains(&b.id()));
        let counters: Vec<u32> = reg
            .runtime_counters()
            .iter()
            .map(|tenant| tenant.tenant_id)
            .collect();
        assert_eq!(counters, vec![b.id()]);
    }

    /// The `_into` snapshot variants keep using the caller's allocation.
    #[test]
    fn tenant_snapshots_reuse_caller_storage() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let mut active = Vec::with_capacity(2);
        let mut counters = Vec::with_capacity(2);

        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);
        let active_ptr = active.as_ptr();
        let counters_ptr = counters.as_ptr();
        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);

        assert_eq!(active.as_ptr(), active_ptr);
        assert_eq!(counters.as_ptr(), counters_ptr);
        assert!(active.iter().any(|tenant| tenant.id() == a.id()));
        assert!(active.iter().any(|tenant| tenant.id() == b.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == a.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == b.id()));
    }

    /// Repeated selection with the same scratch and output reuses their
    /// allocations and still honours the conflict between the first two
    /// tenants.
    #[test]
    fn concurrent_tenant_selection_reuses_scratch_and_output() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let mut conflicts = vec![0_u32; n * n];
        mark_conflict(&mut conflicts, n, 0, 1);
        let mut out = Vec::with_capacity(3);
        let mut scratch = TenantSelectionScratch::new();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
        let out_ptr = out.as_ptr();
        let active_ids_ptr = scratch.active_ids.as_ptr();
        let selected_ptr = scratch.selected_indices.as_ptr();
        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out.as_ptr(), out_ptr);
        assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
        assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
        assert!(out.contains(&a.id()) || out.contains(&b.id()));
        assert!(!(out.contains(&a.id()) && out.contains(&b.id())));
        assert!(out.contains(&c.id()));
    }

    /// An all-zero conflict matrix selects every tenant without touching the
    /// pairwise selection scratch.
    #[test]
    fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let mut out = Vec::with_capacity(8);
        let mut scratch = TenantSelectionScratch::new();
        let conflicts = vec![0_u32; 9];
        let out_ptr = out.as_ptr();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out, vec![a.id(), b.id(), c.id()]);
        assert_eq!(
            out.as_ptr(),
            out_ptr,
            "all-zero conflict fast path must reuse caller-owned output storage"
        );
        assert!(
            scratch.selected_indices.is_empty(),
            "all-zero conflict fast path must not populate pairwise selection scratch"
        );
    }

    /// Conflicting tenants are never selected together; an unrelated tenant
    /// is always selected.
    #[test]
    fn concurrent_tenant_selection_respects_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let n = 3;
        let mut conflicts = vec![0_u32; n * n];
        mark_conflict(&mut conflicts, n, 0, 1);

        let selected = reg.select_concurrent_tenants(&conflicts);

        assert!(selected.contains(&a.id()) || selected.contains(&b.id()));
        assert!(!(selected.contains(&a.id()) && selected.contains(&b.id())));
        assert!(selected.contains(&c.id()));
    }

    /// Registering from 32 threads at once never hands out a duplicate id.
    #[test]
    fn concurrent_registration_assigns_unique_ids() {
        use std::thread;
        let reg = Arc::new(TenantRegistry::new());
        let mut handles = Vec::new();
        for i in 0..32 {
            let reg = Arc::clone(&reg);
            handles.push(thread::spawn(move || {
                reg.register(format!("t{i}")).unwrap().id()
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let mut sorted = ids.clone();
        sorted.sort_unstable();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
    }
}
1385}