1use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use tokio::sync::mpsc;
34
35use crate::error::{Result, RingKernelError};
36use crate::hlc::HlcTimestamp;
37use crate::message::{MessageEnvelope, MessageId};
38use crate::runtime::KernelId;
39
40pub mod audit_tag;
41pub mod tenant;
42
43pub use audit_tag::AuditTag;
44pub use tenant::{TenantId, TenantInfo, TenantQuota, TenantRegistry, UNSPECIFIED_TENANT};
45
/// Tunable settings for the kernel-to-kernel (K2K) broker.
#[derive(Debug, Clone)]
pub struct K2KConfig {
    /// Capacity of the bounded channel created for each registered endpoint.
    pub max_pending_messages: usize,
    /// Delivery timeout in milliseconds.
    // NOTE(review): not read by the broker code visible in this file section.
    pub delivery_timeout_ms: u64,
    /// Whether message tracing is switched on.
    // NOTE(review): not read by the broker code visible in this file section.
    pub enable_tracing: bool,
    /// Largest hop count a routed message may reach before it is rejected.
    pub max_hops: u8,
}

impl Default for K2KConfig {
    /// Stock configuration: 1024 pending messages per endpoint, a 5000 ms
    /// delivery timeout, tracing off, and at most 8 hops.
    fn default() -> Self {
        let max_hops: u8 = 8;
        let enable_tracing = false;
        let delivery_timeout_ms: u64 = 5000;
        let max_pending_messages: usize = 1024;

        K2KConfig {
            max_pending_messages,
            delivery_timeout_ms,
            enable_tracing,
            max_hops,
        }
    }
}
69
70#[derive(Debug, Clone)]
72pub struct K2KMessage {
73 pub id: MessageId,
75 pub source: KernelId,
77 pub destination: KernelId,
79 pub envelope: MessageEnvelope,
81 pub hops: u8,
83 pub sent_at: HlcTimestamp,
85 pub priority: u8,
87}
88
89impl K2KMessage {
90 pub fn new(
92 source: KernelId,
93 destination: KernelId,
94 envelope: MessageEnvelope,
95 timestamp: HlcTimestamp,
96 ) -> Self {
97 Self {
98 id: MessageId::generate(),
99 source,
100 destination,
101 envelope,
102 hops: 0,
103 sent_at: timestamp,
104 priority: 0,
105 }
106 }
107
108 pub fn with_priority(mut self, priority: u8) -> Self {
110 self.priority = priority;
111 self
112 }
113
114 pub fn increment_hops(&mut self) -> Result<()> {
116 self.hops += 1;
117 if self.hops > 16 {
118 return Err(RingKernelError::K2KError(
119 "Maximum hop count exceeded".to_string(),
120 ));
121 }
122 Ok(())
123 }
124}
125
/// Outcome record returned to the sender for a single K2K send attempt.
#[derive(Debug, Clone)]
pub struct DeliveryReceipt {
    /// Id of the message this receipt refers to.
    pub message_id: MessageId,
    /// Kernel that sent the message.
    pub source: KernelId,
    /// Kernel the message was addressed to.
    pub destination: KernelId,
    /// What happened to the message.
    pub status: DeliveryStatus,
    /// The message's `sent_at` timestamp (copied from the message, not the
    /// time the receipt was produced).
    pub timestamp: HlcTimestamp,
}
140
/// Result category carried by a [`DeliveryReceipt`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryStatus {
    /// The message was placed on the destination kernel's own channel.
    Delivered,
    /// The message was handed to a next-hop endpoint taken from the routing
    /// table instead of the destination itself.
    Pending,
    /// No endpoint or usable route was found, or the destination's channel
    /// was already closed.
    NotFound,
    /// The destination's channel had no free capacity.
    QueueFull,
    /// Delivery timed out.
    // NOTE(review): not produced by the broker code visible in this file section.
    Timeout,
    /// Forwarding would push the hop count above the configured maximum.
    MaxHopsExceeded,
    /// Source and destination belong to different tenants.
    // NOTE(review): the visible broker code reports this case as an `Err`
    // rather than a receipt with this status — confirm where it is used.
    TenantMismatch,
}
160
161pub struct K2KEndpoint {
163 pub(crate) kernel_id: KernelId,
165 receiver: mpsc::Receiver<K2KMessage>,
167 broker: Arc<K2KBroker>,
169}
170
171impl K2KEndpoint {
172 pub async fn receive(&mut self) -> Option<K2KMessage> {
174 self.receiver.recv().await
175 }
176
177 pub fn try_receive(&mut self) -> Option<K2KMessage> {
179 self.receiver.try_recv().ok()
180 }
181
182 pub async fn send(
184 &self,
185 destination: KernelId,
186 envelope: MessageEnvelope,
187 ) -> Result<DeliveryReceipt> {
188 self.broker
189 .send(self.kernel_id.clone(), destination, envelope)
190 .await
191 }
192
193 pub async fn send_priority(
195 &self,
196 destination: KernelId,
197 envelope: MessageEnvelope,
198 priority: u8,
199 ) -> Result<DeliveryReceipt> {
200 self.broker
201 .send_priority(self.kernel_id.clone(), destination, envelope, priority)
202 .await
203 }
204
205 pub fn pending_count(&self) -> usize {
207 0 }
210}
211
212pub struct K2KSubBroker {
223 tenant_id: TenantId,
225 endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
227 routing_table: RwLock<HashMap<KernelId, KernelId>>,
229 kernel_audit_tags: RwLock<HashMap<KernelId, AuditTag>>,
232 messages_delivered: AtomicU64,
234}
235
236impl K2KSubBroker {
237 fn new(tenant_id: TenantId) -> Self {
238 Self {
239 tenant_id,
240 endpoints: RwLock::new(HashMap::new()),
241 routing_table: RwLock::new(HashMap::new()),
242 kernel_audit_tags: RwLock::new(HashMap::new()),
243 messages_delivered: AtomicU64::new(0),
244 }
245 }
246
247 pub fn tenant_id(&self) -> TenantId {
249 self.tenant_id
250 }
251
252 pub fn endpoint_count(&self) -> usize {
254 self.endpoints.read().len()
255 }
256
257 pub fn messages_delivered(&self) -> u64 {
259 self.messages_delivered.load(Ordering::Relaxed)
260 }
261
262 pub fn knows(&self, kernel_id: &KernelId) -> bool {
265 self.endpoints.read().contains_key(kernel_id)
266 || self.routing_table.read().contains_key(kernel_id)
267 }
268
269 pub fn audit_tag_for(&self, kernel_id: &KernelId) -> AuditTag {
272 self.kernel_audit_tags
273 .read()
274 .get(kernel_id)
275 .copied()
276 .unwrap_or_else(AuditTag::unspecified)
277 }
278}
279
280pub struct K2KBroker {
291 config: K2KConfig,
293 tenants: RwLock<HashMap<TenantId, Arc<K2KSubBroker>>>,
295 kernel_tenant: RwLock<HashMap<KernelId, TenantId>>,
297 registry: Arc<TenantRegistry>,
299 receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
301 message_counter: AtomicU64,
303 cross_tenant_rejections: AtomicU64,
305}
306
307impl K2KBroker {
308 pub fn new(config: K2KConfig) -> Arc<Self> {
310 Self::with_registry(config, Arc::new(TenantRegistry::new()))
311 }
312
313 pub fn with_registry(config: K2KConfig, registry: Arc<TenantRegistry>) -> Arc<Self> {
319 let mut tenants = HashMap::new();
320 tenants.insert(
324 UNSPECIFIED_TENANT,
325 Arc::new(K2KSubBroker::new(UNSPECIFIED_TENANT)),
326 );
327 Arc::new(Self {
328 config,
329 tenants: RwLock::new(tenants),
330 kernel_tenant: RwLock::new(HashMap::new()),
331 registry,
332 receipts: RwLock::new(HashMap::new()),
333 message_counter: AtomicU64::new(0),
334 cross_tenant_rejections: AtomicU64::new(0),
335 })
336 }
337
338 pub fn registry(&self) -> &Arc<TenantRegistry> {
340 &self.registry
341 }
342
343 pub fn tenant_count(&self) -> usize {
345 self.tenants.read().len()
346 }
347
348 pub fn sub_broker(&self, tenant_id: TenantId) -> Option<Arc<K2KSubBroker>> {
350 self.tenants.read().get(&tenant_id).cloned()
351 }
352
353 pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
360 self.register_tenant(UNSPECIFIED_TENANT, AuditTag::unspecified(), kernel_id)
361 }
362
363 pub fn register_tenant(
373 self: &Arc<Self>,
374 tenant_id: TenantId,
375 audit_tag: AuditTag,
376 kernel_id: KernelId,
377 ) -> K2KEndpoint {
378 let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
379
380 let sub = {
381 let mut tenants = self.tenants.write();
382 tenants
383 .entry(tenant_id)
384 .or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
385 .clone()
386 };
387
388 let mut kernel_tenant = self.kernel_tenant.write();
389 if let Some(prev_tenant) = kernel_tenant.get(&kernel_id).copied() {
390 if prev_tenant != tenant_id {
391 if let Some(prev_sub) = self.tenants.read().get(&prev_tenant).cloned() {
392 prev_sub.endpoints.write().remove(&kernel_id);
393 prev_sub.kernel_audit_tags.write().remove(&kernel_id);
394 prev_sub.routing_table.write().remove(&kernel_id);
395 }
396 }
397 }
398 kernel_tenant.insert(kernel_id.clone(), tenant_id);
399 drop(kernel_tenant);
400
401 sub.endpoints.write().insert(kernel_id.clone(), sender);
402 sub.kernel_audit_tags
403 .write()
404 .insert(kernel_id.clone(), audit_tag);
405
406 K2KEndpoint {
407 kernel_id,
408 receiver,
409 broker: Arc::clone(self),
410 }
411 }
412
413 pub fn unregister(&self, kernel_id: &KernelId) {
415 let tenant_id = self.kernel_tenant.write().remove(kernel_id);
416 if let Some(tenant_id) = tenant_id {
417 if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
418 sub.endpoints.write().remove(kernel_id);
419 sub.kernel_audit_tags.write().remove(kernel_id);
420 sub.routing_table.write().remove(kernel_id);
421 }
422 }
423 }
424
425 pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
427 self.kernel_tenant.read().contains_key(kernel_id)
428 }
429
430 pub fn tenant_of(&self, kernel_id: &KernelId) -> Option<TenantId> {
432 self.kernel_tenant.read().get(kernel_id).copied()
433 }
434
435 pub fn registered_kernels(&self) -> Vec<KernelId> {
437 self.kernel_tenant.read().keys().cloned().collect()
438 }
439
440 pub fn registered_kernels_for(&self, tenant_id: TenantId) -> Vec<KernelId> {
442 self.tenants
443 .read()
444 .get(&tenant_id)
445 .map(|sub| sub.endpoints.read().keys().cloned().collect())
446 .unwrap_or_default()
447 }
448
449 pub async fn send(
456 &self,
457 source: KernelId,
458 destination: KernelId,
459 envelope: MessageEnvelope,
460 ) -> Result<DeliveryReceipt> {
461 self.send_priority(source, destination, envelope, 0).await
462 }
463
464 pub async fn send_priority(
466 &self,
467 source: KernelId,
468 destination: KernelId,
469 envelope: MessageEnvelope,
470 priority: u8,
471 ) -> Result<DeliveryReceipt> {
472 let source_tenant = self
474 .kernel_tenant
475 .read()
476 .get(&source)
477 .copied()
478 .unwrap_or(UNSPECIFIED_TENANT);
479
480 let dest_tenant = self
486 .kernel_tenant
487 .read()
488 .get(&destination)
489 .copied()
490 .unwrap_or(source_tenant);
491
492 if source_tenant != dest_tenant {
494 self.cross_tenant_rejections.fetch_add(1, Ordering::Relaxed);
495 self.registry.audit_cross_tenant(
496 source_tenant,
497 dest_tenant,
498 source.as_str(),
499 destination.as_str(),
500 envelope.audit_tag,
501 );
502 return Err(RingKernelError::TenantMismatch {
503 from: source_tenant,
504 to: dest_tenant,
505 });
506 }
507
508 self.registry
510 .check_quota(source_tenant, envelope.audit_tag)?;
511 self.registry.record_message(source_tenant);
512
513 let mut envelope = envelope;
516 envelope.tenant_id = source_tenant;
517 if envelope.audit_tag.is_unspecified() {
518 let sub = self
519 .tenants
520 .read()
521 .get(&source_tenant)
522 .cloned()
523 .expect("tenant sub-broker must exist for registered sender");
524 envelope.audit_tag = sub.audit_tag_for(&source);
525 }
526
527 let timestamp = envelope.header.timestamp;
528 let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
529 message.priority = priority;
530
531 self.deliver_in(source_tenant, message).await
532 }
533
534 pub async fn send_with_audit(
541 &self,
542 source: KernelId,
543 destination: KernelId,
544 envelope: MessageEnvelope,
545 audit_tag: AuditTag,
546 ) -> Result<DeliveryReceipt> {
547 let envelope = envelope.with_audit_tag(audit_tag);
548 self.send(source, destination, envelope).await
549 }
550
551 async fn deliver_in(
553 &self,
554 tenant_id: TenantId,
555 message: K2KMessage,
556 ) -> Result<DeliveryReceipt> {
557 let sub = self
558 .tenants
559 .read()
560 .get(&tenant_id)
561 .cloned()
562 .ok_or_else(|| {
563 RingKernelError::K2KError(format!(
564 "tenant sub-broker {} disappeared mid-send",
565 tenant_id
566 ))
567 })?;
568
569 let message_id = message.id;
570 let source = message.source.clone();
571 let destination = message.destination.clone();
572 let timestamp = message.sent_at;
573
574 let endpoints = sub.endpoints.read();
576 if let Some(sender) = endpoints.get(&destination) {
577 match sender.try_send(message) {
578 Ok(()) => {
579 sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
580 self.message_counter.fetch_add(1, Ordering::Relaxed);
581 let receipt = DeliveryReceipt {
582 message_id,
583 source,
584 destination,
585 status: DeliveryStatus::Delivered,
586 timestamp,
587 };
588 self.receipts.write().insert(message_id, receipt.clone());
589 return Ok(receipt);
590 }
591 Err(mpsc::error::TrySendError::Full(_)) => {
592 return Ok(DeliveryReceipt {
593 message_id,
594 source,
595 destination,
596 status: DeliveryStatus::QueueFull,
597 timestamp,
598 });
599 }
600 Err(mpsc::error::TrySendError::Closed(_)) => {
601 return Ok(DeliveryReceipt {
602 message_id,
603 source,
604 destination,
605 status: DeliveryStatus::NotFound,
606 timestamp,
607 });
608 }
609 }
610 }
611 drop(endpoints);
612
613 let next_hop = sub.routing_table.read().get(&destination).cloned();
616 if let Some(next_hop) = next_hop {
617 let routed_message = K2KMessage {
618 id: message_id,
619 source: source.clone(),
620 destination: destination.clone(),
621 envelope: message.envelope,
622 hops: message.hops + 1,
623 sent_at: message.sent_at,
624 priority: message.priority,
625 };
626
627 if routed_message.hops > self.config.max_hops {
628 return Ok(DeliveryReceipt {
629 message_id,
630 source,
631 destination,
632 status: DeliveryStatus::MaxHopsExceeded,
633 timestamp,
634 });
635 }
636
637 let endpoints = sub.endpoints.read();
638 if let Some(sender) = endpoints.get(&next_hop) {
639 if sender.try_send(routed_message).is_ok() {
640 sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
641 self.message_counter.fetch_add(1, Ordering::Relaxed);
642 return Ok(DeliveryReceipt {
643 message_id,
644 source,
645 destination,
646 status: DeliveryStatus::Pending,
647 timestamp,
648 });
649 }
650 }
651 }
652
653 Ok(DeliveryReceipt {
655 message_id,
656 source,
657 destination,
658 status: DeliveryStatus::NotFound,
659 timestamp,
660 })
661 }
662
663 pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
666 self.add_route_in(UNSPECIFIED_TENANT, destination, next_hop);
667 }
668
669 pub fn add_route_in(&self, tenant_id: TenantId, destination: KernelId, next_hop: KernelId) {
671 let sub = {
672 let mut tenants = self.tenants.write();
673 tenants
674 .entry(tenant_id)
675 .or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
676 .clone()
677 };
678 sub.routing_table.write().insert(destination, next_hop);
679 }
680
681 pub fn remove_route(&self, destination: &KernelId) {
683 self.remove_route_in(UNSPECIFIED_TENANT, destination);
684 }
685
686 pub fn remove_route_in(&self, tenant_id: TenantId, destination: &KernelId) {
688 if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
689 sub.routing_table.write().remove(destination);
690 }
691 }
692
693 pub fn stats(&self) -> K2KStats {
695 let tenants = self.tenants.read();
696 let mut registered = 0usize;
697 let mut routes = 0usize;
698 for sub in tenants.values() {
699 registered += sub.endpoints.read().len();
700 routes += sub.routing_table.read().len();
701 }
702 K2KStats {
703 registered_endpoints: registered,
704 messages_delivered: self.message_counter.load(Ordering::Relaxed),
705 routes_configured: routes,
706 tenant_count: tenants.len(),
707 cross_tenant_rejections: self.cross_tenant_rejections.load(Ordering::Relaxed),
708 }
709 }
710
711 pub fn tenant_stats(&self, tenant_id: TenantId) -> Option<TenantStats> {
713 self.tenants.read().get(&tenant_id).map(|sub| TenantStats {
714 tenant_id,
715 registered_endpoints: sub.endpoints.read().len(),
716 routes_configured: sub.routing_table.read().len(),
717 messages_delivered: sub.messages_delivered.load(Ordering::Relaxed),
718 })
719 }
720
721 pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
723 self.receipts.read().get(message_id).cloned()
724 }
725}
726
/// Broker-wide statistics snapshot produced by `K2KBroker::stats`.
#[derive(Debug, Clone, Default)]
pub struct K2KStats {
    /// Endpoints registered across all tenants.
    pub registered_endpoints: usize,
    /// Messages handed to a channel across all tenants.
    pub messages_delivered: u64,
    /// Routing table entries across all tenants.
    pub routes_configured: usize,
    /// Number of tenant sub-brokers.
    pub tenant_count: usize,
    /// Sends rejected because source and destination tenants differed.
    pub cross_tenant_rejections: u64,
}
741
/// Statistics snapshot for a single tenant, produced by
/// `K2KBroker::tenant_stats`.
#[derive(Debug, Clone, Default)]
pub struct TenantStats {
    /// Tenant the figures belong to.
    pub tenant_id: TenantId,
    /// Endpoints registered in this tenant.
    pub registered_endpoints: usize,
    /// Routing table entries in this tenant.
    pub routes_configured: usize,
    /// Messages handed to a channel within this tenant.
    pub messages_delivered: u64,
}
754
755pub struct K2KBuilder {
757 config: K2KConfig,
758 registry: Option<Arc<TenantRegistry>>,
759}
760
761impl K2KBuilder {
762 pub fn new() -> Self {
764 Self {
765 config: K2KConfig::default(),
766 registry: None,
767 }
768 }
769
770 pub fn max_pending_messages(mut self, count: usize) -> Self {
772 self.config.max_pending_messages = count;
773 self
774 }
775
776 pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
778 self.config.delivery_timeout_ms = timeout;
779 self
780 }
781
782 pub fn enable_tracing(mut self, enable: bool) -> Self {
784 self.config.enable_tracing = enable;
785 self
786 }
787
788 pub fn max_hops(mut self, hops: u8) -> Self {
790 self.config.max_hops = hops;
791 self
792 }
793
794 pub fn with_registry(mut self, registry: Arc<TenantRegistry>) -> Self {
796 self.registry = Some(registry);
797 self
798 }
799
800 pub fn build(self) -> Arc<K2KBroker> {
802 match self.registry {
803 Some(registry) => K2KBroker::with_registry(self.config, registry),
804 None => K2KBroker::new(self.config),
805 }
806 }
807}
808
809impl Default for K2KBuilder {
810 fn default() -> Self {
811 Self::new()
812 }
813}
814
/// Static description of one message type, collected through `inventory`
/// and indexed at runtime by `K2KTypeRegistry`.
#[derive(Debug, Clone)]
pub struct K2KMessageRegistration {
    /// Numeric identifier of the message type.
    pub type_id: u64,
    /// Name of the message type.
    pub type_name: &'static str,
    /// Whether messages of this type may be routed kernel-to-kernel.
    pub k2k_routable: bool,
    /// Optional category used to group message types.
    pub category: Option<&'static str>,
}

