Skip to main content

vyre_runtime/
tenant.rs

1//! Multi-tenant megakernel multiplexing.
2//!
3//! A single persistent megakernel per GPU can service many producer
4//! tools without each one paying the dispatch-setup cost. The
5//! `tenant_id` field already lives in the ring-slot protocol
6//! (`protocol::TENANT_WORD`); this module owns the host-side
7//! bookkeeping that hands each producer a stable id, reserves an
8//! opcode-range per producer, and gates publish operations against a
9//! per-tenant mask so one producer cannot accidentally drive another
10//! producer's opcodes.
11//!
12//! ## Tenants and opcodes
13//!
14//! Every tenant owns an opcode range `[base, base + cap)` where the
15//! whole range sits inside the user-extension space reserved by
16//! `vyre_runtime::megakernel::protocol::opcode` (≥ `0x4000_0000`).
17//! When [`TenantRegistry::register`] returns a [`TenantHandle`],
18//! callers publish into slot args `[rule_local_opcode, ...]` and
19//! the registry maps that to `(tenant_base + rule_local_opcode)`
20//! before writing into the ring. A tenant that tries to publish an
21//! opcode outside its own range fails with a structured error.
22//!
23//! ## Draining
24//!
//! Unregistering a tenant revokes future publishes but does NOT
//! revoke in-flight slots: the GPU is still going to execute any
//! slot it already CAS-claimed. Callers that need hard draining
//! drive [`TenantHandle::quiesce`], which polls the tenant's host-side
//! `published_count` / `drained_count` counters until every published
//! slot has been reported drained. Those counters only advance when
//! the host pump that watches the megakernel DONE_COUNT calls
//! [`TenantHandle::note_drained`].
31//!
32//! ## Daemon surface
33//!
34//! The registry is the reusable piece. A full `MegakernelDaemon`
35//! (listening on a Unix socket, vending handles over RPC) is a thin
36//! wrapper that we can ship alongside the runtime  -  the registry
37//! here already handles the interesting concurrency.
38
39use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// First opcode the tenant registry hands out. Sits inside the
/// user-extension range reserved by the megakernel protocol so fused
/// rule documents compose with tenant allocation without colliding
/// with built-in opcodes.
///
/// A tenant's window starts at
/// `TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT`.
pub const TENANT_OPCODE_BASE: u32 = 0x4000_0000;

/// Upper bound on the tenant-id space. `tenant_id == TENANT_ID_MAX`
/// is reserved as an invalid / revoked sentinel; registration is
/// refused once the registry's id counter reaches this value.
pub const TENANT_ID_MAX: u32 = u32::MAX - 1;

/// Number of opcodes in the window reserved per tenant:
/// `1 << 20` = 1,048,576 opcodes, well over any realistic rule count
/// per producer.
///
/// Because windows start at [`TENANT_OPCODE_BASE`], ids start at 1 and
/// the end of a window must still fit in a `u32`, at most 3,070 tenant
/// windows can be handed out by one registry.
pub const OPCODE_RANGE_PER_TENANT: u32 = 1 << 20;
64
/// Number of initial polls that busy-spin before the waiter starts parking.
const QUIESCE_SPIN_POLLS: u64 = 64;
/// Shortest park interval used once spinning is over.
const QUIESCE_MIN_PARK: Duration = Duration::from_micros(2);
/// Longest park interval; the exponential backoff is clamped to this.
const QUIESCE_MAX_PARK: Duration = Duration::from_micros(50);
/// Largest left-shift applied to the minimum park interval.
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;

