Skip to main content

ringkernel_core/k2k/
tenant.rs

1//! K2K tenant registry, quotas, and per-engagement cost tracking.
2//!
3//! This module implements the lower-level, per-message tenancy boundary used
4//! inside the K2K broker. It is distinct from the governance-level
5//! [`crate::tenancy`] module (which uses string tenant IDs and coarse
6//! kernel-count / memory quotas). Here, [`TenantId`] is a `u64` that matches
7//! the format stamped into per-message envelopes and into GPU-side routing
8//! tables.
9//!
10//! # Responsibilities
11//!
12//! - Register tenants with [`TenantQuota`] limits
13//! - Check quotas on kernel registration and message sends
//! - Track billable GPU time per `(tenant_id, engagement_id)` pair
15//! - Emit audit events for cross-tenant attempts via [`AuditSink`]
16
17use std::collections::HashMap;
18use std::fmt;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use parking_lot::RwLock;
24
25use crate::audit::{AuditEvent, AuditEventType, AuditLevel, AuditSink};
26use crate::error::{Result, RingKernelError};
27use crate::k2k::audit_tag::AuditTag;
28
/// Stable tenant identifier as used in K2K routing tables and message envelopes.
///
/// `0` is reserved for the "unspecified tenant" — the default for legacy
/// single-tenant deployments. Tenant IDs are `u64` so they can be stamped
/// directly into GPU-shared routing entries without string marshalling.
pub type TenantId = u64;

/// Reserved tenant ID meaning "no tenant specified" (backward-compat default).
///
/// [`TenantRegistry`] treats this ID as exempt: quota checks and kernel-slot
/// acquisition succeed immediately, and message / usage recording is a no-op.
pub const UNSPECIFIED_TENANT: TenantId = 0;
38
39// ============================================================================
40// TenantQuota
41// ============================================================================
42
/// Limits and per-engagement budgets applied to a single tenant.
///
/// This is separate from the governance-level
/// [`crate::tenancy::ResourceQuota`]; this quota is the one consulted on the
/// K2K send path.
#[derive(Debug, Clone)]
pub struct TenantQuota {
    /// Upper bound on the number of kernels the tenant may have registered
    /// at the same time.
    pub max_concurrent_kernels: u32,
    /// Upper bound, in bytes, on GPU memory the tenant may allocate.
    pub max_gpu_memory_bytes: u64,
    /// Upper bound on messages per second, summed over all of the tenant's
    /// kernels.
    pub max_messages_per_sec: u64,
    /// Billable-time budget per engagement, keyed by
    /// [`AuditTag::engagement_id`]. An engagement without an entry has no
    /// budget configured.
    pub per_engagement_budget: HashMap<u64, Duration>,
}

impl TenantQuota {
    /// Quota with every numeric limit set to its type's maximum and no
    /// engagement budgets (handy for tests / trusted tenants).
    pub fn unlimited() -> Self {
        let per_engagement_budget = HashMap::new();
        Self {
            max_concurrent_kernels: u32::MAX,
            max_gpu_memory_bytes: u64::MAX,
            max_messages_per_sec: u64::MAX,
            per_engagement_budget,
        }
    }

    /// Modest preset: 16 kernels, 2 GiB of GPU memory, 100k messages/sec,
    /// and no engagement budgets.
    pub fn standard() -> Self {
        const GIB: u64 = 1 << 30;
        Self {
            max_concurrent_kernels: 16,
            max_gpu_memory_bytes: 2 * GIB,
            max_messages_per_sec: 100_000,
            // Only the (empty) budget map is taken from `unlimited()`.
            ..Self::unlimited()
        }
    }

    /// Builder-style setter: attach `budget` to `engagement_id`, replacing
    /// any budget previously set for the same engagement.
    pub fn with_engagement_budget(mut self, engagement_id: u64, budget: Duration) -> Self {
        self.per_engagement_budget
            .extend(std::iter::once((engagement_id, budget)));
        self
    }
}