// Declares `K2KMessageRegistration` as a type `inventory` collects, so that
// `inventory::iter::<K2KMessageRegistration>()` can enumerate submissions.
inventory::collect!(K2KMessageRegistration);
850
851pub struct K2KTypeRegistry {
874 by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
876 by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
878 by_category: HashMap<&'static str, Vec<u64>>,
880}
881
882impl K2KTypeRegistry {
883 pub fn discover() -> Self {
888 let mut registry = Self {
889 by_type_id: HashMap::new(),
890 by_type_name: HashMap::new(),
891 by_category: HashMap::new(),
892 };
893
894 for reg in inventory::iter::<K2KMessageRegistration>() {
895 registry.by_type_id.insert(reg.type_id, reg);
896 registry.by_type_name.insert(reg.type_name, reg);
897 if let Some(cat) = reg.category {
898 registry
899 .by_category
900 .entry(cat)
901 .or_default()
902 .push(reg.type_id);
903 }
904 }
905
906 registry
907 }
908
909 pub fn is_routable(&self, type_id: u64) -> bool {
911 self.by_type_id
912 .get(&type_id)
913 .map(|r| r.k2k_routable)
914 .unwrap_or(false)
915 }
916
917 pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
919 self.by_type_id.get(&type_id).copied()
920 }
921
922 pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
924 self.by_type_name.get(type_name).copied()
925 }
926
927 pub fn get_category(&self, category: &str) -> &[u64] {
929 self.by_category
930 .get(category)
931 .map(|v| v.as_slice())
932 .unwrap_or(&[])
933 }
934
935 pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
937 self.by_category.keys().copied()
938 }
939
940 pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
942 self.by_type_id.values().copied()
943 }
944
945 pub fn routable_types(&self) -> Vec<u64> {
947 self.by_type_id
948 .iter()
949 .filter(|(_, r)| r.k2k_routable)
950 .map(|(id, _)| *id)
951 .collect()
952 }
953
954 pub fn len(&self) -> usize {
956 self.by_type_id.len()
957 }
958
959 pub fn is_empty(&self) -> bool {
961 self.by_type_id.is_empty()
962 }
963}
964
965impl Default for K2KTypeRegistry {
966 fn default() -> Self {
967 Self::discover()
968 }
969}
970
971impl std::fmt::Debug for K2KTypeRegistry {
972 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
973 f.debug_struct("K2KTypeRegistry")
974 .field("registered_types", &self.by_type_id.len())
975 .field("categories", &self.by_category.keys().collect::<Vec<_>>())
976 .finish()
977 }
978}
979
/// Settings controlling K2K message encryption.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct K2KEncryptionConfig {
    /// Master switch; encrypt/decrypt calls fail when this is `false`.
    pub enabled: bool,
    /// AEAD cipher used for message bodies.
    pub algorithm: K2KEncryptionAlgorithm,
    /// When set, the rotating session key is mixed into each message key.
    pub forward_secrecy: bool,
    /// Seconds between session key rotations; 0 disables rotation.
    pub key_rotation_interval_secs: u64,
    /// Whether unencrypted traffic should be refused.
    // NOTE(review): not read by the code visible in this file section.
    pub require_encryption: bool,
}