/// Park duration for the given poll index.
///
/// Polls up to and including `QUIESCE_SPIN_POLLS` map to
/// `QUIESCE_MIN_PARK`; each later poll doubles the interval until the
/// shift reaches `QUIESCE_BACKOFF_SHIFT_CAP`, and the result never
/// exceeds `QUIESCE_MAX_PARK`.
///
/// # Panics
///
/// Panics if the configured constants make the shift or the scaled
/// duration overflow; with the values above that cannot happen.
#[allow(clippy::unnecessary_min_or_max)]
fn quiesce_backoff_duration(poll: u64) -> Duration {
    let polls_past_spin = poll.saturating_sub(QUIESCE_SPIN_POLLS);
    let capped = polls_past_spin.min(QUIESCE_BACKOFF_SHIFT_CAP);
    let shift = match u32::try_from(capped) {
        Ok(shift) => shift,
        Err(error) => panic!(
            "tenant quiesce backoff shift cannot fit u32: {error}. Fix: lower QUIESCE_BACKOFF_SHIFT_CAP."
        ),
    };
    let multiplier = match 1_u32.checked_shl(shift) {
        Some(multiplier) => multiplier,
        None => panic!("tenant quiesce backoff multiplier overflowed u32. Fix: lower shift cap."),
    };
    match QUIESCE_MIN_PARK.checked_mul(multiplier) {
        Some(scaled) => scaled.min(QUIESCE_MAX_PARK),
        None => {
            panic!("tenant quiesce backoff duration overflowed. Fix: lower quiesce park bounds.")
        }
    }
}
88
89fn quiesce_idle(poll: u64) {
90    if poll < QUIESCE_SPIN_POLLS {
91        std::hint::spin_loop();
92    } else {
93        std::thread::park_timeout(quiesce_backoff_duration(poll));
94    }
95}
96
97fn tenant_registry_retry_idle(retry: u64) {
98    if retry < QUIESCE_SPIN_POLLS {
99        std::hint::spin_loop();
100    } else {
101        std::thread::park_timeout(quiesce_backoff_duration(retry));
102    }
103}
104
/// Errors surfaced by the tenant registry and by [`TenantHandle`]
/// operations.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TenantError {
    /// The registry ran out of tenant ids or opcode windows. A
    /// smaller `OPCODE_RANGE_PER_TENANT` leaves room for more windows.
    ///
    /// NOTE(review): `register_with_backpressure` only ever advances
    /// its id counter, so unregistering does not appear to free an id;
    /// confirm before relying on the "recycle tenants" hint.
    #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
    RegistryFull {
        /// Value of the registry's id counter when exhaustion hit.
        issued: u32,
    },
    /// Tried to publish an opcode outside the tenant's reserved
    /// range. Almost always a caller bug.
    #[error(
        "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
         Fix: caller must stay inside the opcode window returned by `register()`."
    )]
    OpcodeOutOfRange {
        /// Tenant id that tripped.
        tenant_id: u32,
        /// Local opcode the caller supplied.
        local_opcode: u32,
        /// Exclusive upper bound of the tenant's local opcode range.
        cap: u32,
    },
    /// Tenant was unregistered; its handle is stale.
    #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
    Revoked {
        /// Tenant id that was revoked.
        tenant_id: u32,
    },
    /// Quiesce gave up with published slots still not reported drained.
    #[error(
        "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
         Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
    )]
    QuiesceTimeout {
        /// Tenant id whose quiesce tripped.
        tenant_id: u32,
        /// Number of slots still inflight at timeout.
        outstanding: u64,
    },
    /// Tenant has reached its configured outstanding-slot cap.
    #[error(
        "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
         Fix: wait for drain progress or register the tenant with a larger bounded backlog."
    )]
    Backpressure {
        /// Tenant id whose backlog is full.
        tenant_id: u32,
        /// Current host-visible outstanding slots.
        outstanding: u64,
        /// Configured outstanding-slot cap.
        cap: u64,
    },
    /// Protocol error bubbled up from `Megakernel::publish_slot`, or an
    /// internal accounting failure wrapped as a pipeline error.
    #[error("{0}")]
    Pipeline(#[from] PipelineError),
}
164
/// One tenant's accounting state. Lives inside an `Arc` so handles
/// stay valid after the registry borrow drops, and so every clone of
/// a [`TenantHandle`] observes the same counters.
struct TenantState {
    /// Tenant id; written into the ring slot on publish.
    id: u32,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Number of local opcodes in the window (exclusive upper bound).
    opcode_cap: u32,
    /// Number of slots this tenant has published. Incremented when a
    /// publish is reserved and decremented again if the ring write fails.
    published_count: AtomicU64,
    /// Maximum host-visible slots this tenant may keep outstanding.
    max_outstanding_slots: u64,
    /// Number of slots the GPU has reported DONE for this tenant.
    /// Advanced by [`TenantHandle::note_drained`].
    drained_count: AtomicU64,
    /// Number of quiesce calls completed or timed out for this tenant.
    quiesce_calls: AtomicU64,
    /// Number of quiesce calls that timed out before the tenant drained.
    quiesce_timeouts: AtomicU64,
    /// Cumulative host-observed drain wait across quiesce calls.
    quiesce_wait_ns: AtomicU64,
    /// Set to 1 on `unregister`; publishes reject afterwards.
    revoked: AtomicU32,
    /// Stable label for diagnostics (for example, `"scanner-a"`, `"scanner-b"`).
    label: String,
}
189
/// Stable handle returned by [`TenantRegistry::register`]. Clones
/// share the same underlying state, so multiple producer threads
/// inside one tenant can publish through their own handles.
///
/// A handle outlives its registry entry: after `unregister` it still
/// exists, but publishing through it fails with
/// [`TenantError::Revoked`].
#[derive(Clone)]
pub struct TenantHandle {
    /// Shared per-tenant accounting state.
    state: Arc<TenantState>,
}
197
/// Host-visible tenant runtime counters.
///
/// A plain-value snapshot produced by
/// [`TenantHandle::runtime_counters`]; it does not update after it
/// has been taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Tenant id.
    pub tenant_id: u32,
    /// Number of slots ever published by this tenant.
    pub published_count: u64,
    /// Number of slots observed drained for this tenant.
    pub drained_count: u64,
    /// Current host-visible backlog (`published_count - drained_count`).
    pub outstanding_slots: u64,
    /// Configured outstanding-slot cap for this tenant.
    pub max_outstanding_slots: u64,
    /// Number of quiesce calls recorded for this tenant.
    pub quiesce_calls: u64,
    /// Number of quiesce calls that timed out.
    pub quiesce_timeouts: u64,
    /// Cumulative nanoseconds spent waiting for this tenant to drain.
    pub quiesce_wait_ns: u64,
}
218
219impl std::fmt::Debug for TenantHandle {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("TenantHandle")
222            .field("id", &self.state.id)
223            .field("label", &self.state.label)
224            .field("base_opcode", &self.state.base_opcode)
225            .field(
226                "published_count",
227                &self.state.published_count.load(Ordering::Relaxed),
228            )
229            .field("max_outstanding_slots", &self.state.max_outstanding_slots)
230            .field(
231                "drained_count",
232                &self.state.drained_count.load(Ordering::Relaxed),
233            )
234            .field(
235                "revoked",
236                &(self.state.revoked.load(Ordering::Acquire) != 0),
237            )
238            .finish()
239    }
240}
241
242impl TenantHandle {
    /// Stable tenant id; maps onto the ring-slot `TENANT_WORD`.
    ///
    /// This is the value [`publish_slot`](Self::publish_slot) passes
    /// to `Megakernel::publish_slot` as the tenant.
    #[must_use]
    pub fn id(&self) -> u32 {
        self.state.id
    }
248
    /// Human-readable label supplied at registration time. Used for
    /// diagnostics only; it takes no part in opcode or id allocation.
    #[must_use]
    pub fn label(&self) -> &str {
        &self.state.label
    }
254
    /// First global opcode this tenant owns. Tenant-local opcode `n`
    /// corresponds to global opcode `base_opcode() + n`.
    #[must_use]
    pub fn base_opcode(&self) -> u32 {
        self.state.base_opcode
    }
260
261    /// Convert a tenant-local opcode to the global opcode used in
262    /// the ring slot. Caller enforces `local < opcode_cap()`.
263    ///
264    /// # Errors
265    ///
266    /// Returns [`TenantError::OpcodeOutOfRange`] when the local
267    /// value is outside the reserved window.
268    pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
269        if local >= self.state.opcode_cap {
270            return Err(TenantError::OpcodeOutOfRange {
271                tenant_id: self.id(),
272                local_opcode: local,
273                cap: self.state.opcode_cap,
274            });
275        }
276        let global = self.state.base_opcode + local;
277        if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
278            return Err(TenantError::Pipeline(PipelineError::Backend(format!(
279                "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
280            ))));
281        }
282        Ok(global)
283    }
284
285    /// Publish a slot into the tenant's ring with a tenant-local
286    /// opcode. Convenience wrapper that composes
287    /// [`Megakernel::publish_slot`] with tenant bookkeeping.
288    ///
289    /// # Errors
290    ///
291    /// - [`TenantError::Revoked`] if the tenant was unregistered.
292    /// - [`TenantError::OpcodeOutOfRange`] if `local_opcode` is
293    ///   outside the tenant's window.
294    /// - [`TenantError::Pipeline`] when the underlying
295    ///   `publish_slot` rejects (e.g., slot still in-flight).
296    pub fn publish_slot(
297        &self,
298        ring_bytes: &mut [u8],
299        slot_idx: u32,
300        local_opcode: u32,
301        args: &[u32],
302    ) -> Result<(), TenantError> {
303        if self.state.revoked.load(Ordering::Acquire) != 0 {
304            return Err(TenantError::Revoked {
305                tenant_id: self.state.id,
306            });
307        }
308        let global = self.global_opcode(local_opcode)?;
309        self.reserve_publish_slot()?;
310        if let Err(error) =
311            Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
312        {
313            checked_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
314            return Err(error.into());
315        }
316        Ok(())
317    }
318
    /// Reserve one unit of the tenant's outstanding-slot budget by
    /// advancing `published_count` by one.
    ///
    /// The update closure computes `published - drained`, refuses with
    /// [`TenantError::Backpressure`] when that backlog has reached
    /// `max_outstanding_slots`, and otherwise proposes `published + 1`.
    ///
    /// NOTE(review): `drained_count` is loaded after the helper has
    /// sampled `published`; if another thread publishes and drains in
    /// between, the subtraction can fail and surface the "rebuild
    /// tenant accounting state" error even though the state is fine.
    /// Presumably the helper would otherwise retry on a stale
    /// snapshot; verify against `vyre_driver::accounting`.
    fn reserve_publish_slot(&self) -> Result<(), TenantError> {
        let cap = self.state.max_outstanding_slots;
        vyre_driver::accounting::checked_atomic_update_u64_with_order(
            &self.state.published_count,
            Ordering::Acquire,
            Ordering::AcqRel,
            Ordering::Acquire,
            |published| {
                let drained = self.state.drained_count.load(Ordering::Acquire);
                // Host-visible backlog as of this attempt.
                let outstanding =
                    vyre_driver::accounting::checked_sub_u64_lazy(published, drained, || {
                        TenantError::Pipeline(PipelineError::QueueFull {
                            queue: "tenant",
                            fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
                        })
                    })?;
                if outstanding >= cap {
                    return Err(TenantError::Backpressure {
                        tenant_id: self.state.id,
                        outstanding,
                        cap,
                    });
                }
                // Proposed new value of `published_count`.
                vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
                    TenantError::Pipeline(PipelineError::QueueFull {
                        queue: "tenant",
                        fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
                    })
                })
            },
            // Second callback does nothing here; presumably a retry hook.
            |_, _| Ok(()),
        )?;
        Ok(())
    }
