Skip to main content

vyre_runtime/
tenant.rs

1//! Multi-tenant megakernel multiplexing.
2//!
3//! A single persistent megakernel per GPU can service many producer
4//! tools without each one paying the dispatch-setup cost. The
5//! `tenant_id` field already lives in the ring-slot protocol
6//! (`protocol::TENANT_WORD`); this module owns the host-side
//! bookkeeping that hands each producer a stable id, reserves an
//! opcode window per producer, and gates publish operations against
//! that per-tenant window so one producer cannot accidentally drive
//! another producer's opcodes.
11//!
12//! ## Tenants and opcodes
13//!
14//! Every tenant owns an opcode range `[base, base + cap)` where the
15//! whole range sits inside the user-extension space reserved by
16//! `vyre_runtime::megakernel::protocol::opcode` (≥ `0x4000_0000`).
//! When [`TenantRegistry::register`] returns a [`TenantHandle`],
//! callers pass tenant-local opcodes to [`TenantHandle::publish_slot`],
//! and the handle maps each one to `tenant_base + local_opcode` before
//! writing into the ring. A tenant that tries to publish an opcode
//! outside its own range fails with a structured error.
22//!
23//! ## Draining
24//!
//! Unregistering a tenant revokes future publishes but does NOT
//! revoke in-flight slots — the GPU is still going to execute any
//! slot it already CAS-claimed. Callers that need hard draining
//! drive [`TenantHandle::quiesce`], which polls (spinning first, then
//! parking with bounded backoff) until the tenant's drained counter —
//! advanced by the host pump through [`TenantHandle::note_drained`] as
//! it observes the megakernel DONE_COUNT — catches up with its
//! published counter, or the poll budget runs out.
31//!
32//! ## Daemon surface
33//!
//! The registry is the reusable piece. A full `MegakernelDaemon`
//! (listening on a Unix socket, vending handles over RPC) is a thin
//! wrapper that we can ship alongside the runtime — the registry
//! here already handles the interesting concurrency.
38
39use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use dashmap::DashMap;
44
45use crate::megakernel::protocol::opcode::SHUTDOWN;
46use crate::megakernel::Megakernel;
47use crate::PipelineError;
48
/// Start of the tenant opcode space. Sits inside the user-extension
/// range reserved by the megakernel protocol so fused rule documents
/// compose with tenant allocation without colliding with built-in
/// opcodes.
///
/// Tenant `id` owns the window
/// `[TENANT_OPCODE_BASE + id * OPCODE_RANGE_PER_TENANT,
///   TENANT_OPCODE_BASE + (id + 1) * OPCODE_RANGE_PER_TENANT)`.
/// The registry never issues id 0, so the window that starts exactly at
/// `TENANT_OPCODE_BASE` is never handed out.
pub const TENANT_OPCODE_BASE: u32 = 0x4000_0000;

/// Upper bound on the tenant-id allocation cursor; registration fails
/// once the cursor reaches it. `tenant_id == TENANT_ID_MAX` is reserved
/// as an invalid / revoked sentinel. In practice the opcode space (see
/// [`OPCODE_RANGE_PER_TENANT`]) runs out long before this bound.
pub const TENANT_ID_MAX: u32 = u32::MAX - 1;

/// Size of the opcode window reserved per tenant: `1 << 20` (1,048,576)
/// opcodes, well over any realistic rule count per producer.
///
/// Windows are carved out of `[TENANT_OPCODE_BASE, u32::MAX]` starting
/// at id 1, and each window's end must still fit in a `u32`. That caps
/// a registry at roughly 3070 tenants over its whole lifetime, because
/// ids (and their windows) are not recycled on unregister.
pub const OPCODE_RANGE_PER_TENANT: u32 = 1 << 20;