#[cfg(feature = "crypto")]
impl Default for K2KEncryptionConfig {
    /// Encryption enabled with AES-256-GCM, forward secrecy on, hourly key
    /// rotation, and encryption not mandatory.
    fn default() -> Self {
        K2KEncryptionConfig {
            require_encryption: false,
            key_rotation_interval_secs: 3600,
            forward_secrecy: true,
            algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
            enabled: true,
        }
    }
}

#[cfg(feature = "crypto")]
impl K2KEncryptionConfig {
    /// Default settings with encryption switched off.
    pub fn disabled() -> Self {
        let mut config = Self::default();
        config.enabled = false;
        config
    }

    /// Default settings with encryption enabled, mandatory, and using
    /// forward secrecy.
    pub fn strict() -> Self {
        let mut config = Self::default();
        config.enabled = true;
        config.require_encryption = true;
        config.forward_secrecy = true;
        config
    }
}
1033
/// AEAD cipher selection for K2K message encryption.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum K2KEncryptionAlgorithm {
    /// AES-256 in GCM mode (the default in `K2KEncryptionConfig`).
    Aes256Gcm,
    /// ChaCha20 with a Poly1305 authenticator.
    ChaCha20Poly1305,
}
1043
/// Secret key material owned by one kernel: a fixed long-term key plus a
/// rotating session key. Both keys are wiped when the value is dropped.
#[cfg(feature = "crypto")]
pub struct K2KKeyMaterial {
    /// Kernel these keys belong to.
    kernel_id: KernelId,
    /// Long-term secret; never changes after construction.
    long_term_key: [u8; 32],
    /// Current session key, replaced on every rotation.
    session_key: parking_lot::RwLock<[u8; 32]>,
    /// Rotation counter; starts at 1 and grows by one per rotation.
    session_generation: std::sync::atomic::AtomicU64,
    /// When this key material was created.
    created_at: std::time::Instant,
    /// When the session key was last replaced (initially the creation time).
    last_rotated: parking_lot::RwLock<std::time::Instant>,
}