353
    /// Number of slots this tenant has ever published.
    ///
    /// Read with `Relaxed` ordering, so the value is a best-effort
    /// sample and is not synchronized with
    /// [`drained_count`](Self::drained_count).
    #[must_use]
    pub fn published_count(&self) -> u64 {
        self.state.published_count.load(Ordering::Relaxed)
    }
359
    /// Number of slots this tenant has observed drained (via
    /// [`note_drained`](Self::note_drained)).
    ///
    /// Read with `Relaxed` ordering, so the value is a best-effort
    /// sample.
    #[must_use]
    pub fn drained_count(&self) -> u64 {
        self.state.drained_count.load(Ordering::Relaxed)
    }
366
    /// Maximum host-visible slots this tenant may keep outstanding.
    /// Fixed at registration time; publishes beyond this backlog fail
    /// with [`TenantError::Backpressure`].
    #[must_use]
    pub fn max_outstanding_slots(&self) -> u64 {
        self.state.max_outstanding_slots
    }
372
373    /// Snapshot host-visible runtime counters for this tenant.
374    #[must_use]
375    pub fn runtime_counters(&self) -> TenantRuntimeCounters {
376        let published_count = self.state.published_count.load(Ordering::Acquire);
377        let drained_count = self.state.drained_count.load(Ordering::Acquire);
378        TenantRuntimeCounters {
379            tenant_id: self.state.id,
380            published_count,
381            drained_count,
382            outstanding_slots: vyre_driver::accounting::checked_sub_u64_lazy(
383                published_count,
384                drained_count,
385                || "tenant drained_count exceeded published_count. Fix: rebuild tenant accounting state.",
386            )
387            .unwrap_or_else(|message| panic!("{message}")),
388            max_outstanding_slots: self.state.max_outstanding_slots,
389            quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
390            quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
391            quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
392        }
393    }
394
    /// Mark `count` slots as drained. The host pump that observes
    /// DONE_COUNT calls this when it sees the global counter
    /// advance past the tenant's last-published cursor.
    ///
    /// `count` is added to `drained_count` without being compared to
    /// `published_count`; the caller is responsible for only reporting
    /// slots this tenant actually published.
    ///
    /// # Panics
    ///
    /// Panics if `drained_count` would overflow `u64`.
    pub fn note_drained(&self, count: u64) {
        checked_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
    }