/// Number of polls that busy-spin (`spin_loop`) before quiesce and
/// registration-retry loops start parking the thread.
const QUIESCE_SPIN_POLLS: u64 = 64;
/// Shortest park once the spin phase is over; the backoff doubles from here.
const QUIESCE_MIN_PARK: Duration = Duration::from_micros(2);
/// Longest park; the exponential backoff is clamped to this value.
const QUIESCE_MAX_PARK: Duration = Duration::from_micros(50);
/// Cap on the backoff exponent applied to `QUIESCE_MIN_PARK`.
const QUIESCE_BACKOFF_SHIFT_CAP: u64 = 5;
69
70#[allow(clippy::unnecessary_min_or_max)]
71fn quiesce_backoff_duration(poll: u64) -> Duration {
72    let parked_poll = poll.checked_sub(QUIESCE_SPIN_POLLS).unwrap_or(0);
73    let shift = u32::try_from(parked_poll.min(QUIESCE_BACKOFF_SHIFT_CAP)).unwrap_or_else(|error| {
74        panic!(
75            "tenant quiesce backoff shift cannot fit u32: {error}. Fix: lower QUIESCE_BACKOFF_SHIFT_CAP."
76        )
77    });
78    let multiplier = 1_u32.checked_shl(shift).unwrap_or_else(|| {
79        panic!("tenant quiesce backoff multiplier overflowed u32. Fix: lower shift cap.")
80    });
81    QUIESCE_MIN_PARK
82        .checked_mul(multiplier)
83        .unwrap_or_else(|| {
84            panic!("tenant quiesce backoff duration overflowed. Fix: lower quiesce park bounds.")
85        })
86        .min(QUIESCE_MAX_PARK)
87}
88
89fn quiesce_idle(poll: u64) {
90    if poll < QUIESCE_SPIN_POLLS {
91        std::hint::spin_loop();
92    } else {
93        std::thread::park_timeout(quiesce_backoff_duration(poll));
94    }
95}
96
97fn tenant_registry_retry_idle(retry: u64) {
98    if retry < QUIESCE_SPIN_POLLS {
99        std::hint::spin_loop();
100    } else {
101        std::thread::park_timeout(quiesce_backoff_duration(retry));
102    }
103}
104
105/// Errors surfaced by the tenant registry.
106#[derive(Debug, thiserror::Error)]
107#[non_exhaustive]
108pub enum TenantError {
109    /// The registry ran out of tenant ids. Unregister unused tenants
110    /// or raise the range per tenant.
111    #[error("tenant registry exhausted after {issued} registrations. Fix: shrink OPCODE_RANGE_PER_TENANT or recycle tenants.")]
112    RegistryFull {
113        /// Number of tenants already issued when exhaustion hit.
114        issued: u32,
115    },
116    /// Tried to publish an opcode outside the tenant's reserved
117    /// range. Almost always a caller bug.
118    #[error(
119        "tenant {tenant_id} published local opcode {local_opcode}; out of range [0, {cap}). \
120         Fix: caller must stay inside the opcode window returned by `register()`."
121    )]
122    OpcodeOutOfRange {
123        /// Tenant id that tripped.
124        tenant_id: u32,
125        /// Local opcode the caller supplied.
126        local_opcode: u32,
127        /// Cap on the tenant's local opcode range.
128        cap: u32,
129    },
130    /// Tenant was unregistered concurrently; its handle is stale.
131    #[error("tenant {tenant_id} was revoked; handle is stale. Fix: acquire a fresh handle from the registry.")]
132    Revoked {
133        /// Tenant id that was revoked.
134        tenant_id: u32,
135    },
136    /// Quiesce timed out with inflight slots still outstanding.
137    #[error(
138        "tenant {tenant_id} quiesce timed out with {outstanding} inflight slots. \
139         Fix: ensure the megakernel is making progress (check DONE_COUNT) or raise the timeout."
140    )]
141    QuiesceTimeout {
142        /// Tenant id whose quiesce tripped.
143        tenant_id: u32,
144        /// Number of slots still inflight at timeout.
145        outstanding: u64,
146    },
147    /// Tenant has reached its configured outstanding-slot cap.
148    #[error(
149        "tenant {tenant_id} has {outstanding} outstanding slots, cap {cap}. \
150         Fix: wait for drain progress or register the tenant with a larger bounded backlog."
151    )]
152    Backpressure {
153        /// Tenant id whose backlog is full.
154        tenant_id: u32,
155        /// Current host-visible outstanding slots.
156        outstanding: u64,
157        /// Configured outstanding-slot cap.
158        cap: u64,
159    },
160    /// Protocol error bubbled up from `Megakernel::publish_slot`.
161    #[error("{0}")]
162    Pipeline(#[from] PipelineError),
163}
164
/// Shared per-tenant record: identity fixed at registration plus atomic
/// accounting counters.
///
/// Kept behind an `Arc` so every [`TenantHandle`] clone (and the
/// registry's own copy) observes the same counters, even after the
/// registry borrow that produced the handle has ended.
struct TenantState {
    // Identity, fixed at registration.
    /// Tenant id written into the ring slot's tenant word.
    id: u32,
    /// Stable diagnostic label (for example, `"scanner-a"`, `"scanner-b"`).
    label: String,
    /// First global opcode of this tenant's window.
    base_opcode: u32,
    /// Window width; tenant-local opcodes must be `< opcode_cap`.
    opcode_cap: u32,

    // Backlog accounting.
    /// Cap on host-visible outstanding slots (`published - drained`).
    max_outstanding_slots: u64,
    /// Slots this tenant has published; a reservation is rolled back
    /// when the underlying ring publish fails.
    published_count: AtomicU64,
    /// Slots reported drained through [`TenantHandle::note_drained`].
    drained_count: AtomicU64,

    // Quiesce telemetry.
    /// Quiesce calls that finished, whether drained or timed out.
    quiesce_calls: AtomicU64,
    /// Quiesce calls that gave up before the tenant drained.
    quiesce_timeouts: AtomicU64,
    /// Total host-observed nanoseconds spent inside quiesce calls.
    quiesce_wait_ns: AtomicU64,

    // Lifecycle.
    /// Nonzero once the tenant is unregistered; publishes reject afterwards.
    revoked: AtomicU32,
}