impl Default for TenantQuota {
    /// Same as [`TenantQuota::standard`].
    fn default() -> Self {
        TenantQuota::standard()
    }
}
93
94// ============================================================================
95// TenantInfo (internal registry entry)
96// ============================================================================
97
98/// Per-tenant bookkeeping stored inside [`TenantRegistry`].
99#[derive(Debug)]
100pub struct TenantInfo {
101    /// Tenant identity.
102    pub tenant_id: TenantId,
103    /// Quota (immutable after registration; re-register to update).
104    pub quota: TenantQuota,
105    /// Number of kernels currently registered for this tenant.
106    pub current_kernels: AtomicU64,
107    /// Number of messages sent in the current rate-limit window.
108    pub messages_this_window: AtomicU64,
109    /// Start of the current rate-limit window (seconds-granularity).
110    pub window_start_secs: AtomicU64,
111    /// Per-engagement billable nanoseconds (keyed by `engagement_id`).
112    pub engagement_cost_ns: RwLock<HashMap<u64, u64>>,
113    /// When the tenant was registered.
114    pub registered_at: Instant,
115}
116
117impl TenantInfo {
118    fn new(tenant_id: TenantId, quota: TenantQuota) -> Self {
119        let now_secs = secs_since_epoch();
120        Self {
121            tenant_id,
122            quota,
123            current_kernels: AtomicU64::new(0),
124            messages_this_window: AtomicU64::new(0),
125            window_start_secs: AtomicU64::new(now_secs),
126            engagement_cost_ns: RwLock::new(HashMap::new()),
127            registered_at: Instant::now(),
128        }
129    }
130}
131
/// Whole seconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn secs_since_epoch() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
138
139// ============================================================================
140// TenantRegistry
141// ============================================================================
142
/// Registry of active tenants with per-tenant quotas and per-engagement cost
/// accounting.
pub struct TenantRegistry {
    /// Registered tenants keyed by ID. Entries are `Arc`-shared so a caller
    /// can keep a handle to a tenant's record after the map lock is released.
    tenants: RwLock<HashMap<TenantId, Arc<TenantInfo>>>,
    /// Destination for audit events; when `None`, audit emission is skipped.
    audit_sink: Option<Arc<dyn AuditSink>>,
}
149
150impl TenantRegistry {
151    /// Create an empty registry with no audit sink.
152    pub fn new() -> Self {
153        Self {
154            tenants: RwLock::new(HashMap::new()),
155            audit_sink: None,
156        }
157    }
158
159    /// Create a registry that forwards audit events (cross-tenant attempts,
160    /// quota violations) to the given sink.
161    pub fn with_audit_sink(sink: Arc<dyn AuditSink>) -> Self {
162        Self {
163            tenants: RwLock::new(HashMap::new()),
164            audit_sink: Some(sink),
165        }
166    }
167
168    /// Replace the audit sink on an existing registry.
169    pub fn set_audit_sink(&mut self, sink: Arc<dyn AuditSink>) {
170        self.audit_sink = Some(sink);
171    }
172
173    /// Register a new tenant with the given quota.
174    pub fn register(&self, tenant_id: TenantId, quota: TenantQuota) -> Result<()> {
175        let mut tenants = self.tenants.write();
176        if tenants.contains_key(&tenant_id) {
177            return Err(RingKernelError::InvalidConfig(format!(
178                "tenant {} already registered",
179                tenant_id
180            )));
181        }
182        tenants.insert(tenant_id, Arc::new(TenantInfo::new(tenant_id, quota)));
183        Ok(())
184    }
185
186    /// Deregister a tenant, dropping all quota and cost state.
187    pub fn deregister(&self, tenant_id: TenantId) -> bool {
188        self.tenants.write().remove(&tenant_id).is_some()
189    }
190
191    /// Returns `true` if the tenant is registered.
192    pub fn is_registered(&self, tenant_id: TenantId) -> bool {
193        self.tenants.read().contains_key(&tenant_id)
194    }
195
196    /// Get a snapshot of tenant info (fast `Arc` clone, no lock held).
197    pub fn get(&self, tenant_id: TenantId) -> Option<Arc<TenantInfo>> {
198        self.tenants.read().get(&tenant_id).cloned()
199    }
200
201    /// Number of registered tenants.
202    pub fn tenant_count(&self) -> usize {
203        self.tenants.read().len()
204    }
205
206    /// All registered tenant IDs.
207    pub fn tenant_ids(&self) -> Vec<TenantId> {
208        self.tenants.read().keys().copied().collect()
209    }
210
211    /// Check whether a send from `tenant_id` with `audit_tag` is within quota.
212    pub fn check_quota(&self, tenant_id: TenantId, audit_tag: AuditTag) -> Result<()> {
213        if tenant_id == UNSPECIFIED_TENANT {
214            return Ok(());
215        }
216
217        let info = self
218            .tenants
219            .read()
220            .get(&tenant_id)
221            .cloned()
222            .ok_or_else(|| {
223                RingKernelError::InvalidConfig(format!("tenant {} not registered", tenant_id))
224            })?;
225
226        // Rate limit: reset the window every second.
227        let now = secs_since_epoch();
228        let window_start = info.window_start_secs.load(Ordering::Relaxed);
229        if now != window_start {
230            let _ = info.window_start_secs.compare_exchange(
231                window_start,
232                now,
233                Ordering::AcqRel,
234                Ordering::Relaxed,
235            );
236            info.messages_this_window.store(0, Ordering::Relaxed);
237        }
238        let sent = info.messages_this_window.load(Ordering::Relaxed);
239        if sent >= info.quota.max_messages_per_sec {
240            let err = RingKernelError::LoadSheddingRejected {
241                level: format!(
242                    "tenant {} message-rate: {}/{}",
243                    tenant_id, sent, info.quota.max_messages_per_sec
244                ),
245            };
246            self.audit_quota_exceeded(tenant_id, audit_tag, &err);
247            return Err(err);
248        }
249
250        if let Some(budget) = info
251            .quota
252            .per_engagement_budget
253            .get(&audit_tag.engagement_id)
254        {
255            let used_ns = info
256                .engagement_cost_ns
257                .read()
258                .get(&audit_tag.engagement_id)
259                .copied()
260                .unwrap_or(0);
261            if Duration::from_nanos(used_ns) >= *budget {
262                let err = RingKernelError::LoadSheddingRejected {
263                    level: format!(
264                        "tenant {} engagement {} budget exceeded: {}ns >= {}ns",
265                        tenant_id,
266                        audit_tag.engagement_id,
267                        used_ns,
268                        budget.as_nanos()
269                    ),
270                };
271                self.audit_quota_exceeded(tenant_id, audit_tag, &err);
272                return Err(err);
273            }
274        }
275
276        Ok(())
277    }
278
279    /// Record a message send for rate-limit accounting.
280    pub fn record_message(&self, tenant_id: TenantId) {
281        if tenant_id == UNSPECIFIED_TENANT {
282            return;
283        }
284        if let Some(info) = self.tenants.read().get(&tenant_id) {
285            info.messages_this_window.fetch_add(1, Ordering::Relaxed);
286        }
287    }
288
289    /// Record billable GPU-seconds against a `(tenant_id, audit_tag)` pair.
290    pub fn track_usage(&self, tenant_id: TenantId, audit_tag: AuditTag, gpu_seconds: Duration) {
291        if tenant_id == UNSPECIFIED_TENANT {
292            return;
293        }
294        if let Some(info) = self.tenants.read().get(&tenant_id) {
295            let mut map = info.engagement_cost_ns.write();
296            let entry = map.entry(audit_tag.engagement_id).or_insert(0);
297            *entry = entry.saturating_add(gpu_seconds.as_nanos() as u64);
298        }
299    }
300
301    /// Get total billable duration for a specific engagement across all
302    /// registered tenants that have reported cost for it.
303    pub fn get_engagement_cost(&self, audit_tag: AuditTag) -> Duration {
304        let mut total_ns: u128 = 0;
305        for info in self.tenants.read().values() {
306            if let Some(ns) = info
307                .engagement_cost_ns
308                .read()
309                .get(&audit_tag.engagement_id)
310                .copied()
311            {
312                total_ns = total_ns.saturating_add(ns as u128);
313            }
314        }
315        Duration::from_nanos(total_ns.min(u64::MAX as u128) as u64)
316    }
317
318    /// Get billable duration for a specific `(tenant_id, engagement_id)` pair.
319    pub fn get_engagement_cost_for(&self, tenant_id: TenantId, audit_tag: AuditTag) -> Duration {
320        self.tenants
321            .read()
322            .get(&tenant_id)
323            .and_then(|info| {
324                info.engagement_cost_ns
325                    .read()
326                    .get(&audit_tag.engagement_id)
327                    .copied()
328            })
329            .map(Duration::from_nanos)
330            .unwrap_or(Duration::ZERO)
331    }
332
333    /// Returns `true` if a send would be cross-tenant.
334    pub fn is_cross_tenant(&self, from: TenantId, to: TenantId) -> bool {
335        from != to
336    }
337
338    /// Emit an audit event for a cross-tenant K2K send attempt (rejected).
339    pub fn audit_cross_tenant(
340        &self,
341        from_tenant: TenantId,
342        to_tenant: TenantId,
343        source_kernel: &str,
344        destination_kernel: &str,
345        audit_tag: AuditTag,
346    ) {
347        let Some(sink) = self.audit_sink.as_ref() else {
348            return;
349        };
350        let event = AuditEvent::new(
351            AuditLevel::Security,
352            AuditEventType::SecurityViolation,
353            "k2k_broker",
354            format!(
355                "cross-tenant K2K send rejected: from tenant {} to tenant {}",
356                from_tenant, to_tenant
357            ),
358        )
359        .with_target(destination_kernel.to_string())
360        .with_metadata("from_tenant", from_tenant.to_string())
361        .with_metadata("to_tenant", to_tenant.to_string())
362        .with_metadata("source_kernel", source_kernel.to_string())
363        .with_metadata("destination_kernel", destination_kernel.to_string())
364        .with_metadata("org_id", audit_tag.org_id.to_string())
365        .with_metadata("engagement_id", audit_tag.engagement_id.to_string());
366        let _ = sink.write(&event);
367    }
368
369    fn audit_quota_exceeded(
370        &self,
371        tenant_id: TenantId,
372        audit_tag: AuditTag,
373        err: &RingKernelError,
374    ) {
375        let Some(sink) = self.audit_sink.as_ref() else {
376            return;
377        };
378        let event = AuditEvent::new(
379            AuditLevel::Warning,
380            AuditEventType::ResourceLimitExceeded,
381            "k2k_tenant_registry",
382            format!("tenant {} quota exceeded: {}", tenant_id, err),
383        )
384        .with_metadata("tenant", tenant_id.to_string())
385        .with_metadata("org_id", audit_tag.org_id.to_string())
386        .with_metadata("engagement_id", audit_tag.engagement_id.to_string());
387        let _ = sink.write(&event);
388    }
389
390    /// Increment the registered-kernel counter for this tenant, enforcing
391    /// [`TenantQuota::max_concurrent_kernels`].
392    pub fn acquire_kernel_slot(&self, tenant_id: TenantId) -> Result<()> {
393        if tenant_id == UNSPECIFIED_TENANT {
394            return Ok(());
395        }
396        let info = self
397            .tenants
398            .read()
399            .get(&tenant_id)
400            .cloned()
401            .ok_or_else(|| {
402                RingKernelError::InvalidConfig(format!("tenant {} not registered", tenant_id))
403            })?;
404        let max = info.quota.max_concurrent_kernels as u64;
405        loop {
406            let current = info.current_kernels.load(Ordering::Acquire);
407            if current >= max {
408                return Err(RingKernelError::LoadSheddingRejected {
409                    level: format!("tenant {} kernel-slots: {}/{}", tenant_id, current, max),
410                });
411            }
412            if info
413                .current_kernels
414                .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
415                .is_ok()
416            {
417                return Ok(());
418            }
419        }
420    }
421
422    /// Release a kernel slot (call on kernel termination / deregister).
423    pub fn release_kernel_slot(&self, tenant_id: TenantId) {
424        if tenant_id == UNSPECIFIED_TENANT {
425            return;
426        }
427        if let Some(info) = self.tenants.read().get(&tenant_id) {
428            loop {
429                let current = info.current_kernels.load(Ordering::Acquire);
430                if current == 0 {
431                    return;
432                }
433                if info
434                    .current_kernels
435                    .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Acquire)
436                    .is_ok()
437                {
438                    return;
439                }
440            }
441        }
442    }
443}
444
445impl Default for TenantRegistry {
446    fn default() -> Self {
447        Self::new()
448    }
449}
450
451impl fmt::Debug for TenantRegistry {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        f.debug_struct("TenantRegistry")
454            .field("tenants", &self.tenants.read().len())
455            .field("has_audit_sink", &self.audit_sink.is_some())
456            .finish()
457    }
458}
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::MemorySink;

    /// Registering a tenant makes it visible through the query methods.
    #[test]
    fn test_register_and_query() {
        let reg = TenantRegistry::new();
        assert_eq!(reg.tenant_count(), 0);
        reg.register(1, TenantQuota::default()).unwrap();
        assert!(reg.is_registered(1));
        assert_eq!(reg.tenant_count(), 1);
    }

    /// A second registration of the same ID is rejected.
    #[test]
    fn test_duplicate_registration_fails() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::default()).unwrap();
        let err = reg.register(1, TenantQuota::default()).unwrap_err();
        assert!(matches!(err, RingKernelError::InvalidConfig(_)));
    }

    /// Deregistering reports whether an entry was actually removed.
    #[test]
    fn test_deregister() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::default()).unwrap();
        assert!(reg.deregister(1));
        assert!(!reg.is_registered(1));
        assert!(!reg.deregister(1));
    }

    /// The unspecified tenant bypasses checks and recording without being
    /// registered.
    #[test]
    fn test_unspecified_tenant_fast_path() {
        let reg = TenantRegistry::new();
        assert!(reg
            .check_quota(UNSPECIFIED_TENANT, AuditTag::default())
            .is_ok());
        reg.record_message(UNSPECIFIED_TENANT);
        reg.track_usage(
            UNSPECIFIED_TENANT,
            AuditTag::default(),
            Duration::from_millis(100),
        );
    }

    /// Usage accumulates per engagement within a tenant.
    #[test]
    fn test_track_usage_and_cost() {
        let reg = TenantRegistry::new();
        reg.register(42, TenantQuota::unlimited()).unwrap();

        let tag_a = AuditTag::new(42, 1);
        let tag_b = AuditTag::new(42, 2);

        reg.track_usage(42, tag_a, Duration::from_millis(250));
        reg.track_usage(42, tag_a, Duration::from_millis(750));
        reg.track_usage(42, tag_b, Duration::from_millis(100));

        assert_eq!(
            reg.get_engagement_cost_for(42, tag_a),
            Duration::from_millis(1000)
        );
        assert_eq!(
            reg.get_engagement_cost_for(42, tag_b),
            Duration::from_millis(100)
        );
        assert_eq!(reg.get_engagement_cost(tag_a), Duration::from_millis(1000));
    }

    /// The registry-wide engagement total sums across tenants.
    #[test]
    fn test_cross_tenant_engagement_aggregation() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::unlimited()).unwrap();
        reg.register(2, TenantQuota::unlimited()).unwrap();

        let tag = AuditTag::new(99, 7);
        reg.track_usage(1, tag, Duration::from_millis(500));
        reg.track_usage(2, tag, Duration::from_millis(500));

        assert_eq!(reg.get_engagement_cost(tag), Duration::from_millis(1000));
    }

    /// Once the per-second message limit is reached, further sends are
    /// rejected.
    #[test]
    fn test_check_quota_message_rate() {
        // The limiter's window is keyed on the wall-clock second. If the
        // second rolls over while this test runs, the counter is reset and
        // the final check legitimately succeeds. Such an attempt proves
        // nothing, so retry until a whole attempt fits inside one second.
        loop {
            let mut quota = TenantQuota::default();
            quota.max_messages_per_sec = 5;
            let reg = TenantRegistry::new();
            reg.register(1, quota).unwrap();

            let started = secs_since_epoch();
            for _ in 0..5 {
                assert!(reg.check_quota(1, AuditTag::default()).is_ok());
                reg.record_message(1);
            }
            let outcome = reg.check_quota(1, AuditTag::default());
            if secs_since_epoch() != started {
                continue;
            }

            let err = outcome.unwrap_err();
            assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
            break;
        }
    }

    /// Exceeding a configured engagement budget causes rejection.
    #[test]
    fn test_check_quota_engagement_budget() {
        let quota = TenantQuota::default().with_engagement_budget(7, Duration::from_millis(100));
        let reg = TenantRegistry::new();
        reg.register(1, quota).unwrap();

        let tag = AuditTag::new(0, 7);
        assert!(reg.check_quota(1, tag).is_ok());
        reg.track_usage(1, tag, Duration::from_millis(150));
        let err = reg.check_quota(1, tag).unwrap_err();
        assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
    }

    /// A cross-tenant audit call produces exactly one security event.
    #[test]
    fn test_audit_cross_tenant_logs_event() {
        let sink = Arc::new(MemorySink::new(100));
        let reg = TenantRegistry::with_audit_sink(sink.clone());
        reg.register(1, TenantQuota::default()).unwrap();
        reg.register(2, TenantQuota::default()).unwrap();

        reg.audit_cross_tenant(1, 2, "src", "dst", AuditTag::new(100, 200));
        let events = sink.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, AuditEventType::SecurityViolation);
        assert!(events[0].description.contains("cross-tenant"));
    }

    /// Kernel slots are capped by the quota and become available again after
    /// a release.
    #[test]
    fn test_kernel_slot_acquire_release() {
        let mut quota = TenantQuota::default();
        quota.max_concurrent_kernels = 2;
        let reg = TenantRegistry::new();
        reg.register(1, quota).unwrap();

        assert!(reg.acquire_kernel_slot(1).is_ok());
        assert!(reg.acquire_kernel_slot(1).is_ok());
        let err = reg.acquire_kernel_slot(1).unwrap_err();
        assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));

        reg.release_kernel_slot(1);
        assert!(reg.acquire_kernel_slot(1).is_ok());
    }

    /// Acquiring a slot for an unknown tenant is a configuration error.
    #[test]
    fn test_kernel_slot_unregistered_tenant_fails() {
        let reg = TenantRegistry::new();
        let err = reg.acquire_kernel_slot(99).unwrap_err();
        assert!(matches!(err, RingKernelError::InvalidConfig(_)));
    }

    /// Cross-tenant detection is a plain inequality, including against the
    /// unspecified tenant.
    #[test]
    fn test_is_cross_tenant() {
        let reg = TenantRegistry::new();
        assert!(!reg.is_cross_tenant(1, 1));
        assert!(reg.is_cross_tenant(1, 2));
        assert!(reg.is_cross_tenant(UNSPECIFIED_TENANT, 1));
    }
}