401
402    /// Block-style quiesce: bounded backoff until every published
403    /// slot has been drained or `max_spins` polls elapse.
404    ///
405    /// # Errors
406    ///
407    /// Returns [`TenantError::QuiesceTimeout`] when `max_spins`
408    /// iterations pass without full drain. The outstanding count
409    /// at timeout is included for diagnostics.
410    pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
411        let started = Instant::now();
412        for poll in 0..max_spins {
413            let pub_count = self.state.published_count.load(Ordering::Acquire);
414            let drained = self.state.drained_count.load(Ordering::Acquire);
415            if drained >= pub_count {
416                self.record_quiesce(started, false);
417                return Ok(());
418            }
419            quiesce_idle(poll);
420        }
421        let pub_count = self.state.published_count.load(Ordering::Acquire);
422        let drained = self.state.drained_count.load(Ordering::Acquire);
423        self.record_quiesce(started, true);
424        Err(TenantError::QuiesceTimeout {
425            tenant_id: self.state.id,
426            outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
427                TenantError::Pipeline(PipelineError::QueueFull {
428                    queue: "tenant",
429                    fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
430                })
431            })?,
432        })
433    }
434
435    fn record_quiesce(&self, started: Instant, timed_out: bool) {
436        checked_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
437        if timed_out {
438            checked_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
439        }
440        let elapsed_ns = u64::try_from(started.elapsed().as_nanos()).unwrap_or_else(|error| {
441            panic!(
442                "tenant quiesce elapsed nanoseconds cannot fit u64: {error}. Fix: quiesce with a bounded timeout."
443            )
444        });
445        checked_atomic_add_u64(
446            &self.state.quiesce_wait_ns,
447            elapsed_ns,
448            "tenant quiesce_wait_ns",
449        );
450    }
451}
452
/// Thread-safe tenant registry. One per megakernel instance.
///
/// Registration and lookup all take `&self`; the tenant table is a
/// concurrent map and the id counter is atomic.
pub struct TenantRegistry {
    /// Active tenants keyed by tenant id. Entries are removed by `unregister`.
    tenants: DashMap<u32, TenantHandle>,
    /// Counter from which the next tenant id is derived; never rewound
    /// by any code in this file.
    next_id: AtomicU32,
}
459
460impl Default for TenantRegistry {
461    fn default() -> Self {
462        Self {
463            tenants: DashMap::new(),
464            next_id: AtomicU32::new(0),
465        }
466    }
467}
468
/// Reusable buffers for repeated concurrent-tenant selection, owned by
/// the caller so selection does not have to allocate on every call.
#[derive(Debug, Default)]
pub struct TenantSelectionScratch {
    /// Ids of the tenants active during the most recent selection.
    active_ids: Vec<u32>,
    /// Positions within `active_ids` picked by the most recent selection.
    selected_indices: Vec<usize>,
}