/// Cheap, cloneable handle to one tenant, returned by
/// [`TenantRegistry::register`].
///
/// Every clone points at the same shared state. Several producer threads
/// of one tenant can therefore each hold their own handle and publish
/// concurrently.
#[derive(Clone)]
pub struct TenantHandle {
    state: Arc<TenantState>,
}
197
/// Point-in-time snapshot of one tenant's host-side counters, as returned
/// by `TenantHandle::runtime_counters` / `TenantRegistry::runtime_counters`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TenantRuntimeCounters {
    /// Id of the tenant this snapshot describes.
    pub tenant_id: u32,
    /// Slots published by the tenant so far.
    pub published_count: u64,
    /// Slots the host has reported drained for the tenant.
    pub drained_count: u64,
    /// Backlog at snapshot time, i.e. `published_count - drained_count`.
    pub outstanding_slots: u64,
    /// Backlog cap the tenant was registered with.
    pub max_outstanding_slots: u64,
    /// Quiesce calls recorded so far.
    pub quiesce_calls: u64,
    /// Subset of `quiesce_calls` that timed out.
    pub quiesce_timeouts: u64,
    /// Total nanoseconds spent waiting in quiesce for this tenant.
    pub quiesce_wait_ns: u64,
}
218
219impl std::fmt::Debug for TenantHandle {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("TenantHandle")
222            .field("id", &self.state.id)
223            .field("label", &self.state.label)
224            .field("base_opcode", &self.state.base_opcode)
225            .field(
226                "published_count",
227                &self.state.published_count.load(Ordering::Relaxed),
228            )
229            .field("max_outstanding_slots", &self.state.max_outstanding_slots)
230            .field(
231                "drained_count",
232                &self.state.drained_count.load(Ordering::Relaxed),
233            )
234            .field(
235                "revoked",
236                &(self.state.revoked.load(Ordering::Acquire) != 0),
237            )
238            .finish()
239    }
240}
241
242impl TenantHandle {
243    /// Stable tenant id; maps onto the ring-slot `TENANT_WORD`.
244    #[must_use]
245    pub fn id(&self) -> u32 {
246        self.state.id
247    }
248
249    /// Human-readable label supplied at registration time.
250    #[must_use]
251    pub fn label(&self) -> &str {
252        &self.state.label
253    }
254
255    /// First opcode this tenant owns.
256    #[must_use]
257    pub fn base_opcode(&self) -> u32 {
258        self.state.base_opcode
259    }
260
261    /// Convert a tenant-local opcode to the global opcode used in
262    /// the ring slot. Caller enforces `local < opcode_cap()`.
263    ///
264    /// # Errors
265    ///
266    /// Returns [`TenantError::OpcodeOutOfRange`] when the local
267    /// value is outside the reserved window.
268    pub fn global_opcode(&self, local: u32) -> Result<u32, TenantError> {
269        if local >= self.state.opcode_cap {
270            return Err(TenantError::OpcodeOutOfRange {
271                tenant_id: self.id(),
272                local_opcode: local,
273                cap: self.state.opcode_cap,
274            });
275        }
276        let global = self.state.base_opcode + local;
277        if let Err(e) = crate::megakernel::protocol::opcode::validate_user_opcode(global) {
278            return Err(TenantError::Pipeline(PipelineError::Backend(format!(
279                "tenant registry produced invalid global opcode {global}: {e}. Fix: repair tenant opcode window allocation before publishing."
280            ))));
281        }
282        Ok(global)
283    }
284
285    /// Publish a slot into the tenant's ring with a tenant-local
286    /// opcode. Convenience wrapper that composes
287    /// [`Megakernel::publish_slot`] with tenant bookkeeping.
288    ///
289    /// # Errors
290    ///
291    /// - [`TenantError::Revoked`] if the tenant was unregistered.
292    /// - [`TenantError::OpcodeOutOfRange`] if `local_opcode` is
293    ///   outside the tenant's window.
294    /// - [`TenantError::Pipeline`] when the underlying
295    ///   `publish_slot` rejects (e.g., slot still in-flight).
296    pub fn publish_slot(
297        &self,
298        ring_bytes: &mut [u8],
299        slot_idx: u32,
300        local_opcode: u32,
301        args: &[u32],
302    ) -> Result<(), TenantError> {
303        if self.state.revoked.load(Ordering::Acquire) != 0 {
304            return Err(TenantError::Revoked {
305                tenant_id: self.state.id,
306            });
307        }
308        let global = self.global_opcode(local_opcode)?;
309        self.reserve_publish_slot()?;
310        if let Err(error) =
311            Megakernel::publish_slot(ring_bytes, slot_idx, self.state.id, global, args)
312        {
313            checked_atomic_sub_u64(&self.state.published_count, 1, "tenant published rollback");
314            return Err(error.into());
315        }
316        Ok(())
317    }
318
319    fn reserve_publish_slot(&self) -> Result<(), TenantError> {
320        let cap = self.state.max_outstanding_slots;
321        vyre_driver::accounting::checked_atomic_update_u64_with_order(
322            &self.state.published_count,
323            Ordering::Acquire,
324            Ordering::AcqRel,
325            Ordering::Acquire,
326            |published| {
327                let drained = self.state.drained_count.load(Ordering::Acquire);
328                let outstanding = vyre_driver::accounting::checked_sub_u64_lazy(
329                    published,
330                    drained,
331                    || {
332                        TenantError::Pipeline(PipelineError::QueueFull {
333                            queue: "tenant",
334                            fix: "tenant drained_count exceeded published_count; rebuild tenant accounting state",
335                        })
336                    },
337                )?;
338                if outstanding >= cap {
339                    return Err(TenantError::Backpressure {
340                        tenant_id: self.state.id,
341                        outstanding,
342                        cap,
343                    });
344                }
345                vyre_driver::accounting::checked_add_u64_lazy(published, 1, || {
346                    TenantError::Pipeline(PipelineError::QueueFull {
347                        queue: "tenant",
348                        fix: "tenant published_count overflowed u64; quiesce or recreate the tenant before publishing more slots",
349                    })
350                })
351            },
352            |_, _| Ok(()),
353        )?;
354        Ok(())
355    }
356
357    /// Number of slots this tenant has ever published.
358    #[must_use]
359    pub fn published_count(&self) -> u64 {
360        self.state.published_count.load(Ordering::Relaxed)
361    }
362
363    /// Number of slots this tenant has observed drained (via
364    /// [`note_drained`](Self::note_drained)).
365    #[must_use]
366    pub fn drained_count(&self) -> u64 {
367        self.state.drained_count.load(Ordering::Relaxed)
368    }
369
370    /// Maximum host-visible slots this tenant may keep outstanding.
371    #[must_use]
372    pub fn max_outstanding_slots(&self) -> u64 {
373        self.state.max_outstanding_slots
374    }
375
376    /// Snapshot host-visible runtime counters for this tenant.
377    #[must_use]
378    pub fn runtime_counters(&self) -> TenantRuntimeCounters {
379        let published_count = self.state.published_count.load(Ordering::Acquire);
380        let drained_count = self.state.drained_count.load(Ordering::Acquire);
381        TenantRuntimeCounters {
382            tenant_id: self.state.id,
383            published_count,
384            drained_count,
385            outstanding_slots: vyre_driver::accounting::checked_sub_u64_lazy(
386                published_count,
387                drained_count,
388                || "tenant drained_count exceeded published_count. Fix: rebuild tenant accounting state.",
389            )
390            .unwrap_or_else(|message| panic!("{message}")),
391            max_outstanding_slots: self.state.max_outstanding_slots,
392            quiesce_calls: self.state.quiesce_calls.load(Ordering::Acquire),
393            quiesce_timeouts: self.state.quiesce_timeouts.load(Ordering::Acquire),
394            quiesce_wait_ns: self.state.quiesce_wait_ns.load(Ordering::Acquire),
395        }
396    }
397
398    /// Mark `count` slots as drained. The host pump that observes
399    /// DONE_COUNT calls this when it sees the global counter
400    /// advance past the tenant's last-published cursor.
401    pub fn note_drained(&self, count: u64) {
402        checked_atomic_add_u64(&self.state.drained_count, count, "tenant drained_count");
403    }
404
405    /// Block-style quiesce: bounded backoff until every published
406    /// slot has been drained or `max_spins` polls elapse.
407    ///
408    /// # Errors
409    ///
410    /// Returns [`TenantError::QuiesceTimeout`] when `max_spins`
411    /// iterations pass without full drain. The outstanding count
412    /// at timeout is included for diagnostics.
413    pub fn quiesce(&self, max_spins: u64) -> Result<(), TenantError> {
414        let started = Instant::now();
415        for poll in 0..max_spins {
416            let pub_count = self.state.published_count.load(Ordering::Acquire);
417            let drained = self.state.drained_count.load(Ordering::Acquire);
418            if drained >= pub_count {
419                self.record_quiesce(started, false);
420                return Ok(());
421            }
422            quiesce_idle(poll);
423        }
424        let pub_count = self.state.published_count.load(Ordering::Acquire);
425        let drained = self.state.drained_count.load(Ordering::Acquire);
426        self.record_quiesce(started, true);
427        Err(TenantError::QuiesceTimeout {
428            tenant_id: self.state.id,
429            outstanding: vyre_driver::accounting::checked_sub_u64_lazy(pub_count, drained, || {
430                TenantError::Pipeline(PipelineError::QueueFull {
431                    queue: "tenant",
432                    fix: "tenant drained_count exceeded published_count during quiesce; rebuild tenant accounting state",
433                })
434            })?,
435        })
436    }
437
438    fn record_quiesce(&self, started: Instant, timed_out: bool) {
439        checked_atomic_add_u64(&self.state.quiesce_calls, 1, "tenant quiesce_calls");
440        if timed_out {
441            checked_atomic_add_u64(&self.state.quiesce_timeouts, 1, "tenant quiesce_timeouts");
442        }
443        let elapsed_ns = u64::try_from(started.elapsed().as_nanos()).unwrap_or_else(|error| {
444            panic!(
445                "tenant quiesce elapsed nanoseconds cannot fit u64: {error}. Fix: quiesce with a bounded timeout."
446            )
447        });
448        checked_atomic_add_u64(
449            &self.state.quiesce_wait_ns,
450            elapsed_ns,
451            "tenant quiesce_wait_ns",
452        );
453    }
454}
455
456/// Thread-safe tenant registry. One per megakernel instance.
457
458pub struct TenantRegistry {
459    tenants: DashMap<u32, TenantHandle>,
460    next_id: AtomicU32,
461}
462
463impl Default for TenantRegistry {
464    fn default() -> Self {
465        Self {
466            tenants: DashMap::new(),
467            next_id: AtomicU32::new(0),
468        }
469    }
470}
471
/// Reusable buffers for `TenantRegistry::select_concurrent_tenants_into`,
/// so repeated selections can run without reallocating once warmed up.
#[derive(Debug, Default)]
pub struct TenantSelectionScratch {
    /// Ids of the active tenants, sorted ascending, for the current call.
    active_ids: Vec<u32>,
    /// Indices into `active_ids` picked by the greedy selection.
    selected_indices: Vec<usize>,
}