#[cfg(feature = "crypto")]
impl K2KKeyMaterial {
    /// Creates key material with a random long-term key and a random
    /// session key.
    pub fn new(kernel_id: KernelId) -> Self {
        let long_term_key = Self::fresh_key();
        Self::from_key(kernel_id, long_term_key)
    }

    /// Creates key material around a caller-supplied long-term key; the
    /// session key is still generated randomly.
    pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
        let initial_session_key = Self::fresh_key();
        let created_at = std::time::Instant::now();

        K2KKeyMaterial {
            kernel_id,
            long_term_key: key,
            session_key: parking_lot::RwLock::new(initial_session_key),
            session_generation: std::sync::atomic::AtomicU64::new(1),
            created_at,
            last_rotated: parking_lot::RwLock::new(created_at),
        }
    }

    /// Draws 32 random bytes from the thread-local RNG.
    fn fresh_key() -> [u8; 32] {
        use rand::RngCore;

        let mut bytes = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut bytes);
        bytes
    }

    /// Kernel that owns this key material.
    pub fn kernel_id(&self) -> &KernelId {
        &self.kernel_id
    }

    /// Copy of the current session key.
    pub fn session_key(&self) -> [u8; 32] {
        let current = self.session_key.read();
        *current
    }

    /// Current session key generation number.
    pub fn session_generation(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.session_generation.load(Acquire)
    }

    /// Replaces the session key with fresh random bytes, bumps the
    /// generation counter, and records the rotation time.
    pub fn rotate_session_key(&self) {
        use std::sync::atomic::Ordering::AcqRel;

        let replacement = Self::fresh_key();

        *self.session_key.write() = replacement;
        self.session_generation.fetch_add(1, AcqRel);
        *self.last_rotated.write() = std::time::Instant::now();
    }

    /// Derives a 32-byte secret for talking to the peer with the given
    /// public key.
    ///
    /// The value is SHA-256 over this kernel's long-term key, the peer's
    /// public key, and a fixed label, in that order.
    // NOTE(review): this is a hash over the local secret, not a key
    // agreement; two parties with different long-term keys will not derive
    // the same value — confirm this is intended.
    pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(&self.long_term_key);
        hasher.update(dest_public_key);
        hasher.update(b"k2k-shared-secret-v1");
        let digest = hasher.finalize();

        let mut secret = [0u8; 32];
        secret.copy_from_slice(&digest[..]);
        secret
    }

    /// Whether at least `interval_secs` whole seconds have passed since the
    /// last rotation. An interval of 0 means "never rotate".
    pub fn should_rotate(&self, interval_secs: u64) -> bool {
        interval_secs != 0 && self.last_rotated.read().elapsed().as_secs() >= interval_secs
    }

    /// Time elapsed since this key material was created.
    pub fn age(&self) -> std::time::Duration {
        let born = self.created_at;
        born.elapsed()
    }
}

#[cfg(feature = "crypto")]
impl Drop for K2KKeyMaterial {
    /// Overwrites both keys with zeros before the memory is released.
    fn drop(&mut self) {
        use zeroize::Zeroize;

        self.long_term_key.zeroize();
        let mut session_key = self.session_key.write();
        session_key.zeroize();
    }
}
1174
/// Encrypted form of a `K2KMessage`: routing metadata stays in the clear,
/// only the serialized envelope is encrypted.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct EncryptedK2KMessage {
    /// Id of the original message.
    pub id: MessageId,
    /// Kernel that sent the message.
    pub source: KernelId,
    /// Kernel the message is addressed to.
    pub destination: KernelId,
    /// Hop count copied from the original message.
    pub hops: u8,
    /// Send timestamp copied from the original message.
    pub sent_at: HlcTimestamp,
    /// Priority copied from the original message.
    pub priority: u8,
    /// Session key generation of the encryptor when the message was sealed.
    pub key_generation: u64,
    /// Random 12-byte nonce used for this message.
    pub nonce: [u8; 12],
    /// Encrypted envelope bytes, without the authentication tag.
    pub ciphertext: Vec<u8>,
    /// 16-byte authentication tag split off the end of the AEAD output.
    pub tag: [u8; 16],
}
1200
/// Encrypts and decrypts K2K message envelopes for one kernel.
// NOTE(review): the per-message key is derived from this instance's own
// long-term key (and, with forward secrecy, its own random session key), so a
// message appears to be decryptable only by an encryptor holding the same key
// material — confirm the intended key-agreement scheme.
#[cfg(feature = "crypto")]
pub struct K2KEncryptor {
    /// Encryption settings.
    config: K2KEncryptionConfig,
    /// This kernel's long-term and session keys.
    key_material: K2KKeyMaterial,
    /// Public keys of known peers, keyed by kernel id.
    peer_keys: parking_lot::RwLock<HashMap<KernelId, [u8; 32]>>,
    /// Running counters exposed through [`K2KEncryptor::stats`].
    stats: K2KEncryptionStats,
}

#[cfg(feature = "crypto")]
impl K2KEncryptor {
    /// Length in bytes of the authentication tag appended by both ciphers.
    const TAG_LEN: usize = 16;