impl TenantSelectionScratch {
    /// Create scratch with two empty, unallocated buffers.
    #[must_use]
    pub const fn new() -> Self {
        let active_ids = Vec::new();
        let selected_indices = Vec::new();
        Self {
            active_ids,
            selected_indices,
        }
    }
}
486
487fn checked_atomic_add_u64(counter: &AtomicU64, value: u64, label: &'static str) {
488    vyre_driver::accounting::checked_atomic_add_u64_with_order(
489        counter,
490        value,
491        Ordering::Acquire,
492        Ordering::AcqRel,
493        Ordering::Acquire,
494        |_, _| {
495            format!("{label} overflowed u64. Fix: quiesce or recreate the tenant accounting state.")
496        },
497    )
498    .unwrap_or_else(|message| panic!("{message}"));
499}
500
501fn checked_atomic_sub_u64(counter: &AtomicU64, value: u64, label: &'static str) {
502    vyre_driver::accounting::checked_atomic_sub_u64_with_order(
503        counter,
504        value,
505        Ordering::Acquire,
506        Ordering::AcqRel,
507        Ordering::Acquire,
508        |_, _| {
509            format!("{label} underflowed u64. Fix: rebuild tenant accounting state.")
510        },
511    )
512    .unwrap_or_else(|message| panic!("{message}"));
513}
514
515impl TenantRegistry {
    /// Fresh registry with no tenants. Equivalent to
    /// [`TenantRegistry::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
521
    /// Register a new tenant with the given diagnostic label.
    /// Returns a handle whose opcode range is reserved until
    /// [`unregister`](Self::unregister) is called.
    ///
    /// The tenant gets an effectively unbounded outstanding-slot
    /// budget (`u64::MAX`); use
    /// [`register_with_backpressure`](Self::register_with_backpressure)
    /// to cap the backlog.
    ///
    /// # Errors
    ///
    /// Returns [`TenantError::RegistryFull`] when the tenant id or
    /// opcode space is exhausted.
    pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
        self.register_with_backpressure(label, u64::MAX)
    }
533
    /// Register a new tenant with a bounded outstanding-slot budget.
    ///
    /// Allocates an id from the registry's atomic counter, derives the
    /// tenant's opcode window as
    /// `TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT`, stores the
    /// new handle in the tenant table and returns a clone of it.
    ///
    /// `max_outstanding_slots` is clamped to at least 1, so a budget
    /// of 0 behaves like a budget of 1.
    ///
    /// # Errors
    ///
    /// Returns [`TenantError::RegistryFull`] when the tenant id or opcode space
    /// is exhausted. Returns [`TenantError::Pipeline`] if the internal
    /// retry counter overflows.
    pub fn register_with_backpressure(
        &self,
        label: impl Into<String>,
        max_outstanding_slots: u64,
    ) -> Result<TenantHandle, TenantError> {
        let mut registration_retries = 0u64;
        let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
            &self.next_id,
            Ordering::Relaxed,
            Ordering::SeqCst,
            Ordering::Relaxed,
            // Proposes the next counter value: `max(current, 1) + 1`,
            // so the counter skips straight from 0 to 2.
            |current| {
                if current >= TENANT_ID_MAX {
                    return Err(TenantError::RegistryFull { issued: current });
                }
                let id = current.max(1);
                id.checked_add(1)
                    .ok_or(TenantError::RegistryFull { issued: current })
            },
            // Idle between attempts with the shared spin-then-park
            // schedule. Presumably invoked when the update must be
            // retried; verify against `vyre_driver::accounting`.
            |_, _| {
                tenant_registry_retry_idle(registration_retries);
                registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
                    registration_retries,
                    1,
                    || {
                        TenantError::Pipeline(PipelineError::QueueFull {
                            queue: "tenant",
                            fix: "tenant registration retry counter overflowed u64; retry registration later",
                        })
                    },
                )?;
                Ok(())
            }
        )?;
        // Whatever the helper returned, the tenant id is never 0.
        let id = issued.max(1);

        // All window arithmetic is checked: running out of `u32`
        // opcode space is reported as `RegistryFull`.
        let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
            id,
            OPCODE_RANGE_PER_TENANT,
            TenantError::RegistryFull { issued },
        )?;
        let base_opcode = vyre_driver::accounting::checked_add_u32_value(
            TENANT_OPCODE_BASE,
            tenant_offset,
            TenantError::RegistryFull { issued },
        )?;
        let top_opcode = vyre_driver::accounting::checked_add_u32_value(
            base_opcode,
            OPCODE_RANGE_PER_TENANT,
            TenantError::RegistryFull { issued },
        )?;
        // Refuse a window whose exclusive end equals the SHUTDOWN opcode.
        if top_opcode == SHUTDOWN {
            return Err(TenantError::RegistryFull { issued });
        }
        let handle = TenantHandle {
            state: Arc::new(TenantState {
                id,
                base_opcode,
                opcode_cap: OPCODE_RANGE_PER_TENANT,
                published_count: AtomicU64::new(0),
                max_outstanding_slots: max_outstanding_slots.max(1),
                drained_count: AtomicU64::new(0),
                quiesce_calls: AtomicU64::new(0),
                quiesce_timeouts: AtomicU64::new(0),
                quiesce_wait_ns: AtomicU64::new(0),
                revoked: AtomicU32::new(0),
                label: label.into(),
            }),
        };
        self.tenants.insert(id, handle.clone());
        Ok(handle)
    }