impl TenantSelectionScratch {
    /// Empty scratch; usable in `const` contexts and allocation-free until
    /// first use.
    #[must_use]
    pub const fn new() -> Self {
        let selected_indices = Vec::new();
        let active_ids = Vec::new();
        Self {
            active_ids,
            selected_indices,
        }
    }
}
489
490fn checked_atomic_add_u64(counter: &AtomicU64, value: u64, label: &'static str) {
491    vyre_driver::accounting::checked_atomic_add_u64_with_order(
492        counter,
493        value,
494        Ordering::Acquire,
495        Ordering::AcqRel,
496        Ordering::Acquire,
497        |_, _| {
498            format!("{label} overflowed u64. Fix: quiesce or recreate the tenant accounting state.")
499        },
500    )
501    .unwrap_or_else(|message| panic!("{message}"));
502}
503
504fn checked_atomic_sub_u64(counter: &AtomicU64, value: u64, label: &'static str) {
505    vyre_driver::accounting::checked_atomic_sub_u64_with_order(
506        counter,
507        value,
508        Ordering::Acquire,
509        Ordering::AcqRel,
510        Ordering::Acquire,
511        |_, _| format!("{label} underflowed u64. Fix: rebuild tenant accounting state."),
512    )
513    .unwrap_or_else(|message| panic!("{message}"));
514}
515
516impl TenantRegistry {
517    /// Fresh registry with no tenants.
518    #[must_use]
519    pub fn new() -> Self {
520        Self::default()
521    }
522
523    /// Register a new tenant with the given diagnostic label.
524    /// Returns a handle whose opcode range is reserved until
525    /// [`unregister`](Self::unregister) is called.
526    ///
527    /// # Errors
528    ///
529    /// Returns [`TenantError::RegistryFull`] when the tenant id or
530    /// opcode space is exhausted.
531    pub fn register(&self, label: impl Into<String>) -> Result<TenantHandle, TenantError> {
532        self.register_with_backpressure(label, u64::MAX)
533    }
534
535    /// Register a new tenant with a bounded outstanding-slot budget.
536    ///
537    /// # Errors
538    ///
539    /// Returns [`TenantError::RegistryFull`] when the tenant id or opcode space
540    /// is exhausted.
541    pub fn register_with_backpressure(
542        &self,
543        label: impl Into<String>,
544        max_outstanding_slots: u64,
545    ) -> Result<TenantHandle, TenantError> {
546        let mut registration_retries = 0u64;
547        let issued = vyre_driver::accounting::checked_atomic_update_u32_with_order(
548            &self.next_id,
549            Ordering::Relaxed,
550            Ordering::SeqCst,
551            Ordering::Relaxed,
552            |current| {
553                if current >= TENANT_ID_MAX {
554                    return Err(TenantError::RegistryFull { issued: current });
555                }
556                let id = current.max(1);
557                id.checked_add(1)
558                    .ok_or(TenantError::RegistryFull { issued: current })
559            },
560            |_, _| {
561                tenant_registry_retry_idle(registration_retries);
562                registration_retries = vyre_driver::accounting::checked_add_u64_lazy(
563                    registration_retries,
564                    1,
565                    || {
566                        TenantError::Pipeline(PipelineError::QueueFull {
567                            queue: "tenant",
568                            fix: "tenant registration retry counter overflowed u64; retry registration later",
569                        })
570                    },
571                )?;
572                Ok(())
573            },
574        )?;
575        let id = issued.max(1);
576
577        let tenant_offset = vyre_driver::accounting::checked_mul_u32_value(
578            id,
579            OPCODE_RANGE_PER_TENANT,
580            TenantError::RegistryFull { issued },
581        )?;
582        let base_opcode = vyre_driver::accounting::checked_add_u32_value(
583            TENANT_OPCODE_BASE,
584            tenant_offset,
585            TenantError::RegistryFull { issued },
586        )?;
587        let top_opcode = vyre_driver::accounting::checked_add_u32_value(
588            base_opcode,
589            OPCODE_RANGE_PER_TENANT,
590            TenantError::RegistryFull { issued },
591        )?;
592        if top_opcode == SHUTDOWN {
593            return Err(TenantError::RegistryFull { issued });
594        }
595        let handle = TenantHandle {
596            state: Arc::new(TenantState {
597                id,
598                base_opcode,
599                opcode_cap: OPCODE_RANGE_PER_TENANT,
600                published_count: AtomicU64::new(0),
601                max_outstanding_slots: max_outstanding_slots.max(1),
602                drained_count: AtomicU64::new(0),
603                quiesce_calls: AtomicU64::new(0),
604                quiesce_timeouts: AtomicU64::new(0),
605                quiesce_wait_ns: AtomicU64::new(0),
606                revoked: AtomicU32::new(0),
607                label: label.into(),
608            }),
609        };
610        self.tenants.insert(id, handle.clone());
611        Ok(handle)
612    }
613
614    /// Unregister a tenant. Future publishes on the handle fail
615    /// with [`TenantError::Revoked`]. In-flight slots already on
616    /// the GPU still execute  -  the host is responsible for
617    /// quiescing before unregister if it needs that guarantee.
618    pub fn unregister(&self, tenant_id: u32) -> Option<TenantHandle> {
619        let (_, handle) = self.tenants.remove(&tenant_id)?;
620        handle.state.revoked.store(1, Ordering::Release);
621        Some(handle)
622    }
623
624    /// Snapshot of active tenants for observability / diagnostics.
625    #[must_use]
626    pub fn active_tenants(&self) -> Vec<TenantHandle> {
627        let mut out = Vec::with_capacity(self.tenants.len());
628        out.extend(self.tenants.iter().map(|entry| entry.value().clone()));
629        out.sort_by_key(TenantHandle::id);
630        out
631    }
632
633    /// Snapshot active tenants into caller-owned storage.
634    pub fn active_tenants_into(&self, out: &mut Vec<TenantHandle>) {
635        out.clear();
636        out.reserve(self.tenants.len());
637        self.tenants
638            .iter()
639            .for_each(|entry| out.push(entry.value().clone()));
640        out.sort_by_key(TenantHandle::id);
641    }
642
643    /// Look up a tenant by id. Returns `None` if the id was
644    /// unregistered.
645    #[must_use]
646    pub fn lookup(&self, tenant_id: u32) -> Option<TenantHandle> {
647        self.tenants
648            .get(&tenant_id)
649            .map(|entry| entry.value().clone())
650    }
651
652    /// Snapshot runtime counters for every active tenant.
653    #[must_use]
654    pub fn runtime_counters(&self) -> Vec<TenantRuntimeCounters> {
655        let mut out = Vec::with_capacity(self.tenants.len());
656        self.tenants
657            .iter()
658            .map(|entry| entry.value().runtime_counters())
659            .for_each(|counters| out.push(counters));
660        out.sort_by_key(|counters| counters.tenant_id);
661        out
662    }
663
664    /// Snapshot runtime counters into caller-owned storage.
665    pub fn runtime_counters_into(&self, out: &mut Vec<TenantRuntimeCounters>) {
666        out.clear();
667        out.reserve(self.tenants.len());
668        self.tenants
669            .iter()
670            .map(|entry| entry.value().runtime_counters())
671            .for_each(|counters| out.push(counters));
672        out.sort_by_key(|counters| counters.tenant_id);
673    }
674
675    /// Select a maximal independent subset of tenants for a fair
676    /// schedule slot.
677    ///
678    /// `conflict_adj[i*n+j] != 0` means tenants `i` and `j` cannot
679    /// share the same dispatch slot (e.g., both pinned to the same
680    /// queue, or both holding mutually-exclusive opcode locks). The
681    /// Returns a Vec of tenant ids in selection order. Empty if no
682    /// tenants are active.
683    #[must_use]
684    pub fn select_concurrent_tenants(&self, conflict_adj: &[u32]) -> Vec<u32> {
685        let mut out = Vec::new();
686        let mut scratch = TenantSelectionScratch::new();
687        self.select_concurrent_tenants_into(conflict_adj, &mut out, &mut scratch);
688        out
689    }
690
691    /// Select a maximal independent tenant subset into caller-owned storage.
692    pub fn select_concurrent_tenants_into(
693        &self,
694        conflict_adj: &[u32],
695        out: &mut Vec<u32>,
696        scratch: &mut TenantSelectionScratch,
697    ) {
698        out.clear();
699        scratch.active_ids.clear();
700        scratch.active_ids.reserve(self.tenants.len());
701        self.tenants
702            .iter()
703            .map(|entry| entry.value().id())
704            .for_each(|id| scratch.active_ids.push(id));
705        scratch.active_ids.sort_unstable();
706        let n = scratch.active_ids.len();
707        if n == 0 {
708            return;
709        }
710        if vyre_driver::accounting::checked_mul_usize_lazy(n, n, || ()).ok()
711            != Some(conflict_adj.len())
712        {
713            // Degenerate: caller didn't supply a matching adjacency.
714            // Default to all-tenants-can-run (no conflicts).
715            out.reserve(n);
716            out.extend(scratch.active_ids.iter().copied());
717            return;
718        }
719        if conflict_adj.iter().all(|conflict| *conflict == 0) {
720            out.reserve(n);
721            out.extend(scratch.active_ids.iter().copied());
722            return;
723        }
724        scratch.selected_indices.clear();
725        scratch.selected_indices.reserve(n);
726        'candidate: for candidate_idx in 0..n {
727            for &selected_idx in &scratch.selected_indices {
728                if conflict_adj[candidate_idx * n + selected_idx] != 0
729                    || conflict_adj[selected_idx * n + candidate_idx] != 0
730                {
731                    continue 'candidate;
732                }
733            }
734            scratch.selected_indices.push(candidate_idx);
735        }
736        out.reserve(scratch.selected_indices.len());
737        for &index in &scratch.selected_indices {
738            if let Some(&id) = scratch.active_ids.get(index) {
739                out.push(id);
740            }
741        }
742    }
743}
744
#[cfg(test)]
mod tests {
    // Unit tests for the tenant registry: id and opcode-range allocation,
    // local -> global opcode mapping, publish gating after unregister,
    // per-tenant backpressure, quiesce accounting, backoff bounds, and
    // concurrent-tenant selection over a pairwise conflict matrix.
    use super::*;

