1use std::collections::HashMap;
18use std::fmt;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use parking_lot::RwLock;
24
25use crate::audit::{AuditEvent, AuditEventType, AuditLevel, AuditSink};
26use crate::error::{Result, RingKernelError};
27use crate::k2k::audit_tag::AuditTag;
28
/// Numeric identifier of a tenant.
pub type TenantId = u64;

/// Tenant id meaning "no tenant specified".
///
/// The registry's `check_quota`, `record_message`, `track_usage` and the
/// kernel-slot methods all return immediately (successfully, where they
/// return a `Result`) when handed this value.
pub const UNSPECIFIED_TENANT: TenantId = 0;
38
/// Resource limits configured for one tenant.
#[derive(Debug, Clone)]
pub struct TenantQuota {
    /// Upper bound on kernel slots the tenant may hold at the same time.
    pub max_concurrent_kernels: u32,
    /// GPU memory ceiling in bytes. No code visible in this file reads it.
    pub max_gpu_memory_bytes: u64,
    /// Number of messages admitted per one-second window.
    pub max_messages_per_sec: u64,
    /// Optional time budget per engagement id; engagements without an entry
    /// have no budget check applied.
    pub per_engagement_budget: HashMap<u64, Duration>,
}

impl TenantQuota {
    /// Quota with every numeric limit at its type's maximum and no
    /// engagement budgets.
    pub fn unlimited() -> Self {
        let per_engagement_budget = HashMap::new();
        Self {
            max_concurrent_kernels: u32::MAX,
            max_gpu_memory_bytes: u64::MAX,
            max_messages_per_sec: u64::MAX,
            per_engagement_budget,
        }
    }

    /// Default preset: 16 kernels, 2 GiB of GPU memory, 100 000 messages per
    /// second, no engagement budgets.
    pub fn standard() -> Self {
        const GIB: u64 = 1024 * 1024 * 1024;

        let per_engagement_budget = HashMap::new();
        Self {
            max_concurrent_kernels: 16,
            max_gpu_memory_bytes: 2 * GIB,
            max_messages_per_sec: 100_000,
            per_engagement_budget,
        }
    }

    /// Builder-style setter: returns the quota with `budget` recorded for
    /// `engagement_id`, replacing any budget previously set for that id.
    pub fn with_engagement_budget(self, engagement_id: u64, budget: Duration) -> Self {
        let mut quota = self;
        quota.per_engagement_budget.insert(engagement_id, budget);
        quota
    }
}