612
613    /// Unregister a tenant. Future publishes on the handle fail
614    /// with [`TenantError::Revoked`]. In-flight slots already on
615    /// the GPU still execute  -  the host is responsible for
616    /// quiescing before unregister if it needs that guarantee.
617    pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
618        let (_, handle) = self.tenants.remove(&tenant_id)?;
619        handle.state.revoked.store(1, Ordering::Release);
620        Some(handle)
621    }
622
623    /// Snapshot of active tenants for observability / diagnostics.
624    #[must_use]
625    pub fn active_tenants(&self) -> Vec<TenantHandle> {
626        let mut out = Vec::with_capacity(self.tenants.len());
627        out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
628        out.sort_by_key(TenantHandle::id);
629        out
630    }
631
632    /// Snapshot active tenants into caller-owned storage.
633    pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
634        out.clear();
635        out.reserve(self.tenants.len());
636        self.tenants
637            .iter()
638            .for_each(|entry| out.push(entry.value().clone()));
639        out.sort_by_key(TenantHandle::id);
640    }
641
642    /// Look up a tenant by id. Returns `None` if the id was
643    /// unregistered.
644    #[must_use]
645    pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
646        self.tenants
647            .get(&tenant_id)
648            .map(|entry| entry.value().clone())
649    }
650
651    /// Snapshot runtime counters for every active tenant.
652    #[must_use]
653    pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
654        let mut out = Vec::with_capacity(self.tenants.len());
655        self.tenants
656            .iter()
657            .map(|entry| entry.value().runtime_counters())
658            .for_each(|counters| out.push(counters));
659        out.sort_by_key(|counters| counters.tenant_id);
660        out
661    }
662
663    /// Snapshot runtime counters into caller-owned storage.
664    pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
665        out.clear();
666        out.reserve(self.tenants.len());
667        self.tenants
668            .iter()
669            .map(|entry| entry.value().runtime_counters())
670            .for_each(|counters| out.push(counters));
671        out.sort_by_key(|counters| counters.tenant_id);
672    }
673
    /// Select a maximal independent subset of tenants for a fair
    /// schedule slot.
    ///
    /// `conflict_adj[i*n+j] != 0` means tenants `i` and `j` cannot
    /// share the same dispatch slot (e.g., both pinned to the same
    /// queue, or both holding mutually-exclusive opcode locks). Here
    /// `n` is the number of active tenants and `i` / `j` are positions
    /// in the list of active tenant ids sorted ascending. Selection is
    /// greedy in that order. If `conflict_adj.len() != n * n`, or every
    /// entry is zero, all active tenants are selected.
    ///
    /// Returns a Vec of tenant ids in selection order. Empty if no
    /// tenants are active.
    #[must_use]
    pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
        let mut out = Vec::new();
        let mut scratch = TenantSelectionScratch::new();
        self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
        out
    }
689
690    /// Select a maximal independent tenant subset into caller-owned storage.
691    pub fn select_concurrent_tenants_into(
692        &self,
693        conflict_adj: &[u32],
694        out: &mut Vec<u32>,
695        scratch: &mut TenantSelectionScratch,
696    ) {
697        out.clear();
698        scratch.active_ids.clear();
699        scratch.active_ids.reserve(self.tenants.len());
700        self.tenants
701            .iter()
702            .map(|entry| entry.value().id())
703            .for_each(|id| scratch.active_ids.push(id));
704        scratch.active_ids.sort_unstable();
705        let n = scratch.active_ids.len();
706        if n == 0 {
707            return;
708        }
709        if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
710            != Some(conflict_adj.len())
711        {
712            // Degenerate: caller didn't supply a matching adjacency.
713            // Default to all-tenants-can-run (no conflicts).
714            out.reserve(n);
715            out.extend(scratch.active_ids.iter().copied());
716            return;
717        }
718        if conflict_adj.iter().all(|conflict| *conflict == 0) {
719            out.reserve(n);
720            out.extend(scratch.active_ids.iter().copied());
721            return;
722        }
723        scratch.selected_indices.clear();
724        scratch.selected_indices.reserve(n);
725        'candidate: for candidate_idx in 0..n {
726            for &selected_idx in &scratch.selected_indices {
727                if conflict_adj[candidate_idx * n + selected_idx] != 0
728                    || conflict_adj[selected_idx * n + candidate_idx] != 0
729                {
730                    continue 'candidate;
731                }
732            }
733            scratch.selected_indices.push(candidate_idx);
734        }
735        out.reserve(scratch.selected_indices.len());
736        for &index in &scratch.selected_indices {
737            if let Some(&id) = scratch.active_ids.get(index) {
738                out.push(id);
739            }
740        }
741    }
742}
743
#[cfg(test)]
mod tests {
    // Host-side tests for the tenant registry: id/opcode-range assignment,
    // opcode mapping, publish gating, quiesce, per-tenant backpressure,
    // snapshot helpers, and conflict-aware tenant selection.
    use super::*;

    /// Builds a flat `n * n` conflict matrix of zeros, indexed as
    /// `i * n + j`, and marks every listed `(i, j)` pair as conflicting in
    /// both directions.
    ///
    /// Centralising the index arithmetic keeps the tests free of literal
    /// `0 * n + 1` style expressions, which clippy rejects.
    fn conflict_matrix(n: usize, conflicting_pairs: &[(usize, usize)]) -> Vec<u32> {
        let mut matrix = vec![0_u32; n * n];
        for &(i, j) in conflicting_pairs {
            matrix[i * n + j] = 1;
            matrix[j * n + i] = 1;
        }
        matrix
    }