    /// Reads the little-endian `u32` stored at 32-bit word index `word` of `bytes`.
    fn read_le_u32(bytes: &[u8], word: usize) -> u32 {
        let off = word * 4;
        let raw: [u8; 4] = bytes[off..off + 4]
            .try_into()
            .expect("Fix: a ring word is exactly 4 bytes; restore this invariant before continuing.");
        u32::from_le_bytes(raw)
    }

    /// Builds a dense row-major `n * n` conflict matrix in which every listed
    /// `(i, j)` pair is marked as conflicting in both directions.
    ///
    /// Centralising the indexing avoids literal `0 * n + 1` / `1 * n + 0`
    /// expressions, which trip clippy's deny-by-default `erasing_op` lint
    /// (and `identity_op`) when linting test targets.
    fn symmetric_conflicts(n: usize, pairs: &[(usize, usize)]) -> Vec<u32> {
        let mut conflicts = vec![0_u32; n * n];
        for &(i, j) in pairs {
            conflicts[i * n + j] = 1;
            conflicts[j * n + i] = 1;
        }
        conflicts
    }

    #[test]
    fn two_tenants_get_distinct_id_and_opcode_ranges() {
        let reg = TenantRegistry::new();
        let a = reg
            .register("scanner-a")
            .expect("Fix: register a; restore this invariant before continuing.");
        let b = reg
            .register("scanner-b")
            .expect("Fix: register b; restore this invariant before continuing.");
        assert_ne!(a.id(), b.id());
        // Ranges must not overlap: a's full range ends at or before b's base.
        assert!(a.base_opcode() + OPCODE_RANGE_PER_TENANT <= b.base_opcode());
        assert_eq!(a.label(), "scanner-a");
        assert_eq!(b.label(), "scanner-b");
    }