impl Default for TenantQuota {
    /// Same as [`TenantQuota::standard`].
    fn default() -> Self {
        TenantQuota::standard()
    }
}
93
94#[derive(Debug)]
100pub struct TenantInfo {
101 pub tenant_id: TenantId,
103 pub quota: TenantQuota,
105 pub current_kernels: AtomicU64,
107 pub messages_this_window: AtomicU64,
109 pub window_start_secs: AtomicU64,
111 pub engagement_cost_ns: RwLock<HashMap<u64, u64>>,
113 pub registered_at: Instant,
115}
116
117impl TenantInfo {
118 fn new(tenant_id: TenantId, quota: TenantQuota) -> Self {
119 let now_secs = secs_since_epoch();
120 Self {
121 tenant_id,
122 quota,
123 current_kernels: AtomicU64::new(0),
124 messages_this_window: AtomicU64::new(0),
125 window_start_secs: AtomicU64::new(now_secs),
126 engagement_cost_ns: RwLock::new(HashMap::new()),
127 registered_at: Instant::now(),
128 }
129 }
130}
131
/// Whole seconds elapsed since the Unix epoch according to the system clock.
///
/// Returns 0 when the clock reads earlier than the epoch.
fn secs_since_epoch() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock is set before 1970: fall back to zero instead of failing.
        Err(_) => 0,
    }
}
138
/// Registry of tenants, their quotas and usage counters, with an optional
/// audit sink for quota and cross-tenant events.
pub struct TenantRegistry {
    /// Registered tenants keyed by id. Entries are `Arc`-shared so a caller
    /// can keep using a tenant's counters after the map lock is released.
    tenants: RwLock<HashMap<TenantId, Arc<TenantInfo>>>,
    /// Destination for audit events; while `None`, the audit methods do nothing.
    audit_sink: Option<Arc<dyn AuditSink>>,
}
149
150impl TenantRegistry {
151 pub fn new() -> Self {
153 Self {
154 tenants: RwLock::new(HashMap::new()),
155 audit_sink: None,
156 }
157 }
158
159 pub fn with_audit_sink(sink: Arc<dyn AuditSink>) -> Self {
162 Self {
163 tenants: RwLock::new(HashMap::new()),
164 audit_sink: Some(sink),
165 }
166 }
167
168 pub fn set_audit_sink(&mut self, sink: Arc<dyn AuditSink>) {
170 self.audit_sink = Some(sink);
171 }
172
173 pub fn register(&self, tenant_id: TenantId, quota: TenantQuota) -> Result<()> {
175 let mut tenants = self.tenants.write();
176 if tenants.contains_key(&tenant_id) {
177 return Err(RingKernelError::InvalidConfig(format!(
178 "tenant {} already registered",
179 tenant_id
180 )));
181 }
182 tenants.insert(tenant_id, Arc::new(TenantInfo::new(tenant_id, quota)));
183 Ok(())
184 }
185
186 pub fn deregister(&self, tenant_id: TenantId) -> bool {
188 self.tenants.write().remove(&tenant_id).is_some()
189 }
190
191 pub fn is_registered(&self, tenant_id: TenantId) -> bool {
193 self.tenants.read().contains_key(&tenant_id)
194 }
195
196 pub fn get(&self, tenant_id: TenantId) -> Option<Arc<TenantInfo>> {
198 self.tenants.read().get(&tenant_id).cloned()
199 }
200
201 pub fn tenant_count(&self) -> usize {
203 self.tenants.read().len()
204 }
205
206 pub fn tenant_ids(&self) -> Vec<TenantId> {
208 self.tenants.read().keys().copied().collect()
209 }
210
211 pub fn check_quota(&self, tenant_id: TenantId, audit_tag: AuditTag) -> Result<()> {
213 if tenant_id == UNSPECIFIED_TENANT {
214 return Ok(());
215 }
216
217 let info = self
218 .tenants
219 .read()
220 .get(&tenant_id)
221 .cloned()
222 .ok_or_else(|| {
223 RingKernelError::InvalidConfig(format!("tenant {} not registered", tenant_id))
224 })?;
225
226 let now = secs_since_epoch();
228 let window_start = info.window_start_secs.load(Ordering::Relaxed);
229 if now != window_start {
230 let _ = info.window_start_secs.compare_exchange(
231 window_start,
232 now,
233 Ordering::AcqRel,
234 Ordering::Relaxed,
235 );
236 info.messages_this_window.store(0, Ordering::Relaxed);
237 }
238 let sent = info.messages_this_window.load(Ordering::Relaxed);
239 if sent >= info.quota.max_messages_per_sec {
240 let err = RingKernelError::LoadSheddingRejected {
241 level: format!(
242 "tenant {} message-rate: {}/{}",
243 tenant_id, sent, info.quota.max_messages_per_sec
244 ),
245 };
246 self.audit_quota_exceeded(tenant_id, audit_tag, &err);
247 return Err(err);
248 }
249
250 if let Some(budget) = info
251 .quota
252 .per_engagement_budget
253 .get(&audit_tag.engagement_id)
254 {
255 let used_ns = info
256 .engagement_cost_ns
257 .read()
258 .get(&audit_tag.engagement_id)
259 .copied()
260 .unwrap_or(0);
261 if Duration::from_nanos(used_ns) >= *budget {
262 let err = RingKernelError::LoadSheddingRejected {
263 level: format!(
264 "tenant {} engagement {} budget exceeded: {}ns >= {}ns",
265 tenant_id,
266 audit_tag.engagement_id,
267 used_ns,
268 budget.as_nanos()
269 ),
270 };
271 self.audit_quota_exceeded(tenant_id, audit_tag, &err);
272 return Err(err);
273 }
274 }
275
276 Ok(())
277 }
278
279 pub fn record_message(&self, tenant_id: TenantId) {
281 if tenant_id == UNSPECIFIED_TENANT {
282 return;
283 }
284 if let Some(info) = self.tenants.read().get(&tenant_id) {
285 info.messages_this_window.fetch_add(1, Ordering::Relaxed);
286 }
287 }
288
289 pub fn track_usage(&self, tenant_id: TenantId, audit_tag: AuditTag, gpu_seconds: Duration) {
291 if tenant_id == UNSPECIFIED_TENANT {
292 return;
293 }
294 if let Some(info) = self.tenants.read().get(&tenant_id) {
295 let mut map = info.engagement_cost_ns.write();
296 let entry = map.entry(audit_tag.engagement_id).or_insert(0);
297 *entry = entry.saturating_add(gpu_seconds.as_nanos() as u64);
298 }
299 }
300
301 pub fn get_engagement_cost(&self, audit_tag: AuditTag) -> Duration {
304 let mut total_ns: u128 = 0;
305 for info in self.tenants.read().values() {
306 if let Some(ns) = info
307 .engagement_cost_ns
308 .read()
309 .get(&audit_tag.engagement_id)
310 .copied()
311 {
312 total_ns = total_ns.saturating_add(ns as u128);
313 }
314 }
315 Duration::from_nanos(total_ns.min(u64::MAX as u128) as u64)
316 }
317
318 pub fn get_engagement_cost_for(&self, tenant_id: TenantId, audit_tag: AuditTag) -> Duration {
320 self.tenants
321 .read()
322 .get(&tenant_id)
323 .and_then(|info| {
324 info.engagement_cost_ns
325 .read()
326 .get(&audit_tag.engagement_id)
327 .copied()
328 })
329 .map(Duration::from_nanos)
330 .unwrap_or(Duration::ZERO)
331 }
332
333 pub fn is_cross_tenant(&self, from: TenantId, to: TenantId) -> bool {
335 from != to
336 }
337
338 pub fn audit_cross_tenant(
340 &self,
341 from_tenant: TenantId,
342 to_tenant: TenantId,
343 source_kernel: &str,
344 destination_kernel: &str,
345 audit_tag: AuditTag,
346 ) {
347 let Some(sink) = self.audit_sink.as_ref() else {
348 return;
349 };
350 let event = AuditEvent::new(
351 AuditLevel::Security,
352 AuditEventType::SecurityViolation,
353 "k2k_broker",
354 format!(
355 "cross-tenant K2K send rejected: from tenant {} to tenant {}",
356 from_tenant, to_tenant
357 ),
358 )
359 .with_target(destination_kernel.to_string())
360 .with_metadata("from_tenant", from_tenant.to_string())
361 .with_metadata("to_tenant", to_tenant.to_string())
362 .with_metadata("source_kernel", source_kernel.to_string())
363 .with_metadata("destination_kernel", destination_kernel.to_string())
364 .with_metadata("org_id", audit_tag.org_id.to_string())
365 .with_metadata("engagement_id", audit_tag.engagement_id.to_string());
366 let _ = sink.write(&event);
367 }
368
369 fn audit_quota_exceeded(
370 &self,
371 tenant_id: TenantId,
372 audit_tag: AuditTag,
373 err: &RingKernelError,
374 ) {
375 let Some(sink) = self.audit_sink.as_ref() else {
376 return;
377 };
378 let event = AuditEvent::new(
379 AuditLevel::Warning,
380 AuditEventType::ResourceLimitExceeded,
381 "k2k_tenant_registry",
382 format!("tenant {} quota exceeded: {}", tenant_id, err),
383 )
384 .with_metadata("tenant", tenant_id.to_string())
385 .with_metadata("org_id", audit_tag.org_id.to_string())
386 .with_metadata("engagement_id", audit_tag.engagement_id.to_string());
387 let _ = sink.write(&event);
388 }
389
390 pub fn acquire_kernel_slot(&self, tenant_id: TenantId) -> Result<()> {
393 if tenant_id == UNSPECIFIED_TENANT {
394 return Ok(());
395 }
396 let info = self
397 .tenants
398 .read()
399 .get(&tenant_id)
400 .cloned()
401 .ok_or_else(|| {
402 RingKernelError::InvalidConfig(format!("tenant {} not registered", tenant_id))
403 })?;
404 let max = info.quota.max_concurrent_kernels as u64;
405 loop {
406 let current = info.current_kernels.load(Ordering::Acquire);
407 if current >= max {
408 return Err(RingKernelError::LoadSheddingRejected {
409 level: format!("tenant {} kernel-slots: {}/{}", tenant_id, current, max),
410 });
411 }
412 if info
413 .current_kernels
414 .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
415 .is_ok()
416 {
417 return Ok(());
418 }
419 }
420 }
421
422 pub fn release_kernel_slot(&self, tenant_id: TenantId) {
424 if tenant_id == UNSPECIFIED_TENANT {
425 return;
426 }
427 if let Some(info) = self.tenants.read().get(&tenant_id) {
428 loop {
429 let current = info.current_kernels.load(Ordering::Acquire);
430 if current == 0 {
431 return;
432 }
433 if info
434 .current_kernels
435 .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Acquire)
436 .is_ok()
437 {
438 return;
439 }
440 }
441 }
442 }
443}
444
445impl Default for TenantRegistry {
446 fn default() -> Self {
447 Self::new()
448 }
449}
450
451impl fmt::Debug for TenantRegistry {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 f.debug_struct("TenantRegistry")
454 .field("tenants", &self.tenants.read().len())
455 .field("has_audit_sink", &self.audit_sink.is_some())
456 .finish()
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::MemorySink;

    /// A freshly registered tenant is visible through the query methods.
    #[test]
    fn test_register_and_query() {
        let reg = TenantRegistry::new();
        assert_eq!(reg.tenant_count(), 0);
        reg.register(1, TenantQuota::default()).unwrap();
        assert!(reg.is_registered(1));
        assert_eq!(reg.tenant_count(), 1);
    }

    /// Registering the same id twice is rejected with `InvalidConfig`.
    #[test]
    fn test_duplicate_registration_fails() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::default()).unwrap();
        let err = reg.register(1, TenantQuota::default()).unwrap_err();
        assert!(matches!(err, RingKernelError::InvalidConfig(_)));
    }

    /// `deregister` reports whether an entry was actually removed.
    #[test]
    fn test_deregister() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::default()).unwrap();
        assert!(reg.deregister(1));
        assert!(!reg.is_registered(1));
        assert!(!reg.deregister(1));
    }

    /// The unspecified tenant bypasses quota checks and accounting even
    /// though it is not registered.
    #[test]
    fn test_unspecified_tenant_fast_path() {
        let reg = TenantRegistry::new();
        assert!(reg
            .check_quota(UNSPECIFIED_TENANT, AuditTag::default())
            .is_ok());
        reg.record_message(UNSPECIFIED_TENANT);
        reg.track_usage(
            UNSPECIFIED_TENANT,
            AuditTag::default(),
            Duration::from_millis(100),
        );
    }

    /// Usage accumulates per engagement and is kept separate between
    /// engagements of the same tenant.
    #[test]
    fn test_track_usage_and_cost() {
        let reg = TenantRegistry::new();
        reg.register(42, TenantQuota::unlimited()).unwrap();

        let tag_a = AuditTag::new(42, 1);
        let tag_b = AuditTag::new(42, 2);

        reg.track_usage(42, tag_a, Duration::from_millis(250));
        reg.track_usage(42, tag_a, Duration::from_millis(750));
        reg.track_usage(42, tag_b, Duration::from_millis(100));

        assert_eq!(
            reg.get_engagement_cost_for(42, tag_a),
            Duration::from_millis(1000)
        );
        assert_eq!(
            reg.get_engagement_cost_for(42, tag_b),
            Duration::from_millis(100)
        );
        assert_eq!(reg.get_engagement_cost(tag_a), Duration::from_millis(1000));
    }

    /// `get_engagement_cost` sums one engagement's usage across tenants.
    #[test]
    fn test_cross_tenant_engagement_aggregation() {
        let reg = TenantRegistry::new();
        reg.register(1, TenantQuota::unlimited()).unwrap();
        reg.register(2, TenantQuota::unlimited()).unwrap();

        let tag = AuditTag::new(99, 7);
        reg.track_usage(1, tag, Duration::from_millis(500));
        reg.track_usage(2, tag, Duration::from_millis(500));

        assert_eq!(reg.get_engagement_cost(tag), Duration::from_millis(1000));
    }

    /// Once `max_messages_per_sec` messages are recorded in a window, the
    /// next quota check is rejected.
    #[test]
    fn test_check_quota_message_rate() {
        // The rate window is keyed on wall-clock seconds, so a second boundary
        // falling between the sends and the final check legitimately resets
        // the counter. Retry with a fresh registry rather than fail spuriously.
        let rejected = (0..5).any(|_| {
            let quota = TenantQuota {
                max_messages_per_sec: 5,
                ..TenantQuota::default()
            };
            let reg = TenantRegistry::new();
            reg.register(1, quota).unwrap();

            // Below the limit every check passes, window reset or not.
            for _ in 0..5 {
                assert!(reg.check_quota(1, AuditTag::default()).is_ok());
                reg.record_message(1);
            }

            matches!(
                reg.check_quota(1, AuditTag::default()),
                Err(RingKernelError::LoadSheddingRejected { .. })
            )
        });
        assert!(
            rejected,
            "message-rate limit was never enforced within a single window"
        );
    }

    /// Usage at or above the engagement budget causes rejection.
    #[test]
    fn test_check_quota_engagement_budget() {
        let quota = TenantQuota::default().with_engagement_budget(7, Duration::from_millis(100));
        let reg = TenantRegistry::new();
        reg.register(1, quota).unwrap();

        let tag = AuditTag::new(0, 7);
        assert!(reg.check_quota(1, tag).is_ok());
        reg.track_usage(1, tag, Duration::from_millis(150));
        let err = reg.check_quota(1, tag).unwrap_err();
        assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
    }

    /// A cross-tenant audit call writes exactly one security-violation event.
    #[test]
    fn test_audit_cross_tenant_logs_event() {
        let sink = Arc::new(MemorySink::new(100));
        let reg = TenantRegistry::with_audit_sink(sink.clone());
        reg.register(1, TenantQuota::default()).unwrap();
        reg.register(2, TenantQuota::default()).unwrap();

        reg.audit_cross_tenant(1, 2, "src", "dst", AuditTag::new(100, 200));
        let events = sink.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, AuditEventType::SecurityViolation);
        assert!(events[0].description.contains("cross-tenant"));
    }

    /// Slots are capped at `max_concurrent_kernels` and become available
    /// again after a release.
    #[test]
    fn test_kernel_slot_acquire_release() {
        let quota = TenantQuota {
            max_concurrent_kernels: 2,
            ..TenantQuota::default()
        };
        let reg = TenantRegistry::new();
        reg.register(1, quota).unwrap();

        assert!(reg.acquire_kernel_slot(1).is_ok());
        assert!(reg.acquire_kernel_slot(1).is_ok());
        let err = reg.acquire_kernel_slot(1).unwrap_err();
        assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));

        reg.release_kernel_slot(1);
        assert!(reg.acquire_kernel_slot(1).is_ok());
    }

    /// Acquiring a slot for an unknown tenant is a configuration error.
    #[test]
    fn test_kernel_slot_unregistered_tenant_fails() {
        let reg = TenantRegistry::new();
        let err = reg.acquire_kernel_slot(99).unwrap_err();
        assert!(matches!(err, RingKernelError::InvalidConfig(_)));
    }

    /// Cross-tenant detection is a plain id inequality, including against the
    /// unspecified tenant.
    #[test]
    fn test_is_cross_tenant() {
        let reg = TenantRegistry::new();
        assert!(!reg.is_cross_tenant(1, 1));
        assert!(reg.is_cross_tenant(1, 2));
        assert!(reg.is_cross_tenant(UNSPECIFIED_TENANT, 1));
    }
}