    /// Creates an encryptor with freshly generated key material.
    pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
        Self::from_material(K2KKeyMaterial::new(kernel_id), config)
    }

    /// Creates an encryptor around a caller-supplied long-term key.
    pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
        Self::from_material(K2KKeyMaterial::from_key(kernel_id, key), config)
    }

    /// Shared constructor body: no peers, zeroed statistics.
    fn from_material(key_material: K2KKeyMaterial, config: K2KEncryptionConfig) -> Self {
        Self {
            config,
            key_material,
            peer_keys: parking_lot::RwLock::new(HashMap::new()),
            stats: K2KEncryptionStats::default(),
        }
    }

    /// Public key for this kernel: SHA-256 over the long-term key followed
    /// by a fixed label.
    pub fn public_key(&self) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(&self.key_material.long_term_key);
        hasher.update(b"k2k-public-key-v1");
        let result = hasher.finalize();
        let mut public = [0u8; 32];
        public.copy_from_slice(&result);
        public
    }

    /// Records (or replaces) the public key of a peer kernel.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        self.peer_keys.write().insert(kernel_id, public_key);
    }

    /// Forgets the public key of a peer kernel.
    pub fn unregister_peer(&self, kernel_id: &KernelId) {
        self.peer_keys.write().remove(kernel_id);
    }

    /// Rotates the session key when the configured interval has elapsed and
    /// counts the rotation.
    pub fn maybe_rotate(&self) {
        if self
            .key_material
            .should_rotate(self.config.key_rotation_interval_secs)
        {
            self.key_material.rotate_session_key();
            self.stats
                .key_rotations
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }
    }

    /// Fails with a `K2KError` when encryption is switched off in the config.
    fn ensure_enabled(&self) -> Result<()> {
        if self.config.enabled {
            Ok(())
        } else {
            Err(RingKernelError::K2KError(
                "K2K encryption is disabled".to_string(),
            ))
        }
    }

    /// Looks up the registered public key of `kernel_id`.
    ///
    /// `role` ("destination" or "source") only shapes the error message.
    fn peer_key(&self, kernel_id: &KernelId, role: &str) -> Result<[u8; 32]> {
        self.peer_keys.read().get(kernel_id).copied().ok_or_else(|| {
            RingKernelError::K2KError(format!(
                "No public key registered for {} kernel: {}",
                role, kernel_id
            ))
        })
    }

    /// Derives the symmetric key for one message exchanged with the peer
    /// owning `peer_key`.
    ///
    /// With forward secrecy the shared secret is hashed together with the
    /// current session key; otherwise the shared secret is used directly.
    fn message_key(&self, peer_key: &[u8; 32]) -> [u8; 32] {
        let shared_secret = self.key_material.derive_shared_secret(peer_key);
        if !self.config.forward_secrecy {
            return shared_secret;
        }

        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(&shared_secret);
        hasher.update(&self.key_material.session_key());
        let result = hasher.finalize();
        let mut key = [0u8; 32];
        key.copy_from_slice(&result);
        key
    }

    /// Runs the configured AEAD cipher and returns `ciphertext || tag`.
    fn seal(&self, key: &[u8; 32], nonce: &[u8; 12], plaintext: &[u8]) -> Result<Vec<u8>> {
        match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(&key[..])
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
                cipher
                    .encrypt(Nonce::from_slice(&nonce[..]), plaintext)
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(&key[..]).map_err(|e| {
                    RingKernelError::K2KError(format!("ChaCha init failed: {}", e))
                })?;
                cipher
                    .encrypt(Nonce::from_slice(&nonce[..]), plaintext)
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))
            }
        }
    }

    /// Verifies and decrypts `ciphertext || tag` with the configured cipher.
    ///
    /// Only failures of the decrypt step itself are counted in
    /// `decryption_failures`; cipher construction errors are not.
    fn open(&self, key: &[u8; 32], nonce: &[u8; 12], sealed: &[u8]) -> Result<Vec<u8>> {
        let count_failure = |detail: String| {
            self.stats
                .decryption_failures
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            RingKernelError::K2KError(format!("Decryption failed: {}", detail))
        };

        match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(&key[..])
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
                cipher
                    .decrypt(Nonce::from_slice(&nonce[..]), sealed)
                    .map_err(|e| count_failure(e.to_string()))
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(&key[..]).map_err(|e| {
                    RingKernelError::K2KError(format!("ChaCha init failed: {}", e))
                })?;
                cipher
                    .decrypt(Nonce::from_slice(&nonce[..]), sealed)
                    .map_err(|e| count_failure(e.to_string()))
            }
        }
    }

    /// Encrypts the envelope of `message` for its destination kernel.
    ///
    /// Routing metadata (ids, hops, timestamp, priority) is copied in the
    /// clear; the serialized envelope is sealed under a fresh random nonce.
    ///
    /// # Errors
    ///
    /// `RingKernelError::K2KError` when encryption is disabled, when no
    /// public key is registered for the destination, or when the cipher
    /// fails.
    pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
        self.ensure_enabled()?;

        let peer_key = self.peer_key(&message.destination, "destination")?;
        let session_key = self.message_key(&peer_key);

        use rand::RngCore;
        let mut nonce = [0u8; 12];
        rand::thread_rng().fill_bytes(&mut nonce);

        let envelope_bytes = message.envelope.to_bytes();
        let mut ciphertext = self.seal(&session_key, &nonce, envelope_bytes.as_slice())?;

        // The AEAD output is `ciphertext || tag`; split the tag off the end.
        // A checked subtraction turns an impossibly short output into an
        // error instead of an arithmetic panic.
        let tag_start = ciphertext
            .len()
            .checked_sub(Self::TAG_LEN)
            .ok_or_else(|| {
                RingKernelError::K2KError(
                    "Encryption failed: output shorter than authentication tag".to_string(),
                )
            })?;
        let mut tag = [0u8; Self::TAG_LEN];
        tag.copy_from_slice(&ciphertext[tag_start..]);
        ciphertext.truncate(tag_start);

        self.stats
            .messages_encrypted
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.stats.bytes_encrypted.fetch_add(
            envelope_bytes.len() as u64,
            std::sync::atomic::Ordering::Relaxed,
        );

        Ok(EncryptedK2KMessage {
            id: message.id,
            source: message.source.clone(),
            destination: message.destination.clone(),
            hops: message.hops,
            sent_at: message.sent_at,
            priority: message.priority,
            key_generation: self.key_material.session_generation(),
            nonce,
            ciphertext,
            tag,
        })
    }

    /// Decrypts `encrypted` back into a plain [`K2KMessage`].
    ///
    /// # Errors
    ///
    /// `RingKernelError::K2KError` when encryption is disabled, when no
    /// public key is registered for the source, when authentication or
    /// decryption fails, or when the plaintext is not a valid envelope.
    pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
        self.ensure_enabled()?;

        let peer_key = self.peer_key(&encrypted.source, "source")?;
        let session_key = self.message_key(&peer_key);

        // Re-join ciphertext and tag into the layout the AEAD API expects.
        let mut sealed = Vec::with_capacity(encrypted.ciphertext.len() + Self::TAG_LEN);
        sealed.extend_from_slice(&encrypted.ciphertext);
        sealed.extend_from_slice(&encrypted.tag);

        let plaintext = self.open(&session_key, &encrypted.nonce, &sealed)?;

        let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
            RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
        })?;

        self.stats
            .messages_decrypted
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        self.stats
            .bytes_decrypted
            .fetch_add(plaintext.len() as u64, std::sync::atomic::Ordering::Relaxed);

        Ok(K2KMessage {
            id: encrypted.id,
            source: encrypted.source.clone(),
            destination: encrypted.destination.clone(),
            envelope,
            hops: encrypted.hops,
            sent_at: encrypted.sent_at,
            priority: encrypted.priority,
        })
    }

    /// Point-in-time copy of the encryption counters, the peer count and
    /// the current session generation.
    pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
        let read = |counter: &std::sync::atomic::AtomicU64| {
            counter.load(std::sync::atomic::Ordering::Relaxed)
        };

        K2KEncryptionStatsSnapshot {
            messages_encrypted: read(&self.stats.messages_encrypted),
            messages_decrypted: read(&self.stats.messages_decrypted),
            bytes_encrypted: read(&self.stats.bytes_encrypted),
            bytes_decrypted: read(&self.stats.bytes_decrypted),
            key_rotations: read(&self.stats.key_rotations),
            decryption_failures: read(&self.stats.decryption_failures),
            peer_count: self.peer_keys.read().len(),
            session_generation: self.key_material.session_generation(),
        }
    }

    /// The encryption settings this encryptor was built with.
    pub fn config(&self) -> &K2KEncryptionConfig {
        &self.config
    }
}
1521
/// Live counters kept by the encryptor.
///
/// Every field is an atomic so it can be updated through a shared reference;
/// `stats()` copies the values into a `K2KEncryptionStatsSnapshot`.
#[cfg(feature = "crypto")]
#[derive(Default)]
struct K2KEncryptionStats {
    /// Count of encrypted messages.
    messages_encrypted: AtomicU64,
    /// Incremented once a message has been decrypted and its envelope
    /// deserialized.
    messages_decrypted: AtomicU64,
    /// Count of encrypted bytes.
    bytes_encrypted: AtomicU64,
    /// Plaintext bytes recovered by successful decryptions.
    bytes_decrypted: AtomicU64,
    /// Count of session key rotations.
    key_rotations: AtomicU64,
    /// Incremented when the cipher fails to decrypt a message.
    decryption_failures: AtomicU64,
}
1533
/// Plain-value copy of the encryptor's counters, as returned by `stats()`.
///
/// All fields are integers, so the type also derives `PartialEq`/`Eq`; two
/// snapshots can be compared directly in assertions and change detection.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct K2KEncryptionStatsSnapshot {
    /// Value of the encrypted-message counter.
    pub messages_encrypted: u64,
    /// Value of the decrypted-message counter.
    pub messages_decrypted: u64,
    /// Value of the encrypted-byte counter.
    pub bytes_encrypted: u64,
    /// Value of the decrypted-byte counter.
    pub bytes_decrypted: u64,
    /// Value of the key-rotation counter.
    pub key_rotations: u64,
    /// Value of the decryption-failure counter.
    pub decryption_failures: u64,
    /// Number of peer keys registered when the snapshot was taken.
    pub peer_count: usize,
    /// Session generation reported by the key material.
    pub session_generation: u64,
}
1555
/// A `K2KEndpoint` paired with a shared `K2KEncryptor`.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KEndpoint {
    /// Endpoint that performs the actual send and receive.
    inner: K2KEndpoint,
    /// Encryptor shared (via `Arc`) with whoever else holds a handle to it.
    encryptor: Arc<K2KEncryptor>,
}
1564
#[cfg(feature = "crypto")]
impl EncryptedK2KEndpoint {
    /// Pairs an existing endpoint with the encryptor it should use.
    pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
        Self { inner, encryptor }
    }

    /// Returns the public key reported by the encryptor.
    pub fn public_key(&self) -> [u8; 32] {
        let encryptor = &self.encryptor;
        encryptor.public_key()
    }

    /// Records `public_key` for `kernel_id` on the encryptor.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        let encryptor = &self.encryptor;
        encryptor.register_peer(kernel_id, public_key);
    }

    /// Runs the encryption step for `envelope` and then sends it to
    /// `destination` through the inner endpoint.
    ///
    /// # Errors
    ///
    /// Returns the encryptor's error if encryption fails, otherwise whatever
    /// the inner endpoint's `send` returns.
    pub async fn send_encrypted(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
    ) -> Result<DeliveryReceipt> {
        // Let the encryptor rotate its session key first if it decides to.
        self.encryptor.maybe_rotate();

        let sent_at = envelope.header.timestamp;
        let source = self.inner.kernel_id.clone();
        let outbound = K2KMessage::new(source, destination.clone(), envelope, sent_at);

        // NOTE(review): the ciphertext is computed (so encryption errors
        // surface here) but then discarded; the envelope forwarded below is
        // the original plaintext one. Confirm this is intended.
        let _ciphertext = self.encryptor.encrypt(&outbound)?;

        self.inner.send(destination, outbound.envelope).await
    }

    /// Awaits the next message from the inner endpoint and returns it as-is.
    ///
    /// NOTE(review): no decryption call is made here despite the name.
    pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
        self.inner.receive().await
    }

    /// Returns a snapshot of the encryptor's counters.
    pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
        let encryptor = &self.encryptor;
        encryptor.stats()
    }
}
1617
/// Collects the broker and encryption settings consumed by `build`.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KBuilder {
    /// Settings handed to `K2KBroker::new`.
    k2k_config: K2KConfig,
    /// Encryption settings returned to the caller alongside the broker.
    encryption_config: K2KEncryptionConfig,
}
1624
#[cfg(feature = "crypto")]
impl EncryptedK2KBuilder {
    /// Starts from the default broker and encryption configurations.
    pub fn new() -> Self {
        let k2k_config = K2KConfig::default();
        let encryption_config = K2KEncryptionConfig::default();
        Self {
            k2k_config,
            encryption_config,
        }
    }