    #[test]
    fn global_opcode_rejects_out_of_range_local() {
        let reg = TenantRegistry::new();
        let t = reg.register("soleno").unwrap();
        // The range is half-open, so the cap itself is the first invalid local opcode.
        let err = t
            .global_opcode(OPCODE_RANGE_PER_TENANT)
            .expect_err("oversized local opcode must reject");
        assert!(matches!(err, TenantError::OpcodeOutOfRange { .. }));

        let ok = t
            .global_opcode(42)
            .expect("Fix: 42 < cap; restore this invariant before continuing.");
        assert_eq!(ok, t.base_opcode() + 42);
    }

    #[test]
    fn publish_slot_writes_with_tenant_id_and_bumps_counter() {
        let reg = TenantRegistry::new();
        let t = reg.register("warpscan").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(
            &mut ring,
            /* slot = */ 0,
            /* local = */ 7,
            &[1, 2, 3],
        )
        .expect("Fix: publish; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 1);

        // Slot 0 should carry tenant=t.id(), opcode=t.base_opcode()+7.
        let stored_tenant = read_le_u32(
            &ring[..],
            super::super::megakernel::protocol::TENANT_WORD as usize,
        );
        let stored_opcode = read_le_u32(
            &ring[..],
            super::super::megakernel::protocol::OPCODE_WORD as usize,
        );
        assert_eq!(stored_tenant, t.id());
        assert_eq!(stored_opcode, t.base_opcode() + 7);
    }

    #[test]
    fn unregister_blocks_future_publishes() {
        let reg = TenantRegistry::new();
        let t = reg.register("vein").unwrap();
        let tenant_id = t.id();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0])
            .expect("Fix: first publish ok; restore this invariant before continuing.");
        reg.unregister(tenant_id)
            .expect("Fix: unregister; restore this invariant before continuing.");
        // The handle outlives its registration; publishing through it must now fail.
        let err = t
            .publish_slot(&mut ring, 1, 0, &[0, 0, 0])
            .expect_err("publish after unregister must reject");
        assert!(matches!(err, TenantError::Revoked { .. }));
        assert!(reg.lookup(tenant_id).is_none());
    }

    #[test]
    fn quiesce_returns_when_drained_catches_up() {
        let reg = TenantRegistry::new();
        let t = reg.register("t1").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[1, 2, 3]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[4, 5, 6]).unwrap();
        assert_eq!(t.published_count(), 2);
        // Report both published slots as drained so quiesce has nothing to wait on.
        t.note_drained(2);
        t.quiesce(1)
            .expect("Fix: drained == published after note_drained; restore this invariant before continuing.");
        let counters = t.runtime_counters();
        assert_eq!(counters.published_count, 2);
        assert_eq!(counters.drained_count, 2);
        assert_eq!(counters.outstanding_slots, 0);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 0);
    }

    #[test]
    fn quiesce_times_out_when_drain_stalled() {
        let reg = TenantRegistry::new();
        let t = reg.register("t2").unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(1).unwrap();
        t.publish_slot(&mut ring, 0, 0, &[0, 0, 0]).unwrap();
        // Never note_drained → quiesce must time out.
        let err = t.quiesce(4).expect_err("stalled quiesce must time out");
        assert!(matches!(
            err,
            TenantError::QuiesceTimeout { outstanding: 1, .. }
        ));
        // A timed-out quiesce is still counted as a call, plus a timeout.
        let counters = t.runtime_counters();
        assert_eq!(counters.outstanding_slots, 1);
        assert_eq!(counters.quiesce_calls, 1);
        assert_eq!(counters.quiesce_timeouts, 1);
    }

    #[test]
    fn bounded_tenant_backpressure_rejects_unbounded_publish_backlog() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 2).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(4).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        t.publish_slot(&mut ring, 1, 0, &[2]).unwrap();
        let err = t
            .publish_slot(&mut ring, 2, 0, &[3])
            .expect_err("third outstanding publish must hit tenant backpressure");
        assert!(matches!(
            err,
            TenantError::Backpressure {
                outstanding: 2,
                cap: 2,
                ..
            }
        ));
        // The rejected publish must not be counted.
        assert_eq!(t.published_count(), 2);
        let counters = t.runtime_counters();
        assert_eq!(counters.max_outstanding_slots, 2);
        assert_eq!(counters.outstanding_slots, 2);
    }

    #[test]
    fn tenant_backpressure_reopens_after_drain_progress() {
        let reg = TenantRegistry::new();
        let t = reg.register_with_backpressure("bounded", 1).unwrap();
        let mut ring = Megakernel::try_encode_empty_ring(2).unwrap();

        t.publish_slot(&mut ring, 0, 0, &[1]).unwrap();
        assert!(matches!(
            t.publish_slot(&mut ring, 1, 0, &[2]).unwrap_err(),
            TenantError::Backpressure { .. }
        ));
        t.note_drained(1);
        t.publish_slot(&mut ring, 1, 0, &[2])
            .expect("Fix: drain progress must reopen the bounded tenant queue; restore this invariant before continuing.");
        assert_eq!(t.published_count(), 2);
        assert_eq!(t.runtime_counters().outstanding_slots, 1);
    }

    #[test]
    fn tenant_registry_registration_retry_uses_adaptive_idle_not_unbounded_spin() {
        // Exercise the idle helper on both sides of the spin/park boundary.
        for retry in [0, 1, 2, QUIESCE_SPIN_POLLS - 1, QUIESCE_SPIN_POLLS] {
            tenant_registry_retry_idle(retry);
        }
        assert_eq!(
            quiesce_backoff_duration(QUIESCE_SPIN_POLLS),
            QUIESCE_MIN_PARK
        );
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn quiesce_backoff_is_bounded_and_monotonic() {
        let samples = [
            quiesce_backoff_duration(0),
            quiesce_backoff_duration(1),
            quiesce_backoff_duration(2),
            quiesce_backoff_duration(8),
            quiesce_backoff_duration(64),
        ];
        assert_eq!(samples[0], QUIESCE_MIN_PARK);
        for pair in samples.windows(2) {
            assert!(pair[0] <= pair[1], "quiesce backoff must not shrink");
            assert!(pair[1] <= QUIESCE_MAX_PARK, "quiesce backoff must cap");
        }
        assert_eq!(quiesce_backoff_duration(u64::MAX), QUIESCE_MAX_PARK);
    }

    #[test]
    fn active_tenants_tracks_registrations() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let active: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(active.contains(&a.id()));
        assert!(active.contains(&b.id()));
        // Don't silently drop the unregister outcome: a failure here would
        // otherwise only surface indirectly through the assertions below.
        reg.unregister(a.id())
            .expect("Fix: unregister a; restore this invariant before continuing.");
        let after: Vec<u32> = reg.active_tenants().iter().map(|t| t.id()).collect();
        assert!(!after.contains(&a.id()));
        assert!(after.contains(&b.id()));
        let counters: Vec<u32> = reg
            .runtime_counters()
            .iter()
            .map(|tenant| tenant.tenant_id)
            .collect();
        assert_eq!(counters, vec![b.id()]);
    }

    #[test]
    fn tenant_snapshots_reuse_caller_storage() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let mut active = Vec::with_capacity(2);
        let mut counters = Vec::with_capacity(2);

        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);
        let active_ptr = active.as_ptr();
        let counters_ptr = counters.as_ptr();
        // A second snapshot into the same buffers must not reallocate them.
        reg.active_tenants_into(&mut active);
        reg.runtime_counters_into(&mut counters);

        assert_eq!(active.as_ptr(), active_ptr);
        assert_eq!(counters.as_ptr(), counters_ptr);
        assert!(active.iter().any(|tenant| tenant.id() == a.id()));
        assert!(active.iter().any(|tenant| tenant.id() == b.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == a.id()));
        assert!(counters.iter().any(|tenant| tenant.tenant_id == b.id()));
    }

    #[test]
    fn concurrent_tenant_selection_reuses_scratch_and_output() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        // a (index 0) and b (index 1) conflict; c conflicts with nobody.
        let conflicts = symmetric_conflicts(3, &[(0, 1)]);
        let mut out = Vec::with_capacity(3);
        let mut scratch = TenantSelectionScratch::new();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);
        let out_ptr = out.as_ptr();
        let active_ids_ptr = scratch.active_ids.as_ptr();
        let selected_ptr = scratch.selected_indices.as_ptr();
        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out.as_ptr(), out_ptr);
        assert_eq!(scratch.active_ids.as_ptr(), active_ids_ptr);
        assert_eq!(scratch.selected_indices.as_ptr(), selected_ptr);
        assert!(out.contains(&a.id()) || out.contains(&b.id()));
        assert!(!(out.contains(&a.id()) && out.contains(&b.id())));
        assert!(out.contains(&c.id()));
    }

    #[test]
    fn concurrent_tenant_selection_fast_paths_all_zero_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        let mut out = Vec::with_capacity(8);
        let mut scratch = TenantSelectionScratch::new();
        // No pair conflicts, so every active tenant is selectable.
        let conflicts = symmetric_conflicts(3, &[]);
        let out_ptr = out.as_ptr();

        reg.select_concurrent_tenants_into(&conflicts, &mut out, &mut scratch);

        assert_eq!(out, vec![a.id(), b.id(), c.id()]);
        assert_eq!(
            out.as_ptr(),
            out_ptr,
            "all-zero conflict fast path must reuse caller-owned output storage"
        );
        assert!(
            scratch.selected_indices.is_empty(),
            "all-zero conflict fast path must not populate pairwise selection scratch"
        );
    }

    #[test]
    fn concurrent_tenant_selection_respects_conflicts() {
        let reg = TenantRegistry::new();
        let a = reg.register("a").unwrap();
        let b = reg.register("b").unwrap();
        let c = reg.register("c").unwrap();
        // a and b are mutually exclusive; c is free to run alongside either.
        let conflicts = symmetric_conflicts(3, &[(0, 1)]);

        let selected = reg.select_concurrent_tenants(&conflicts);

        assert!(selected.contains(&a.id()) || selected.contains(&b.id()));
        assert!(!(selected.contains(&a.id()) && selected.contains(&b.id())));
        assert!(selected.contains(&c.id()));
    }

    #[test]
    fn concurrent_registration_assigns_unique_ids() {
        use std::thread;
        let reg = Arc::new(TenantRegistry::new());
        let mut handles = Vec::new();
        for i in 0..32 {
            let reg = Arc::clone(&reg);
            handles.push(thread::spawn(move || {
                reg.register(format!("t{i}")).unwrap().id()
            }));
        }
        let ids: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        // Dedup after sorting: any collision shrinks the list.
        let mut sorted = ids.clone();
        sorted.sort_unstable();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "concurrent ids must be unique");
    }
}