    /// Two registrations must yield different ids, opcode ranges that do not
    /// overlap (the second base sits at least one full range above the
    /// first), and handles that report the label they were registered with.
    #[test]
    fn two_tenants_get_distinct_id_and_opcode_ranges() {
        let reg = TenantRegistry::new();
        let a = reg
            .register("scanner-a")
            .expect("Fix: register a; restore this invariant before continuing.");
        let b = reg
            .register("scanner-b")
            .expect("Fix: register b; restore this invariant before continuing.");
        assert_ne!(a.id(), b.id());
        assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
        assert_eq!(a.label(), "scanner-a");
        assert_eq!(b.label(), "scanner-b");
    }

    /// A local opcode equal to the per-tenant range size is rejected with
    /// `OpcodeOutOfRange`; an in-range local opcode maps to `base + local`.
    #[test]
    fn global_opcode_rejects_out_of_range_local() {
        let reg = TenantRegistry::new();
        let t = reg.register("soleno").unwrap();
        let err = t
            .global_opcode(OPCODE_RANGE_PER_TENANT)
            .expect_err("oversized local opcode must reject");
        assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));

        let ok = t
            .global_opcode(42)
            .expect("Fix: 42 < cap; restore this invariant before continuing.");
        assert_eq!(ok, t.base_opcode() + 42);
    }

    /// Publishing into slot 0 must bump the published counter and store the
    /// tenant id and the tenant-relative opcode in the ring bytes.
    #[test]
    fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
        let reg = TenantRegistry::new();
        let t = reg.register("warpscan").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(
            &mut ring,
            /* slot = */ 0,
            /* local = */ 7,
            &[1, 2, 3],
        )
        .expect("Fix: publish; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 1);

        // Slot 0 should carry tenant=t.id(), opcode=t.base_opcode()+7.
        // Each protocol word is read as a little-endian u32 at byte offset
        // `word * 4` from the start of the ring.
        let read_word = |word: usize| {
            let off = word * 4;
            u32::from_le_bytes(ring[off..off + 4].try_into().unwrap())
        };
        let stored_tenant = read_word(super::super::megakernel::protocol::TENANT_WORD as usize);
        let stored_opcode = read_word(super::super::megakernel::protocol::OPCODE_WORD as usize);
        assert_eq!(stored_tenant, t.id());
        assert_eq!(stored_opcode, t.base_opcode() + 7);
    }

    /// After `unregister`, a handle that is still held must fail to publish
    /// with `Revoked`, and the registry must no longer resolve the id.
    #[test]
    fn unregister_blocks_future_publishes() {
        let reg = TenantRegistry::new();
        let t = reg.register("vein").unwrap();
        let tenant_id = t.id();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
            .expect("Fix: first publish ok; restore this invariant before continuing.");
        reg.unregister(tenant_id)
            .expect("Fix: unregister; restore this invariant before continuing.");
        let err = t
            .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
            .expect_err("publish after unregister must reject");
        assert!(matches!(err, TenantError::Revoked { .. }));
        assert!(reg.lookup(tenant_id).is_none());
    }

    /// Once the drained count has caught up with the published count,
    /// `quiesce` succeeds and the runtime counters show no outstanding slots
    /// and no recorded timeout.
    #[test]
    fn quiesce_returns_when_drained_catches_up() {
        let reg = TenantRegistry::new();
        let t = reg.register("t1").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
        assert_eq!(t.published_count(), 2);
        t.note_drained(2);
        t.quiesce(1)
            .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
        let counters = t.runtime_counters();
        assert_eq!(counters.published_count, 2);
        assert_eq!(counters.drained_count, 2);
        assert_eq!(counters.outstanding_slots, 0);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 0);
    }

    /// With one published slot that is never reported as drained, `quiesce`
    /// must fail with `QuiesceTimeout` and the counters must record the
    /// timeout.
    #[test]
    fn quiesce_times_out_when_drain_stalled() {
        let reg = TenantRegistry::new();
        let t = reg.register("t2").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
        // Never note_drained → quiesce must time out.
        let err = t.quiesce(4).expect_err("stalled quiesce must time out");
        assert!(matches!(
            err,
            TenantError::QuiesceTimeout { outstanding: 1, .. }
        ));
        let counters = t.runtime_counters();
        assert_eq!(counters.outstanding_slots, 1);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 1);
    }

    /// A tenant registered with a backpressure cap of 2 accepts two
    /// outstanding publishes and rejects the third with `Backpressure`,
    /// leaving the published count untouched by the rejected call.
    #[test]
    fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 2).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
        let err = t
            .publish_slot(&mut ring, 2, 0, &[3])
            .expect_err("third outstanding publish must hit tenant backpressure");
        assert!(matches!(
            err,
            TenantError::Backpressure {
                outstanding: 2,
                cap: 2,
                ..
            }
        ));
        assert_eq!(t.published_count(), 2);
        let counters = t.runtime_counters();
        assert_eq!(counters.max_outstanding_slots, 2);
        assert_eq!(counters.outstanding_slots, 2);
    }

    /// With a cap of 1, a second publish is rejected until drain progress is
    /// reported; after `note_drained(1)` the same publish must succeed.
    #[test]
    fn tenant_backpressure_reopens_after_drain_progress() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 1).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        assert!(matches!(
            t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
            TenantError::Backpressure { .. }
        ));
        t.note_drained(1);
        t.publish_slot(&mut ring, 1, 0, &[2])
            .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 2);
        assert_eq!(t.runtime_counters().outstanding_slots, 1);
    }

    /// Exercises the registration retry idle helper on both sides of the
    /// `QUIESCE_SPIN_POLLS` boundary and pins the backoff duration at that
    /// boundary and at `u64::MAX`.
    #[test]
    fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
        for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
            tenant_registry_retry_idle(retry);
        }
        assert_eq!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
            QUIESCE_MIN_PARK
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    /// The quiesce backoff starts at `QUIESCE_MIN_PARK`, never shrinks across
    /// the sampled poll counts, and never exceeds `QUIESCE_MAX_PARK`.
    #[test]
    fn quiesce_backoff_is_bounded_and_monotonic() {
        let samples = [
            quiesce_backoff_duration(0),
            quiesce_backoff_duration(1),
            quiesce_backoff_duration(2),
            quiesce_backoff_duration(8),
            quiesce_backoff_duration(64),
        ];
        assert_eq!(samples[0], QUIESCE_MIN_PARK);
        for pair in samples.windows(2) {
            assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
            assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
        }
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    /// The active-tenant snapshot and the registry-wide counters must both
    /// reflect registrations and drop a tenant once it is unregistered.
    #[test]
    fn active_tenants_tracks_registrations() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(active.contains(&a.id()));
        assert!(active.contains(&b.id()));
        // Check the outcome instead of discarding it: a failed unregister
        // would otherwise surface only as a confusing assertion further down.
        reg.unregister(a.id())
            .expect("Fix: unregister a; restore this invariant before continuing.");
        let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(!after.contains(&a.id()));
        assert!(after.contains(&b.id()));
        let counters: Vec<u32> = reg
            .runtime_counters()
            .iter()
            .map(|tenant| tenant.tenant_id)
            .collect();
        assert_eq!(counters, vec![b.id()]);
    }

    /// The `_into` snapshot variants must refill caller-provided vectors
    /// without moving their backing storage on a repeated call.
    #[test]
    fn tenant_snapshots_reuse_caller_storage() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let mut active = Vec::with_capacity(2);
        let mut counters = Vec::with_capacity(2);

        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);
        let active_ptr = active.as_ptr();
        let counters_ptr = counters.as_ptr();
        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);

        assert_eq!(active.as_ptr(), active_ptr);
        assert_eq!(counters.as_ptr(), counters_ptr);
        assert!(active.iter().any(|tenant| tenant.id() == a.id()));
        assert!(active.iter().any(|tenant| tenant.id() == b.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == a.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == b.id()));
    }

    /// A repeated selection must keep the output vector and both scratch
    /// buffers at the same addresses, while still honouring the a/b conflict.
    #[test]
    fn concurrent_tenant_selection_reuses_scratch_and_output() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        // Entries 0 and 1 conflict with each other; entry 2 conflicts with nobody.
        let conflicts = conflict_matrix(3, &[(0, 1)]);
        let mut out = Vec::with_capacity(3);
        let mut scratch = TenantSelectionScratch::new();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
        let out_ptr = out.as_ptr();
        let active_ids_ptr = scratch.active_ids.as_ptr();
        let selected_ptr = scratch.selected_indices.as_ptr();
        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out.as_ptr(), out_ptr);
        assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
        assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
        assert!(out.contains(&a.id()) || out.contains(&b.id()));
        assert!(!(out.contains(&a.id()) && out.contains(&b.id())));
        assert!(out.contains(&c.id()));
    }

    /// With an all-zero conflict matrix every tenant is selected, the
    /// caller's output storage is reused, and the pairwise selection scratch
    /// stays empty.
    #[test]
    fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let mut out = Vec::with_capacity(8);
        let mut scratch = TenantSelectionScratch::new();
        let conflicts = vec![0_u32; 9];
        let out_ptr = out.as_ptr();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out, vec![a.id(), b.id(), c.id()]);
        assert_eq!(
            out.as_ptr(),
            out_ptr,
            "all-zero conflict fast path must reuse caller-owned output storage"
        );
        assert!(
            scratch.selected_indices.is_empty(),
            "all-zero conflict fast path must not populate pairwise selection scratch"
        );
    }

    /// The allocating selection API must pick exactly one of the two
    /// conflicting tenants and always include the unconflicted one.
    #[test]
    fn concurrent_tenant_selection_respects_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        // Entries 0 and 1 conflict with each other; entry 2 conflicts with nobody.
        let conflicts = conflict_matrix(3, &[(0, 1)]);

        let selected = reg.select_concurrent_tenants(&conflicts);

        assert!(selected.contains(&a.id()) || selected.contains(&b.id()));
        assert!(!(selected.contains(&a.id()) && selected.contains(&b.id())));
        assert!(selected.contains(&c.id()));
    }

    /// Thirty-two threads registering against one shared registry must each
    /// receive a distinct tenant id.
    #[test]
    fn concurrent_registration_assigns_unique_ids() {
        use std::thread;
        let reg = Arc::new(TenantRegistry::new());
        let mut handles = Vec::new();
        for i in 0..32 {
            let reg = Arc::clone(&reg);
            handles.push(thread::spawn(move || {
                reg.register(format!("t{i}")).unwrap().id()
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let mut sorted = ids.clone();
        // Plain integers: stability is irrelevant, so skip the stable sort's allocation.
        sorted.sort_unstable();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
    }
}
1061