    /// Replaces the broker configuration wholesale.
    pub fn k2k_config(self, config: K2KConfig) -> Self {
        Self {
            k2k_config: config,
            ..self
        }
    }

    /// Replaces the encryption configuration wholesale.
    pub fn encryption_config(self, config: K2KEncryptionConfig) -> Self {
        Self {
            encryption_config: config,
            ..self
        }
    }

    /// Sets the `forward_secrecy` flag of the encryption configuration.
    pub fn with_forward_secrecy(self, enabled: bool) -> Self {
        self.map_encryption(|cfg| cfg.forward_secrecy = enabled)
    }

    /// Sets the `algorithm` of the encryption configuration.
    pub fn with_algorithm(self, algorithm: K2KEncryptionAlgorithm) -> Self {
        self.map_encryption(|cfg| cfg.algorithm = algorithm)
    }

    /// Sets `key_rotation_interval_secs` of the encryption configuration.
    pub fn with_key_rotation(self, interval_secs: u64) -> Self {
        self.map_encryption(|cfg| cfg.key_rotation_interval_secs = interval_secs)
    }

    /// Sets the `require_encryption` flag of the encryption configuration.
    pub fn require_encryption(self, required: bool) -> Self {
        self.map_encryption(|cfg| cfg.require_encryption = required)
    }

    /// Builds the broker from the broker configuration and returns it with
    /// the accumulated encryption configuration. No encryptor is created here.
    pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
        let Self {
            k2k_config,
            encryption_config,
        } = self;
        (K2KBroker::new(k2k_config), encryption_config)
    }

    /// Applies one in-place edit to the encryption configuration.
    fn map_encryption(mut self, apply: impl FnOnce(&mut K2KEncryptionConfig)) -> Self {
        apply(&mut self.encryption_config);
        self
    }
}
1676
#[cfg(feature = "crypto")]
impl Default for EncryptedK2KBuilder {
    /// Same as `EncryptedK2KBuilder::new`.
    fn default() -> Self {
        EncryptedK2KBuilder::new()
    }
}
1683
1684#[cfg(test)]
1685mod tests {
1686 use super::*;
1687
1688 #[tokio::test]
1689 async fn test_k2k_broker_registration() {
1690 let broker = K2KBuilder::new().build();
1691
1692 let kernel1 = KernelId::new("kernel1");
1693 let kernel2 = KernelId::new("kernel2");
1694
1695 let _endpoint1 = broker.register(kernel1.clone());
1696 let _endpoint2 = broker.register(kernel2.clone());
1697
1698 assert!(broker.is_registered(&kernel1));
1699 assert!(broker.is_registered(&kernel2));
1700 assert_eq!(broker.registered_kernels().len(), 2);
1701 }
1702
1703 #[tokio::test]
1704 async fn test_k2k_message_delivery() {
1705 let broker = K2KBuilder::new().build();
1706
1707 let kernel1 = KernelId::new("kernel1");
1708 let kernel2 = KernelId::new("kernel2");
1709
1710 let endpoint1 = broker.register(kernel1.clone());
1711 let mut endpoint2 = broker.register(kernel2.clone());
1712
1713 let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
1715
1716 let receipt = endpoint1.send(kernel2.clone(), envelope).await.unwrap();
1718 assert_eq!(receipt.status, DeliveryStatus::Delivered);
1719
1720 let message = endpoint2.try_receive();
1722 assert!(message.is_some());
1723 assert_eq!(message.unwrap().source, kernel1);
1724 }
1725
/// Pins every default of `K2KConfig`.
///
/// The destructuring is exhaustive on purpose: adding a field to the config
/// makes this test stop compiling until its default is asserted here too.
#[test]
fn test_k2k_config_default() {
    let K2KConfig {
        max_pending_messages,
        delivery_timeout_ms,
        enable_tracing,
        max_hops,
    } = K2KConfig::default();

    assert_eq!(max_pending_messages, 1024);
    assert_eq!(delivery_timeout_ms, 5000);
    assert!(!enable_tracing);
    assert_eq!(max_hops, 8);
}
1732
1733 mod multi_tenant {
1738 use super::*;
1739 use crate::audit::MemorySink;
1740
1741 fn env() -> MessageEnvelope {
1742 MessageEnvelope::empty(1, 2, HlcTimestamp::now(1))
1743 }
1744
1745 #[tokio::test]
1748 async fn legacy_single_tenant_send_unchanged() {
1749 let broker = K2KBuilder::new().build();
1750 let k1 = KernelId::new("k1");
1751 let k2 = KernelId::new("k2");
1752 let e1 = broker.register(k1.clone());
1753 let mut e2 = broker.register(k2.clone());
1754
1755 let receipt = e1.send(k2.clone(), env()).await.unwrap();
1756 assert_eq!(receipt.status, DeliveryStatus::Delivered);
1757 let msg = e2.try_receive().unwrap();
1758 assert_eq!(msg.source, k1);
1759 assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
1760 }
1761
1762 #[tokio::test]
1763 async fn single_tenant_fast_path_uses_unspecified_tenant() {
1764 let broker = K2KBuilder::new().build();
1765 let k = KernelId::new("k");
1766 let _e = broker.register(k.clone());
1767 assert_eq!(broker.tenant_of(&k), Some(UNSPECIFIED_TENANT));
1768 assert_eq!(broker.tenant_count(), 1);
1769 }
1770
1771 #[tokio::test]
1774 async fn cross_tenant_send_rejected_with_tenant_mismatch() {
1775 let broker = K2KBuilder::new().build();
1776 broker
1777 .registry()
1778 .register(1, TenantQuota::default())
1779 .unwrap();
1780 broker
1781 .registry()
1782 .register(2, TenantQuota::default())
1783 .unwrap();
1784
1785 let ka = KernelId::new("tenant1_kernel");
1786 let kb = KernelId::new("tenant2_kernel");
1787 let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1788 let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
1789
1790 let err = ea.send(kb.clone(), env()).await.unwrap_err();
1791 match err {
1792 RingKernelError::TenantMismatch { from, to } => {
1793 assert_eq!(from, 1);
1794 assert_eq!(to, 2);
1795 }
1796 other => panic!("expected TenantMismatch, got {:?}", other),
1797 }
1798 assert_eq!(broker.stats().cross_tenant_rejections, 1);
1799 }
1800
1801 #[tokio::test]
1802 async fn cross_tenant_attempt_recorded_in_audit_sink() {
1803 let sink = Arc::new(MemorySink::new(100));
1804 let registry = Arc::new(TenantRegistry::with_audit_sink(sink.clone()));
1805 registry.register(1, TenantQuota::default()).unwrap();
1806 registry.register(2, TenantQuota::default()).unwrap();
1807
1808 let broker = K2KBuilder::new().with_registry(registry).build();
1809 let ka = KernelId::new("ka");
1810 let kb = KernelId::new("kb");
1811 let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1812 let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
1813
1814 let _ = ea.send(kb.clone(), env()).await.unwrap_err();
1815 let events = sink.events();
1816 assert_eq!(events.len(), 1);
1817 assert!(events[0].description.contains("cross-tenant"));
1818 let md: std::collections::HashMap<_, _> = events[0]
1819 .metadata
1820 .iter()
1821 .cloned()
1822 .collect::<std::collections::HashMap<_, _>>();
1823 assert_eq!(md.get("from_tenant"), Some(&"1".to_string()));
1824 assert_eq!(md.get("to_tenant"), Some(&"2".to_string()));
1825 }
1826
1827 #[tokio::test]
1830 async fn same_tenant_send_succeeds_with_audit_tag() {
1831 let broker = K2KBuilder::new().build();
1832 broker
1833 .registry()
1834 .register(1, TenantQuota::default())
1835 .unwrap();
1836
1837 let ka = KernelId::new("a");
1838 let kb = KernelId::new("b");
1839 let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1840 let mut eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
1841
1842 let receipt = ea.send(kb.clone(), env()).await.unwrap();
1843 assert_eq!(receipt.status, DeliveryStatus::Delivered);
1844
1845 let msg = eb.try_receive().unwrap();
1846 assert_eq!(msg.envelope.tenant_id, 1);
1847 assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 100));
1848 }
1849
1850 #[tokio::test]
1853 async fn engagement_cost_accumulates_across_sends() {
1854 let broker = K2KBuilder::new().build();
1855 broker
1856 .registry()
1857 .register(1, TenantQuota::unlimited())
1858 .unwrap();
1859
1860 let ka = KernelId::new("a");
1861 let kb = KernelId::new("b");
1862 let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1863 let _eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
1864
1865 for _ in 0..4 {
1866 let _ = ea.send(kb.clone(), env()).await.unwrap();
1867 broker.registry().track_usage(
1868 1,
1869 AuditTag::new(10, 100),
1870 std::time::Duration::from_millis(50),
1871 );
1872 }
1873 let cost = broker
1874 .registry()
1875 .get_engagement_cost_for(1, AuditTag::new(10, 100));
1876 assert_eq!(cost, std::time::Duration::from_millis(200));
1877 }
1878
1879 #[tokio::test]
1880 async fn engagement_cost_separate_across_audit_tags() {
1881 let broker = K2KBuilder::new().build();
1882 broker
1883 .registry()
1884 .register(1, TenantQuota::unlimited())
1885 .unwrap();
1886
1887 let tag_a = AuditTag::new(10, 1);
1888 let tag_b = AuditTag::new(10, 2);
1889 broker
1890 .registry()
1891 .track_usage(1, tag_a, std::time::Duration::from_millis(150));
1892 broker
1893 .registry()
1894 .track_usage(1, tag_b, std::time::Duration::from_millis(300));
1895
1896 assert_eq!(
1897 broker.registry().get_engagement_cost_for(1, tag_a),
1898 std::time::Duration::from_millis(150)
1899 );
1900 assert_eq!(
1901 broker.registry().get_engagement_cost_for(1, tag_b),
1902 std::time::Duration::from_millis(300)
1903 );
1904 }
1905
1906 #[tokio::test]
1909 async fn quota_enforcement_rejects_over_rate_limit() {
1910 let broker = K2KBuilder::new().build();
1911 let mut quota = TenantQuota::default();
1912 quota.max_messages_per_sec = 2;
1913 broker.registry().register(1, quota).unwrap();
1914
1915 let ka = KernelId::new("a");
1916 let kb = KernelId::new("b");
1917 let ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
1918 let _eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
1919
1920 assert!(ea.send(kb.clone(), env()).await.is_ok());
1921 assert!(ea.send(kb.clone(), env()).await.is_ok());
1922 let err = ea.send(kb.clone(), env()).await.unwrap_err();
1923 assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
1924 }
1925
1926 #[tokio::test]
1929 async fn register_tenant_kernel_and_unregister() {
1930 let broker = K2KBuilder::new().build();
1931 broker
1932 .registry()
1933 .register(7, TenantQuota::default())
1934 .unwrap();
1935 let k = KernelId::new("k");
1936 let _ep = broker.register_tenant(7, AuditTag::new(1, 1), k.clone());
1937 assert_eq!(broker.tenant_of(&k), Some(7));
1938 assert_eq!(broker.registered_kernels_for(7), vec![k.clone()]);
1939
1940 broker.unregister(&k);
1941 assert!(!broker.is_registered(&k));
1942 assert!(broker.registered_kernels_for(7).is_empty());
1943 }
1944
1945 #[tokio::test]
1946 async fn tenant_stats_reports_per_tenant_counts() {
1947 let broker = K2KBuilder::new().build();
1948 broker
1949 .registry()
1950 .register(1, TenantQuota::default())
1951 .unwrap();
1952 broker
1953 .registry()
1954 .register(2, TenantQuota::default())
1955 .unwrap();
1956
1957 let _ea = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("a"));
1958 let _eb = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("b"));
1959 let _ec = broker.register_tenant(2, AuditTag::unspecified(), KernelId::new("c"));
1960
1961 let s1 = broker.tenant_stats(1).unwrap();
1962 let s2 = broker.tenant_stats(2).unwrap();
1963 assert_eq!(s1.registered_endpoints, 2);
1964 assert_eq!(s2.registered_endpoints, 1);
1965 }
1966
1967 #[tokio::test]
1968 async fn re_registering_kernel_moves_it_between_tenants() {
1969 let broker = K2KBuilder::new().build();
1970 broker
1971 .registry()
1972 .register(1, TenantQuota::default())
1973 .unwrap();
1974 broker
1975 .registry()
1976 .register(2, TenantQuota::default())
1977 .unwrap();
1978
1979 let k = KernelId::new("roaming");
1980 let _e1 = broker.register_tenant(1, AuditTag::unspecified(), k.clone());
1981 assert_eq!(broker.tenant_of(&k), Some(1));
1982
1983 let _e2 = broker.register_tenant(2, AuditTag::unspecified(), k.clone());
1984 assert_eq!(broker.tenant_of(&k), Some(2));
1985 assert!(broker.registered_kernels_for(1).is_empty());
1986 assert_eq!(broker.registered_kernels_for(2), vec![k]);
1987 }
1988
1989 #[tokio::test]
1990 async fn send_with_audit_overrides_registration_tag() {
1991 let broker = K2KBuilder::new().build();
1992 broker
1993 .registry()
1994 .register(1, TenantQuota::unlimited())
1995 .unwrap();
1996
1997 let ka = KernelId::new("a");
1998 let kb = KernelId::new("b");
1999 let _ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
2000 let mut eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
2001
2002 let receipt = broker
2003 .send_with_audit(ka.clone(), kb.clone(), env(), AuditTag::new(10, 99))
2004 .await
2005 .unwrap();
2006 assert_eq!(receipt.status, DeliveryStatus::Delivered);
2007 let msg = eb.try_receive().unwrap();
2008 assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 99));
2009 }
2010
2011 #[tokio::test]
2012 async fn default_audit_tag_behavior() {
2013 let broker = K2KBuilder::new().build();
2016 let ka = KernelId::new("a");
2017 let kb = KernelId::new("b");
2018 let _ea = broker.register(ka.clone());
2019 let mut eb = broker.register(kb.clone());
2020
2021 let _ = broker.send(ka.clone(), kb.clone(), env()).await.unwrap();
2022 let msg = eb.try_receive().unwrap();
2023 assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
2024 assert!(msg.envelope.audit_tag.is_unspecified());
2025 }
2026 }
2027
2028 #[cfg(feature = "crypto")]
2030 mod crypto_tests {
2031 use super::*;
2032
/// Checks the default encryption configuration: enabled, forward secrecy on,
/// AES-256-GCM, one-hour rotation interval.
#[test]
fn test_k2k_encryption_config_default() {
    let defaults = K2KEncryptionConfig::default();

    let flags = [defaults.enabled, defaults.forward_secrecy];
    for flag in flags.iter() {
        assert!(*flag);
    }
    assert_eq!(defaults.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
    assert_eq!(defaults.key_rotation_interval_secs, 3600);
}
2041
/// `disabled()` yields a configuration with `enabled` turned off.
#[test]
fn test_k2k_encryption_config_disabled() {
    let is_enabled = K2KEncryptionConfig::disabled().enabled;
    assert!(!is_enabled);
}
2047
/// `strict()` turns on encryption, mandatory encryption and forward secrecy.
#[test]
fn test_k2k_encryption_config_strict() {
    let strict = K2KEncryptionConfig::strict();

    let enabled = strict.enabled;
    let required = strict.require_encryption;
    let forward_secrecy = strict.forward_secrecy;

    assert!(enabled);
    assert!(required);
    assert!(forward_secrecy);
}
2055
/// Fresh key material remembers its kernel id and starts at generation 1.
#[test]
fn test_k2k_key_material_creation() {
    let owner = KernelId::new("test_kernel");
    let material = K2KKeyMaterial::new(owner.clone());

    assert_eq!(material.kernel_id(), &owner);
    assert_eq!(material.session_generation(), 1);
}
2064
/// Rotating the session key changes the key and advances the generation by
/// exactly one.
#[test]
fn test_k2k_key_material_rotation() {
    let material = K2KKeyMaterial::new(KernelId::new("test_kernel"));

    // (session key, generation) before and after the rotation.
    let before = (material.session_key(), material.session_generation());
    material.rotate_session_key();
    let after = (material.session_key(), material.session_generation());

    assert_ne!(before.0, after.0);
    assert_eq!(after.1, before.1 + 1);
}
2081
/// Each side derives a 32-byte shared secret from the other side's public
/// key. Only the lengths are checked; the two secrets are not compared.
#[test]
fn test_k2k_key_material_shared_secret() {
    use sha2::{Digest, Sha256};

    // Public key as computed in this test: SHA-256 over the long-term key
    // followed by a fixed label.
    let public_key_of = |material: &K2KKeyMaterial| -> [u8; 32] {
        let mut hasher = Sha256::new();
        hasher.update(&material.long_term_key);
        hasher.update(b"k2k-public-key-v1");
        let digest = hasher.finalize();
        let mut public = [0u8; 32];
        public.copy_from_slice(&digest);
        public
    };

    let kernel1 = K2KKeyMaterial::new(KernelId::new("kernel1"));
    let kernel2 = K2KKeyMaterial::new(KernelId::new("kernel2"));

    let pk1 = public_key_of(&kernel1);
    let pk2 = public_key_of(&kernel2);

    let secret1 = kernel1.derive_shared_secret(&pk2);
    let secret2 = kernel2.derive_shared_secret(&pk1);

    assert_eq!(secret1.len(), 32);
    assert_eq!(secret2.len(), 32);
}
2118
/// A new encryptor exposes a 32-byte public key and all-zero message and
/// peer counts.
#[test]
fn test_k2k_encryptor_creation() {
    let owner = KernelId::new("test_kernel");
    let settings = K2KEncryptionConfig::default();
    let encryptor = K2KEncryptor::new(owner.clone(), settings);

    assert_eq!(encryptor.public_key().len(), 32);

    let snapshot = encryptor.stats();
    assert_eq!(snapshot.messages_encrypted, 0);
    assert_eq!(snapshot.messages_decrypted, 0);
    assert_eq!(snapshot.peer_count, 0);
}
2133
/// Registering a peer raises `peer_count` to 1; unregistering brings it back
/// to 0.
#[test]
fn test_k2k_encryptor_peer_registration() {
    let owner = KernelId::new("test_kernel");
    let settings = K2KEncryptionConfig::default();
    let encryptor = K2KEncryptor::new(owner, settings);

    let peer = KernelId::new("peer_kernel");
    let peer_public_key = [42u8; 32];

    encryptor.register_peer(peer.clone(), peer_public_key);
    let after_register = encryptor.stats().peer_count;
    assert_eq!(after_register, 1);

    encryptor.unregister_peer(&peer);
    let after_unregister = encryptor.stats().peer_count;
    assert_eq!(after_unregister, 0);
}
2149
/// The builder passes every setter's value through to the returned
/// encryption configuration, and the fresh broker has no endpoints.
#[test]
fn test_k2k_encrypted_builder() {
    let builder = EncryptedK2KBuilder::new()
        .with_forward_secrecy(true)
        .with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
        .with_key_rotation(1800)
        .require_encryption(true);
    let (broker, encryption) = builder.build();

    assert!(encryption.forward_secrecy);
    assert_eq!(
        encryption.algorithm,
        K2KEncryptionAlgorithm::ChaCha20Poly1305
    );
    assert_eq!(encryption.key_rotation_interval_secs, 1800);
    assert!(encryption.require_encryption);

    let broker_stats = broker.stats();
    assert_eq!(broker_stats.registered_endpoints, 0);
}
2168
/// A default snapshot has every field at zero.
#[test]
fn test_k2k_encryption_stats_snapshot() {
    let snapshot = K2KEncryptionStatsSnapshot::default();

    // The six u64 counters, in declaration order.
    let counters = [
        snapshot.messages_encrypted,
        snapshot.messages_decrypted,
        snapshot.bytes_encrypted,
        snapshot.bytes_decrypted,
        snapshot.key_rotations,
        snapshot.decryption_failures,
    ];
    for counter in counters.iter() {
        assert_eq!(*counter, 0);
    }

    assert_eq!(snapshot.peer_count, 0);
    assert_eq!(snapshot.session_generation, 0);
}
2181
/// The two algorithm variants compare as different values.
#[test]
fn test_k2k_encryption_algorithms() {
    let aes = K2KEncryptionAlgorithm::Aes256Gcm;
    let chacha = K2KEncryptionAlgorithm::ChaCha20Poly1305;
    assert_ne!(aes, chacha);
}
2190
/// Freshly created key material does not ask for rotation, either with an
/// interval of 0 or with an interval of 3600.
#[test]
fn test_k2k_key_material_should_rotate() {
    let material = K2KKeyMaterial::new(KernelId::new("test_kernel"));

    for interval_secs in [0, 3600] {
        assert!(!material.should_rotate(interval_secs));
    }
}
2202
/// With a disabled configuration, `encrypt` returns an error.
#[test]
fn test_k2k_encryptor_disabled_encryption() {
    let owner = KernelId::new("test_kernel");
    let encryptor = K2KEncryptor::new(owner.clone(), K2KEncryptionConfig::disabled());

    let payload = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
    let outbound = K2KMessage::new(owner, KernelId::new("dest"), payload, HlcTimestamp::now(1));

    let outcome = encryptor.encrypt(&outbound);
    assert!(outcome.is_err());
}
2222
/// Encrypting for a destination with no registered key fails, and the error
/// text mentions "No public key".
#[test]
fn test_k2k_encryptor_missing_peer_key() {
    let owner = KernelId::new("test_kernel");
    let encryptor = K2KEncryptor::new(owner.clone(), K2KEncryptionConfig::default());

    let payload = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
    let outbound = K2KMessage::new(
        owner,
        KernelId::new("unknown_dest"),
        payload,
        HlcTimestamp::now(1),
    );

    let outcome = encryptor.encrypt(&outbound);
    assert!(outcome.is_err());

    let reason = outcome.unwrap_err().to_string();
    assert!(reason.contains("No public key"));
}
2243 }
2244}