Skip to main content

ringkernel_core/k2k/
mod.rs

1//! Kernel-to-Kernel (K2K) direct messaging.
2//!
3//! This module provides infrastructure for direct communication between
4//! GPU kernels without host-side mediation.
5//!
6//! # Submodules
7//!
8//! - [`tenant`] — [`tenant::TenantId`], [`tenant::TenantRegistry`],
9//!   [`tenant::TenantQuota`] with per-engagement cost tracking
10//! - [`audit_tag`] — [`audit_tag::AuditTag`] (org_id + engagement_id) for
11//!   audit trails and billable-time attribution
12//!
13//! # Multi-tenant overview
14//!
15//! The [`K2KBroker`] implements a two-tier tenancy model:
16//!
17//! 1. **Per-kernel `tenant_id`** — the primary security boundary. Kernels
18//!    registered under different tenants cannot exchange messages; sends
19//!    across the boundary return
20//!    [`crate::error::RingKernelError::TenantMismatch`].
21//! 2. **Per-message `AuditTag`** — the observability / billing tag, carried
22//!    inside every envelope alongside `tenant_id`. Cost tracking is
23//!    per-`(tenant_id, engagement_id)` via
24//!    [`tenant::TenantRegistry::track_usage`].
25//!
26//! Single-tenant deployments hit the backward-compatible fast path — see
27//! [`tenant::UNSPECIFIED_TENANT`] and [`K2KBroker::register`].
28
29use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU64, Ordering};
32use std::sync::Arc;
33use tokio::sync::mpsc;
34
35use crate::error::{Result, RingKernelError};
36use crate::hlc::HlcTimestamp;
37use crate::message::{MessageEnvelope, MessageId};
38use crate::runtime::KernelId;
39
40pub mod audit_tag;
41pub mod tenant;
42
43pub use audit_tag::AuditTag;
44pub use tenant::{TenantId, TenantInfo, TenantQuota, TenantRegistry, UNSPECIFIED_TENANT};
45
/// Tunable settings for K2K messaging, shared by the broker and its
/// per-tenant sub-brokers.
#[derive(Debug, Clone)]
pub struct K2KConfig {
    /// Capacity of the bounded channel created for each registered kernel.
    pub max_pending_messages: usize,
    /// Delivery timeout, expressed in milliseconds.
    pub delivery_timeout_ms: u64,
    /// Whether message tracing is switched on.
    pub enable_tracing: bool,
    /// Largest hop count a routed message may reach before it is rejected.
    pub max_hops: u8,
}

impl Default for K2KConfig {
    /// Defaults: 1024 pending messages, a 5000 ms delivery timeout, tracing
    /// disabled, and at most 8 hops.
    fn default() -> Self {
        let max_pending_messages = 1024;
        let delivery_timeout_ms = 5000;
        let enable_tracing = false;
        let max_hops = 8;

        K2KConfig {
            max_pending_messages,
            delivery_timeout_ms,
            enable_tracing,
            max_hops,
        }
    }
}
69
/// A K2K message with routing information.
///
/// Wraps a [`MessageEnvelope`] with the routing metadata the broker needs:
/// who sent it, who it is for, how many hops it has taken, and its priority.
#[derive(Debug, Clone)]
pub struct K2KMessage {
    /// Unique message ID (freshly generated by [`K2KMessage::new`]).
    pub id: MessageId,
    /// Source kernel.
    pub source: KernelId,
    /// Destination kernel (the final recipient, not the next hop).
    pub destination: KernelId,
    /// The message envelope being carried.
    pub envelope: MessageEnvelope,
    /// Hop count (for detecting routing loops); starts at 0.
    pub hops: u8,
    /// Timestamp when message was sent.
    pub sent_at: HlcTimestamp,
    /// Priority (higher = more urgent); starts at 0.
    pub priority: u8,
}
88
89impl K2KMessage {
90    /// Create a new K2K message.
91    pub fn new(
92        source: KernelId,
93        destination: KernelId,
94        envelope: MessageEnvelope,
95        timestamp: HlcTimestamp,
96    ) -> Self {
97        Self {
98            id: MessageId::generate(),
99            source,
100            destination,
101            envelope,
102            hops: 0,
103            sent_at: timestamp,
104            priority: 0,
105        }
106    }
107
108    /// Create with priority.
109    pub fn with_priority(mut self, priority: u8) -> Self {
110        self.priority = priority;
111        self
112    }
113
114    /// Increment hop count.
115    pub fn increment_hops(&mut self) -> Result<()> {
116        self.hops += 1;
117        if self.hops > 16 {
118            return Err(RingKernelError::K2KError(
119                "Maximum hop count exceeded".to_string(),
120            ));
121        }
122        Ok(())
123    }
124}
125
/// Receipt for a K2K message delivery.
///
/// Returned by the broker's send methods to describe the outcome of one
/// delivery attempt; the outcome itself is in [`DeliveryReceipt::status`].
#[derive(Debug, Clone)]
pub struct DeliveryReceipt {
    /// ID of the message this receipt refers to.
    pub message_id: MessageId,
    /// Source kernel.
    pub source: KernelId,
    /// Destination kernel.
    pub destination: KernelId,
    /// Delivery status.
    pub status: DeliveryStatus,
    /// Timestamp recorded on the receipt (the broker copies the message's
    /// `sent_at` value here).
    pub timestamp: HlcTimestamp,
}
140
/// Status of message delivery, as reported in a [`DeliveryReceipt`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryStatus {
    /// Message delivered successfully (placed directly on the destination's
    /// channel).
    Delivered,
    /// Message pending delivery (handed to a next-hop kernel rather than the
    /// destination itself).
    Pending,
    /// Destination kernel not found (no endpoint or usable route, or the
    /// destination's channel is closed).
    NotFound,
    /// Destination queue full.
    QueueFull,
    /// Delivery timed out.
    // NOTE(review): no code in the visible part of this file produces this
    // variant — confirm where it is emitted.
    Timeout,
    /// Maximum hops exceeded.
    MaxHopsExceeded,
    /// Cross-tenant send rejected (also surfaces as
    /// [`crate::error::RingKernelError::TenantMismatch`]).
    // NOTE(review): the visible broker code reports cross-tenant sends via
    // the error, not via a receipt carrying this status.
    TenantMismatch,
}
160
/// K2K endpoint for a single kernel.
///
/// Handed out by the broker when a kernel is registered. It owns the
/// receiving half of that kernel's message channel and keeps the broker
/// alive so the kernel can send through it.
pub struct K2KEndpoint {
    /// Kernel ID this endpoint was registered under; used as the source of
    /// every message sent through the endpoint.
    pub(crate) kernel_id: KernelId,
    /// Incoming message channel (receiving half).
    receiver: mpsc::Receiver<K2KMessage>,
    /// Reference to the broker that created this endpoint.
    broker: Arc<K2KBroker>,
}
170
171impl K2KEndpoint {
172    /// Receive a K2K message (blocking).
173    pub async fn receive(&mut self) -> Option<K2KMessage> {
174        self.receiver.recv().await
175    }
176
177    /// Try to receive a K2K message (non-blocking).
178    pub fn try_receive(&mut self) -> Option<K2KMessage> {
179        self.receiver.try_recv().ok()
180    }
181
182    /// Send a message to another kernel.
183    pub async fn send(
184        &self,
185        destination: KernelId,
186        envelope: MessageEnvelope,
187    ) -> Result<DeliveryReceipt> {
188        self.broker
189            .send(self.kernel_id.clone(), destination, envelope)
190            .await
191    }
192
193    /// Send a high-priority message.
194    pub async fn send_priority(
195        &self,
196        destination: KernelId,
197        envelope: MessageEnvelope,
198        priority: u8,
199    ) -> Result<DeliveryReceipt> {
200        self.broker
201            .send_priority(self.kernel_id.clone(), destination, envelope, priority)
202            .await
203    }
204
205    /// Get pending message count.
206    pub fn pending_count(&self) -> usize {
207        // Note: This is an estimate since the channel may be modified concurrently
208        0 // mpsc doesn't provide len() directly
209    }
210}
211
212// ============================================================================
213// K2KSubBroker — per-tenant routing table
214// ============================================================================
215
/// Per-tenant sub-broker.
///
/// Each `K2KSubBroker` is an independent routing domain: its endpoint map and
/// indirect-routing table are *not* visible to any other tenant. Cross-tenant
/// sends therefore have no possible route and are rejected by the parent
/// [`K2KBroker`] before they reach any sub-broker.
///
/// Every map is behind its own lock, so the fields can be updated
/// independently of one another.
pub struct K2KSubBroker {
    /// The tenant this sub-broker serves. `0` = unspecified (legacy).
    tenant_id: TenantId,
    /// Registered endpoints (kernel_id -> sending half of that kernel's
    /// channel).
    endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
    /// Indirect routing table (destination -> next-hop).
    routing_table: RwLock<HashMap<KernelId, KernelId>>,
    /// Default audit tag applied to messages if the sender doesn't provide
    /// one (per-kernel tag, set at registration).
    kernel_audit_tags: RwLock<HashMap<KernelId, AuditTag>>,
    /// Messages successfully handed to a channel by this sub-broker (direct
    /// deliveries and next-hop hand-offs).
    messages_delivered: AtomicU64,
}
235
236impl K2KSubBroker {
237    fn new(tenant_id: TenantId) -> Self {
238        Self {
239            tenant_id,
240            endpoints: RwLock::new(HashMap::new()),
241            routing_table: RwLock::new(HashMap::new()),
242            kernel_audit_tags: RwLock::new(HashMap::new()),
243            messages_delivered: AtomicU64::new(0),
244        }
245    }
246
247    /// Tenant this sub-broker belongs to.
248    pub fn tenant_id(&self) -> TenantId {
249        self.tenant_id
250    }
251
252    /// Number of kernels registered in this sub-broker.
253    pub fn endpoint_count(&self) -> usize {
254        self.endpoints.read().len()
255    }
256
257    /// Number of messages successfully delivered by this sub-broker.
258    pub fn messages_delivered(&self) -> u64 {
259        self.messages_delivered.load(Ordering::Relaxed)
260    }
261
262    /// Returns `true` if this sub-broker has a route (direct or indirect)
263    /// for `kernel_id`.
264    pub fn knows(&self, kernel_id: &KernelId) -> bool {
265        self.endpoints.read().contains_key(kernel_id)
266            || self.routing_table.read().contains_key(kernel_id)
267    }
268
269    /// Get the audit tag associated with a kernel at registration time, or
270    /// [`AuditTag::unspecified`] if none was set.
271    pub fn audit_tag_for(&self, kernel_id: &KernelId) -> AuditTag {
272        self.kernel_audit_tags
273            .read()
274            .get(kernel_id)
275            .copied()
276            .unwrap_or_else(AuditTag::unspecified)
277    }
278}
279
280// ============================================================================
281// K2KBroker — per-tenant sub-broker aggregator
282// ============================================================================
283
/// K2K message broker with per-tenant isolation.
///
/// Internally holds a `HashMap<TenantId, K2KSubBroker>`. Single-tenant
/// deployments see exactly one entry (for `UNSPECIFIED_TENANT = 0`); the
/// only overhead over the legacy broker is one additional HashMap lookup
/// per send (~20 ns).
pub struct K2KBroker {
    /// Configuration (shared across sub-brokers).
    config: K2KConfig,
    /// Per-tenant sub-brokers. The entry for `UNSPECIFIED_TENANT` is created
    /// by the constructor; others are added lazily.
    tenants: RwLock<HashMap<TenantId, Arc<K2KSubBroker>>>,
    /// Reverse index: which tenant does a kernel belong to?
    kernel_tenant: RwLock<HashMap<KernelId, TenantId>>,
    /// Tenant registry (quotas, audit sink, engagement cost tracking).
    registry: Arc<TenantRegistry>,
    /// Delivery receipts (for acknowledgment), keyed by message ID.
    // NOTE(review): nothing in the visible part of this file removes entries
    // from this map — confirm that growth is bounded elsewhere.
    receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
    /// Global message counter (sum across all sub-brokers, for legacy stats).
    message_counter: AtomicU64,
    /// Counter of cross-tenant attempts rejected.
    cross_tenant_rejections: AtomicU64,
}
306
307impl K2KBroker {
308    /// Create a new K2K broker with an empty [`TenantRegistry`].
309    pub fn new(config: K2KConfig) -> Arc<Self> {
310        Self::with_registry(config, Arc::new(TenantRegistry::new()))
311    }
312
313    /// Create a new K2K broker sharing the given [`TenantRegistry`].
314    ///
315    /// This is the multi-tenant constructor: wire up the registry (with its
316    /// audit sink) once, then pass the `Arc` to every broker that should
317    /// enforce against it.
318    pub fn with_registry(config: K2KConfig, registry: Arc<TenantRegistry>) -> Arc<Self> {
319        let mut tenants = HashMap::new();
320        // Always pre-create the unspecified-tenant sub-broker so legacy
321        // single-tenant callers hit it directly with no `or_insert_with` in
322        // the hot path.
323        tenants.insert(
324            UNSPECIFIED_TENANT,
325            Arc::new(K2KSubBroker::new(UNSPECIFIED_TENANT)),
326        );
327        Arc::new(Self {
328            config,
329            tenants: RwLock::new(tenants),
330            kernel_tenant: RwLock::new(HashMap::new()),
331            registry,
332            receipts: RwLock::new(HashMap::new()),
333            message_counter: AtomicU64::new(0),
334            cross_tenant_rejections: AtomicU64::new(0),
335        })
336    }
337
338    /// Shared reference to the tenant registry.
339    pub fn registry(&self) -> &Arc<TenantRegistry> {
340        &self.registry
341    }
342
343    /// Total number of active tenant sub-brokers.
344    pub fn tenant_count(&self) -> usize {
345        self.tenants.read().len()
346    }
347
348    /// Get the sub-broker for a tenant, if it exists.
349    pub fn sub_broker(&self, tenant_id: TenantId) -> Option<Arc<K2KSubBroker>> {
350        self.tenants.read().get(&tenant_id).cloned()
351    }
352
353    /// Register a kernel without a tenant / audit tag (legacy API).
354    ///
355    /// The kernel is placed in the unspecified-tenant sub-broker
356    /// (`UNSPECIFIED_TENANT = 0`). This is the backward-compatible entry
357    /// point for single-tenant deployments — existing callers don't need to
358    /// change anything.
359    pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
360        self.register_tenant(UNSPECIFIED_TENANT, AuditTag::unspecified(), kernel_id)
361    }
362
363    /// Register a kernel into the given tenant's sub-broker, stamping the
364    /// [`AuditTag`] that will be applied to outgoing messages from this
365    /// kernel.
366    ///
367    /// If the tenant doesn't yet have a sub-broker, one is created lazily.
368    /// If the kernel was previously registered under a different tenant,
369    /// the old registration is replaced (the old mpsc sender is dropped —
370    /// in-flight messages to the old registration are lost). This preserves
371    /// the invariant that a kernel belongs to exactly one tenant.
372    pub fn register_tenant(
373        self: &Arc<Self>,
374        tenant_id: TenantId,
375        audit_tag: AuditTag,
376        kernel_id: KernelId,
377    ) -> K2KEndpoint {
378        let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
379
380        let sub = {
381            let mut tenants = self.tenants.write();
382            tenants
383                .entry(tenant_id)
384                .or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
385                .clone()
386        };
387
388        let mut kernel_tenant = self.kernel_tenant.write();
389        if let Some(prev_tenant) = kernel_tenant.get(&kernel_id).copied() {
390            if prev_tenant != tenant_id {
391                if let Some(prev_sub) = self.tenants.read().get(&prev_tenant).cloned() {
392                    prev_sub.endpoints.write().remove(&kernel_id);
393                    prev_sub.kernel_audit_tags.write().remove(&kernel_id);
394                    prev_sub.routing_table.write().remove(&kernel_id);
395                }
396            }
397        }
398        kernel_tenant.insert(kernel_id.clone(), tenant_id);
399        drop(kernel_tenant);
400
401        sub.endpoints.write().insert(kernel_id.clone(), sender);
402        sub.kernel_audit_tags
403            .write()
404            .insert(kernel_id.clone(), audit_tag);
405
406        K2KEndpoint {
407            kernel_id,
408            receiver,
409            broker: Arc::clone(self),
410        }
411    }
412
413    /// Unregister a kernel from whichever tenant it belongs to.
414    pub fn unregister(&self, kernel_id: &KernelId) {
415        let tenant_id = self.kernel_tenant.write().remove(kernel_id);
416        if let Some(tenant_id) = tenant_id {
417            if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
418                sub.endpoints.write().remove(kernel_id);
419                sub.kernel_audit_tags.write().remove(kernel_id);
420                sub.routing_table.write().remove(kernel_id);
421            }
422        }
423    }
424
425    /// Check if a kernel is registered (under any tenant).
426    pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
427        self.kernel_tenant.read().contains_key(kernel_id)
428    }
429
430    /// Get the tenant this kernel belongs to, if registered.
431    pub fn tenant_of(&self, kernel_id: &KernelId) -> Option<TenantId> {
432        self.kernel_tenant.read().get(kernel_id).copied()
433    }
434
435    /// All registered kernel IDs (across every tenant).
436    pub fn registered_kernels(&self) -> Vec<KernelId> {
437        self.kernel_tenant.read().keys().cloned().collect()
438    }
439
440    /// Kernel IDs registered under a specific tenant.
441    pub fn registered_kernels_for(&self, tenant_id: TenantId) -> Vec<KernelId> {
442        self.tenants
443            .read()
444            .get(&tenant_id)
445            .map(|sub| sub.endpoints.read().keys().cloned().collect())
446            .unwrap_or_default()
447    }
448
449    /// Send a message from one kernel to another (normal priority).
450    ///
451    /// Enforces tenant isolation: the sender's and destination's tenants
452    /// must match, or the send is rejected with
453    /// [`crate::error::RingKernelError::TenantMismatch`] and an audit event
454    /// is emitted.
455    pub async fn send(
456        &self,
457        source: KernelId,
458        destination: KernelId,
459        envelope: MessageEnvelope,
460    ) -> Result<DeliveryReceipt> {
461        self.send_priority(source, destination, envelope, 0).await
462    }
463
464    /// Send a priority message.
465    pub async fn send_priority(
466        &self,
467        source: KernelId,
468        destination: KernelId,
469        envelope: MessageEnvelope,
470        priority: u8,
471    ) -> Result<DeliveryReceipt> {
472        // Resolve source tenant (sender must be registered).
473        let source_tenant = self
474            .kernel_tenant
475            .read()
476            .get(&source)
477            .copied()
478            .unwrap_or(UNSPECIFIED_TENANT);
479
480        // Resolve destination tenant. If destination is unregistered we
481        // treat it as residing in the *sender's* tenant — that way the
482        // resulting delivery attempt will hit NotFound inside the sender's
483        // own sub-broker (preserving the legacy error behavior) rather than
484        // being misclassified as cross-tenant.
485        let dest_tenant = self
486            .kernel_tenant
487            .read()
488            .get(&destination)
489            .copied()
490            .unwrap_or(source_tenant);
491
492        // Cross-tenant check.
493        if source_tenant != dest_tenant {
494            self.cross_tenant_rejections.fetch_add(1, Ordering::Relaxed);
495            self.registry.audit_cross_tenant(
496                source_tenant,
497                dest_tenant,
498                source.as_str(),
499                destination.as_str(),
500                envelope.audit_tag,
501            );
502            return Err(RingKernelError::TenantMismatch {
503                from: source_tenant,
504                to: dest_tenant,
505            });
506        }
507
508        // Quota check (observability-driven — UNSPECIFIED_TENANT is a no-op).
509        self.registry
510            .check_quota(source_tenant, envelope.audit_tag)?;
511        self.registry.record_message(source_tenant);
512
513        // Stamp envelope with sender-derived audit tag if caller didn't
514        // supply one.
515        let mut envelope = envelope;
516        envelope.tenant_id = source_tenant;
517        if envelope.audit_tag.is_unspecified() {
518            let sub = self
519                .tenants
520                .read()
521                .get(&source_tenant)
522                .cloned()
523                .expect("tenant sub-broker must exist for registered sender");
524            envelope.audit_tag = sub.audit_tag_for(&source);
525        }
526
527        let timestamp = envelope.header.timestamp;
528        let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
529        message.priority = priority;
530
531        self.deliver_in(source_tenant, message).await
532    }
533
534    /// Send a message stamped with an explicit audit tag (overriding the
535    /// registration-time tag).
536    ///
537    /// Useful for the same kernel participating in multiple engagements —
538    /// e.g. a shared report-generation kernel that should bill different
539    /// engagements depending on which request it's serving.
540    pub async fn send_with_audit(
541        &self,
542        source: KernelId,
543        destination: KernelId,
544        envelope: MessageEnvelope,
545        audit_tag: AuditTag,
546    ) -> Result<DeliveryReceipt> {
547        let envelope = envelope.with_audit_tag(audit_tag);
548        self.send(source, destination, envelope).await
549    }
550
551    /// Deliver a message inside a specific sub-broker.
552    async fn deliver_in(
553        &self,
554        tenant_id: TenantId,
555        message: K2KMessage,
556    ) -> Result<DeliveryReceipt> {
557        let sub = self
558            .tenants
559            .read()
560            .get(&tenant_id)
561            .cloned()
562            .ok_or_else(|| {
563                RingKernelError::K2KError(format!(
564                    "tenant sub-broker {} disappeared mid-send",
565                    tenant_id
566                ))
567            })?;
568
569        let message_id = message.id;
570        let source = message.source.clone();
571        let destination = message.destination.clone();
572        let timestamp = message.sent_at;
573
574        // Try direct delivery first.
575        let endpoints = sub.endpoints.read();
576        if let Some(sender) = endpoints.get(&destination) {
577            match sender.try_send(message) {
578                Ok(()) => {
579                    sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
580                    self.message_counter.fetch_add(1, Ordering::Relaxed);
581                    let receipt = DeliveryReceipt {
582                        message_id,
583                        source,
584                        destination,
585                        status: DeliveryStatus::Delivered,
586                        timestamp,
587                    };
588                    self.receipts.write().insert(message_id, receipt.clone());
589                    return Ok(receipt);
590                }
591                Err(mpsc::error::TrySendError::Full(_)) => {
592                    return Ok(DeliveryReceipt {
593                        message_id,
594                        source,
595                        destination,
596                        status: DeliveryStatus::QueueFull,
597                        timestamp,
598                    });
599                }
600                Err(mpsc::error::TrySendError::Closed(_)) => {
601                    return Ok(DeliveryReceipt {
602                        message_id,
603                        source,
604                        destination,
605                        status: DeliveryStatus::NotFound,
606                        timestamp,
607                    });
608                }
609            }
610        }
611        drop(endpoints);
612
613        // Try indirect routing (within the same tenant only — there is no
614        // cross-tenant routing by construction).
615        let next_hop = sub.routing_table.read().get(&destination).cloned();
616        if let Some(next_hop) = next_hop {
617            let routed_message = K2KMessage {
618                id: message_id,
619                source: source.clone(),
620                destination: destination.clone(),
621                envelope: message.envelope,
622                hops: message.hops + 1,
623                sent_at: message.sent_at,
624                priority: message.priority,
625            };
626
627            if routed_message.hops > self.config.max_hops {
628                return Ok(DeliveryReceipt {
629                    message_id,
630                    source,
631                    destination,
632                    status: DeliveryStatus::MaxHopsExceeded,
633                    timestamp,
634                });
635            }
636
637            let endpoints = sub.endpoints.read();
638            if let Some(sender) = endpoints.get(&next_hop) {
639                if sender.try_send(routed_message).is_ok() {
640                    sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
641                    self.message_counter.fetch_add(1, Ordering::Relaxed);
642                    return Ok(DeliveryReceipt {
643                        message_id,
644                        source,
645                        destination,
646                        status: DeliveryStatus::Pending,
647                        timestamp,
648                    });
649                }
650            }
651        }
652
653        // Destination not found within the tenant.
654        Ok(DeliveryReceipt {
655            message_id,
656            source,
657            destination,
658            status: DeliveryStatus::NotFound,
659            timestamp,
660        })
661    }
662
663    /// Add an indirect route inside the unspecified-tenant sub-broker
664    /// (legacy API — single-tenant callers should use this).
665    pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
666        self.add_route_in(UNSPECIFIED_TENANT, destination, next_hop);
667    }
668
669    /// Add an indirect route inside a specific tenant's sub-broker.
670    pub fn add_route_in(&self, tenant_id: TenantId, destination: KernelId, next_hop: KernelId) {
671        let sub = {
672            let mut tenants = self.tenants.write();
673            tenants
674                .entry(tenant_id)
675                .or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
676                .clone()
677        };
678        sub.routing_table.write().insert(destination, next_hop);
679    }
680
681    /// Remove an indirect route from the unspecified-tenant sub-broker.
682    pub fn remove_route(&self, destination: &KernelId) {
683        self.remove_route_in(UNSPECIFIED_TENANT, destination);
684    }
685
686    /// Remove an indirect route from a specific tenant's sub-broker.
687    pub fn remove_route_in(&self, tenant_id: TenantId, destination: &KernelId) {
688        if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
689            sub.routing_table.write().remove(destination);
690        }
691    }
692
693    /// Get aggregate stats across all sub-brokers.
694    pub fn stats(&self) -> K2KStats {
695        let tenants = self.tenants.read();
696        let mut registered = 0usize;
697        let mut routes = 0usize;
698        for sub in tenants.values() {
699            registered += sub.endpoints.read().len();
700            routes += sub.routing_table.read().len();
701        }
702        K2KStats {
703            registered_endpoints: registered,
704            messages_delivered: self.message_counter.load(Ordering::Relaxed),
705            routes_configured: routes,
706            tenant_count: tenants.len(),
707            cross_tenant_rejections: self.cross_tenant_rejections.load(Ordering::Relaxed),
708        }
709    }
710
711    /// Get per-tenant stats (useful for billing / dashboards).
712    pub fn tenant_stats(&self, tenant_id: TenantId) -> Option<TenantStats> {
713        self.tenants.read().get(&tenant_id).map(|sub| TenantStats {
714            tenant_id,
715            registered_endpoints: sub.endpoints.read().len(),
716            routes_configured: sub.routing_table.read().len(),
717            messages_delivered: sub.messages_delivered.load(Ordering::Relaxed),
718        })
719    }
720
721    /// Get delivery receipt for a message.
722    pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
723        self.receipts.read().get(message_id).cloned()
724    }
725}
726
/// K2K messaging statistics (aggregate across all tenants).
///
/// A point-in-time snapshot; the values are not updated after the struct is
/// created.
#[derive(Debug, Clone, Default)]
pub struct K2KStats {
    /// Total number of registered endpoints across all tenants.
    pub registered_endpoints: usize,
    /// Total messages delivered (all tenants), read from the broker's global
    /// counter.
    pub messages_delivered: u64,
    /// Total configured routes across all tenants.
    pub routes_configured: usize,
    /// Number of active tenant sub-brokers.
    pub tenant_count: usize,
    /// Cross-tenant send attempts rejected so far.
    pub cross_tenant_rejections: u64,
}
741
/// Per-tenant stats (one entry per sub-broker).
///
/// A point-in-time snapshot of a single tenant's sub-broker.
#[derive(Debug, Clone, Default)]
pub struct TenantStats {
    /// Tenant ID these numbers belong to.
    pub tenant_id: TenantId,
    /// Endpoints registered under this tenant.
    pub registered_endpoints: usize,
    /// Routes configured under this tenant.
    pub routes_configured: usize,
    /// Messages delivered by this sub-broker.
    pub messages_delivered: u64,
}
754
755/// Builder for creating K2K infrastructure.
756pub struct K2KBuilder {
757    config: K2KConfig,
758    registry: Option<Arc<TenantRegistry>>,
759}
760
761impl K2KBuilder {
762    /// Create a new builder.
763    pub fn new() -> Self {
764        Self {
765            config: K2KConfig::default(),
766            registry: None,
767        }
768    }
769
770    /// Set maximum pending messages.
771    pub fn max_pending_messages(mut self, count: usize) -> Self {
772        self.config.max_pending_messages = count;
773        self
774    }
775
776    /// Set delivery timeout.
777    pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
778        self.config.delivery_timeout_ms = timeout;
779        self
780    }
781
782    /// Enable message tracing.
783    pub fn enable_tracing(mut self, enable: bool) -> Self {
784        self.config.enable_tracing = enable;
785        self
786    }
787
788    /// Set maximum hop count.
789    pub fn max_hops(mut self, hops: u8) -> Self {
790        self.config.max_hops = hops;
791        self
792    }
793
794    /// Attach a shared [`TenantRegistry`] (for multi-tenant deployments).
795    pub fn with_registry(mut self, registry: Arc<TenantRegistry>) -> Self {
796        self.registry = Some(registry);
797        self
798    }
799
800    /// Build the K2K broker.
801    pub fn build(self) -> Arc<K2KBroker> {
802        match self.registry {
803            Some(registry) => K2KBroker::with_registry(self.config, registry),
804            None => K2KBroker::new(self.config),
805        }
806    }
807}
808
809impl Default for K2KBuilder {
810    fn default() -> Self {
811        Self::new()
812    }
813}
814
815// ============================================================================
816// K2K Message Type Registry (FR-3)
817// ============================================================================
818
/// Registration information for a K2K-routable message type.
///
/// This struct is automatically generated by the `#[derive(RingMessage)]` macro
/// when `k2k_routable = true` is specified. Registrations are collected at
/// compile time using the `inventory` crate.
///
/// # Example
///
/// ```ignore
/// #[derive(RingMessage)]
/// #[ring_message(type_id = 1, domain = "OrderMatching", k2k_routable = true)]
/// pub struct SubmitOrderInput { ... }
///
/// // Runtime discovery
/// let registry = K2KTypeRegistry::discover();
/// assert!(registry.is_routable(501)); // domain base (500) + type_id (1)
/// ```
#[derive(Debug, Clone)]
pub struct K2KMessageRegistration {
    /// Message type ID (from RingMessage::message_type()). Used as the key
    /// when the registry indexes registrations by type ID.
    pub type_id: u64,
    /// Full type name for debugging/logging. Also used as a lookup key by
    /// the registry.
    pub type_name: &'static str,
    /// Whether this message type is routable via K2K.
    pub k2k_routable: bool,
    /// Optional routing category for grouped routing; `None` means the type
    /// is not listed under any category.
    pub category: Option<&'static str>,
}
847
// Collect all K2K message registrations at compile time. This declares
// `K2KMessageRegistration` as an `inventory` collection so the registry can
// iterate over the submitted entries at runtime.
inventory::collect!(K2KMessageRegistration);
850
/// Registry for discovering K2K-routable message types at runtime.
///
/// The registry is built by scanning all `K2KMessageRegistration` entries
/// submitted via the `inventory` crate. This enables runtime discovery of
/// message types for routing, validation, and monitoring.
///
/// All three indexes hold `'static` references or IDs pointing at the same
/// set of registrations; they differ only in the lookup key.
///
/// # Example
///
/// ```ignore
/// let registry = K2KTypeRegistry::discover();
///
/// // Check if a type is routable
/// if registry.is_routable(501) {
///     // Allow K2K routing
/// }
///
/// // Get all types in a category
/// let order_types = registry.get_category("orders");
/// for type_id in order_types {
///     println!("Order message type: {}", type_id);
/// }
/// ```
pub struct K2KTypeRegistry {
    /// Type ID to registration mapping.
    by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
    /// Type name to registration mapping.
    by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
    /// Category to type IDs mapping. Only registrations that declare a
    /// category appear here.
    by_category: HashMap<&'static str, Vec<u64>>,
}
881
882impl K2KTypeRegistry {
883    /// Discover all registered K2K message types at runtime.
884    ///
885    /// This scans all `K2KMessageRegistration` entries that were submitted
886    /// via `inventory::submit!` during compilation.
887    pub fn discover() -> Self {
888        let mut registry = Self {
889            by_type_id: HashMap::new(),
890            by_type_name: HashMap::new(),
891            by_category: HashMap::new(),
892        };
893
894        for reg in inventory::iter::<K2KMessageRegistration>() {
895            registry.by_type_id.insert(reg.type_id, reg);
896            registry.by_type_name.insert(reg.type_name, reg);
897            if let Some(cat) = reg.category {
898                registry
899                    .by_category
900                    .entry(cat)
901                    .or_default()
902                    .push(reg.type_id);
903            }
904        }
905
906        registry
907    }
908
909    /// Check if a message type ID is K2K routable.
910    pub fn is_routable(&self, type_id: u64) -> bool {
911        self.by_type_id
912            .get(&type_id)
913            .map(|r| r.k2k_routable)
914            .unwrap_or(false)
915    }
916
917    /// Get registration by type ID.
918    pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
919        self.by_type_id.get(&type_id).copied()
920    }
921
922    /// Get registration by type name.
923    pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
924        self.by_type_name.get(type_name).copied()
925    }
926
927    /// Get all type IDs in a category.
928    pub fn get_category(&self, category: &str) -> &[u64] {
929        self.by_category
930            .get(category)
931            .map(|v| v.as_slice())
932            .unwrap_or(&[])
933    }
934
935    /// Get all registered categories.
936    pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
937        self.by_category.keys().copied()
938    }
939
940    /// Iterate all registered message types.
941    pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
942        self.by_type_id.values().copied()
943    }
944
945    /// Get all routable type IDs.
946    pub fn routable_types(&self) -> Vec<u64> {
947        self.by_type_id
948            .iter()
949            .filter(|(_, r)| r.k2k_routable)
950            .map(|(id, _)| *id)
951            .collect()
952    }
953
954    /// Get total number of registered message types.
955    pub fn len(&self) -> usize {
956        self.by_type_id.len()
957    }
958
959    /// Check if the registry is empty.
960    pub fn is_empty(&self) -> bool {
961        self.by_type_id.is_empty()
962    }
963}
964
965impl Default for K2KTypeRegistry {
966    fn default() -> Self {
967        Self::discover()
968    }
969}
970
971impl std::fmt::Debug for K2KTypeRegistry {
972    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
973        f.debug_struct("K2KTypeRegistry")
974            .field("registered_types", &self.by_type_id.len())
975            .field("categories", &self.by_category.keys().collect::<Vec<_>>())
976            .finish()
977    }
978}
979
980// ============================================================================
981// K2K Message Encryption (Phase 4.2 - Enterprise Security)
982// ============================================================================
983
/// Configuration for K2K message encryption.
///
/// Consumed by [`K2KEncryptor`]; see [`K2KEncryptionConfig::disabled`] and
/// [`K2KEncryptionConfig::strict`] for ready-made presets.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct K2KEncryptionConfig {
    /// Enable message encryption. When `false`, `K2KEncryptor::encrypt` and
    /// `K2KEncryptor::decrypt` return an error instead of processing data.
    pub enabled: bool,
    /// Encryption algorithm.
    pub algorithm: K2KEncryptionAlgorithm,
    /// Enable forward secrecy via ephemeral keys. When set, the encryptor
    /// mixes the current session key into the per-message key.
    pub forward_secrecy: bool,
    /// Key rotation interval in seconds (0 = no rotation).
    pub key_rotation_interval_secs: u64,
    /// Whether to require encryption for all messages.
    ///
    /// NOTE(review): this flag is not read by the encryptor or endpoint code
    /// that accompanies this definition — confirm where it is enforced.
    pub require_encryption: bool,
}
999
#[cfg(feature = "crypto")]
impl Default for K2KEncryptionConfig {
    /// Defaults: encryption on (but not mandatory), AES-256-GCM, forward
    /// secrecy on, session keys rotated hourly.
    fn default() -> Self {
        // One hour, expressed in seconds.
        const HOURLY_ROTATION_SECS: u64 = 3600;

        Self {
            algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
            enabled: true,
            require_encryption: false,
            forward_secrecy: true,
            key_rotation_interval_secs: HOURLY_ROTATION_SECS,
        }
    }
}
1012
#[cfg(feature = "crypto")]
impl K2KEncryptionConfig {
    /// Preset with encryption switched off; every other field keeps its
    /// default value.
    pub fn disabled() -> Self {
        let defaults = Self::default();
        Self {
            enabled: false,
            ..defaults
        }
    }

    /// Preset that turns encryption on, marks it as required for all
    /// messages and enables forward secrecy; remaining fields keep their
    /// default values.
    pub fn strict() -> Self {
        let defaults = Self::default();
        Self {
            require_encryption: true,
            forward_secrecy: true,
            enabled: true,
            ..defaults
        }
    }
}
1033
/// Supported K2K encryption algorithms.
///
/// Both variants are AEAD ciphers; the encryptor uses them with a 32-byte
/// key, a 12-byte nonce and a 16-byte authentication tag.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum K2KEncryptionAlgorithm {
    /// AES-256-GCM (NIST standard, hardware acceleration).
    Aes256Gcm,
    /// ChaCha20-Poly1305 (mobile/embedded friendly).
    ChaCha20Poly1305,
}
1043
/// Per-kernel encryption key material.
///
/// Holds one long-term key plus a rotating session key. The session key and
/// its rotation timestamp sit behind locks so they can be replaced through a
/// shared reference; the generation counter is atomic for the same reason.
/// Both keys are zeroed when the value is dropped.
#[cfg(feature = "crypto")]
pub struct K2KKeyMaterial {
    /// Kernel ID this key belongs to.
    kernel_id: KernelId,
    /// Long-term key (for key exchange). Never changes after construction.
    long_term_key: [u8; 32],
    /// Current session key, replaced on every rotation.
    session_key: parking_lot::RwLock<[u8; 32]>,
    /// Session key generation (for rotation). Starts at 1 and is incremented
    /// by each rotation.
    session_generation: std::sync::atomic::AtomicU64,
    /// Creation timestamp.
    created_at: std::time::Instant,
    /// Last rotation timestamp; equals `created_at` until the first rotation.
    last_rotated: parking_lot::RwLock<std::time::Instant>,
}
1060
#[cfg(feature = "crypto")]
impl K2KKeyMaterial {
    /// Generate fresh key material for `kernel_id`.
    ///
    /// A random long-term key is drawn first, then construction proceeds
    /// exactly as in [`K2KKeyMaterial::from_key`] (which draws the initial
    /// session key).
    pub fn new(kernel_id: KernelId) -> Self {
        use rand::RngCore;

        let mut long_term_key = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut long_term_key);
        Self::from_key(kernel_id, long_term_key)
    }

    /// Build key material around an existing long-term `key`.
    ///
    /// The session key is always freshly random; the generation counter
    /// starts at 1 and both timestamps are set to the construction instant.
    pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
        use rand::RngCore;

        let mut initial_session_key = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut initial_session_key);

        let constructed_at = std::time::Instant::now();
        Self {
            kernel_id,
            long_term_key: key,
            session_key: parking_lot::RwLock::new(initial_session_key),
            session_generation: AtomicU64::new(1),
            created_at: constructed_at,
            last_rotated: parking_lot::RwLock::new(constructed_at),
        }
    }

    /// Kernel that owns this key material.
    pub fn kernel_id(&self) -> &KernelId {
        &self.kernel_id
    }

    /// Copy of the session key currently in effect.
    pub fn session_key(&self) -> [u8; 32] {
        let current = self.session_key.read();
        *current
    }

    /// Generation number of the session key currently in effect.
    pub fn session_generation(&self) -> u64 {
        self.session_generation.load(Ordering::Acquire)
    }

    /// Replace the session key with new random bytes.
    ///
    /// Also bumps the generation counter and resets the rotation timestamp.
    /// The three updates are performed one after another, not as a single
    /// atomic step.
    pub fn rotate_session_key(&self) {
        use rand::RngCore;

        let mut replacement = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut replacement);

        *self.session_key.write() = replacement;
        self.session_generation.fetch_add(1, Ordering::AcqRel);
        *self.last_rotated.write() = std::time::Instant::now();
    }

    /// Derive a 32-byte secret for talking to the kernel that owns
    /// `dest_public_key`.
    ///
    /// The value is `SHA-256(long_term_key || dest_public_key ||
    /// "k2k-shared-secret-v1")`. This is a placeholder for a real key
    /// agreement such as X25519.
    ///
    /// NOTE(review): the digest is keyed by *this* side's private long-term
    /// key, so two kernels holding different long-term keys do not obtain the
    /// same value from each other's public keys. Confirm before relying on it
    /// for decryption by a different kernel.
    pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(&self.long_term_key);
        hasher.update(dest_public_key);
        // Domain-separation label.
        hasher.update(b"k2k-shared-secret-v1");
        let digest = hasher.finalize();

        let mut secret = [0u8; 32];
        secret.copy_from_slice(&digest);
        secret
    }

    /// Report whether at least `interval_secs` whole seconds have passed
    /// since the last rotation. An interval of 0 disables rotation and always
    /// yields `false`.
    pub fn should_rotate(&self, interval_secs: u64) -> bool {
        interval_secs != 0 && self.last_rotated.read().elapsed().as_secs() >= interval_secs
    }

    /// Time elapsed since this key material was created.
    pub fn age(&self) -> std::time::Duration {
        let born = self.created_at;
        born.elapsed()
    }
}
1164
#[cfg(feature = "crypto")]
impl Drop for K2KKeyMaterial {
    /// Wipe both keys before the memory is released.
    fn drop(&mut self) {
        use zeroize::Zeroize;

        // `&mut self` guarantees exclusive access, so the session key can be
        // reached directly without going through the lock.
        self.session_key.get_mut().zeroize();
        self.long_term_key.zeroize();
    }
}
1174
/// Encrypted K2K message wrapper.
///
/// Routing metadata is carried in the clear; only the serialized envelope is
/// encrypted. Produced by `K2KEncryptor::encrypt` and consumed by
/// `K2KEncryptor::decrypt`.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct EncryptedK2KMessage {
    /// Original message metadata (unencrypted for routing).
    pub id: MessageId,
    /// Source kernel.
    pub source: KernelId,
    /// Destination kernel.
    pub destination: KernelId,
    /// Hop count.
    pub hops: u8,
    /// Timestamp when message was sent.
    pub sent_at: HlcTimestamp,
    /// Priority.
    pub priority: u8,
    /// Session key generation used for encryption.
    pub key_generation: u64,
    /// Encryption nonce (96 bits; the same size is used for both supported
    /// algorithms).
    pub nonce: [u8; 12],
    /// Encrypted envelope data, without the authentication tag.
    pub ciphertext: Vec<u8>,
    /// Authentication tag, stored separately from `ciphertext`.
    pub tag: [u8; 16],
}
1200
/// K2K encryption manager for a single kernel.
///
/// Owns the kernel's key material, the table of peer public keys and the
/// running encryption counters. All mutation goes through interior
/// mutability, so the public methods take `&self`.
#[cfg(feature = "crypto")]
pub struct K2KEncryptor {
    /// Configuration.
    config: K2KEncryptionConfig,
    /// This kernel's key material.
    key_material: K2KKeyMaterial,
    /// Peer public keys, keyed by the peer's kernel ID.
    peer_keys: parking_lot::RwLock<HashMap<KernelId, [u8; 32]>>,
    /// Encryption stats.
    stats: K2KEncryptionStats,
}
1213
#[cfg(feature = "crypto")]
impl K2KEncryptor {
    /// Create a new encryptor for a kernel with freshly generated key
    /// material.
    pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
        Self::from_key_material(K2KKeyMaterial::new(kernel_id), config)
    }

    /// Create an encryptor whose long-term key is the caller-supplied `key`.
    pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
        Self::from_key_material(K2KKeyMaterial::from_key(kernel_id, key), config)
    }

    /// Shared constructor tail: empty peer table and zeroed counters.
    fn from_key_material(key_material: K2KKeyMaterial, config: K2KEncryptionConfig) -> Self {
        Self {
            config,
            key_material,
            peer_keys: parking_lot::RwLock::new(HashMap::new()),
            stats: K2KEncryptionStats::default(),
        }
    }

    /// Get this kernel's public key.
    ///
    /// Currently `SHA-256(long_term_key || "k2k-public-key-v1")`, a
    /// placeholder for a proper public-key derivation (e.g. X25519).
    pub fn public_key(&self) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(&self.key_material.long_term_key);
        hasher.update(b"k2k-public-key-v1");
        let digest = hasher.finalize();

        let mut public = [0u8; 32];
        public.copy_from_slice(&digest);
        public
    }

    /// Register a peer's public key, replacing any key previously stored for
    /// the same kernel.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        self.peer_keys.write().insert(kernel_id, public_key);
    }

    /// Unregister a peer. Unknown kernels are ignored.
    pub fn unregister_peer(&self, kernel_id: &KernelId) {
        self.peer_keys.write().remove(kernel_id);
    }

    /// Rotate the session key if the configured rotation interval has
    /// elapsed, and count the rotation in the statistics.
    pub fn maybe_rotate(&self) {
        let interval_secs = self.config.key_rotation_interval_secs;
        if !self.key_material.should_rotate(interval_secs) {
            return;
        }
        self.key_material.rotate_session_key();
        self.stats.key_rotations.fetch_add(1, Ordering::Relaxed);
    }

    /// Fail with the standard error when encryption is switched off.
    fn ensure_enabled(&self) -> Result<()> {
        if self.config.enabled {
            Ok(())
        } else {
            Err(RingKernelError::K2KError(
                "K2K encryption is disabled".to_string(),
            ))
        }
    }

    /// Look up the public key registered for `peer`; `role` ("destination" or
    /// "source") only feeds the error message.
    fn peer_key(&self, peer: &KernelId, role: &str) -> Result<[u8; 32]> {
        // Copy the key out first so the read lock is released before any
        // error message is formatted.
        let registered = self.peer_keys.read().get(peer).copied();
        registered.ok_or_else(|| {
            RingKernelError::K2KError(format!(
                "No public key registered for {} kernel: {}",
                role, peer
            ))
        })
    }

    /// Derive the AEAD key used for messages exchanged with the owner of
    /// `peer_key`.
    ///
    /// Without forward secrecy this is the shared secret itself; with it, the
    /// key is `SHA-256(shared_secret || current_session_key)`. Every
    /// intermediate key is wrapped in `Zeroizing`, so it is wiped when it
    /// goes out of scope, including on early error returns in the callers.
    fn derive_message_key(&self, peer_key: &[u8; 32]) -> zeroize::Zeroizing<[u8; 32]> {
        use sha2::{Digest, Sha256};
        use zeroize::Zeroizing;

        let shared_secret = Zeroizing::new(self.key_material.derive_shared_secret(peer_key));
        if !self.config.forward_secrecy {
            return shared_secret;
        }

        let session_key = Zeroizing::new(self.key_material.session_key());
        let mut hasher = Sha256::new();
        hasher.update(shared_secret.as_slice());
        hasher.update(session_key.as_slice());
        let digest = hasher.finalize();

        let mut mixed = Zeroizing::new([0u8; 32]);
        mixed.copy_from_slice(&digest);
        mixed
    }

    /// Run the configured AEAD cipher in the encrypt direction. The returned
    /// buffer is the ciphertext with the authentication tag appended.
    fn seal(&self, key: &[u8; 32], nonce: &[u8; 12], plaintext: &[u8]) -> Result<Vec<u8>> {
        match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(key)
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;

                let nonce_obj = Nonce::from_slice(nonce);
                cipher
                    .encrypt(nonce_obj, plaintext)
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(key)
                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;

                let nonce_obj = Nonce::from_slice(nonce);
                cipher
                    .encrypt(nonce_obj, plaintext)
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))
            }
        }
    }

    /// Run the configured AEAD cipher in the decrypt direction on
    /// `sealed` (ciphertext followed by tag).
    ///
    /// A failed authentication/decryption bumps `decryption_failures`; a
    /// cipher initialisation failure does not.
    fn open(&self, key: &[u8; 32], nonce: &[u8; 12], sealed: &[u8]) -> Result<Vec<u8>> {
        let reject = |detail: String| -> RingKernelError {
            self.stats
                .decryption_failures
                .fetch_add(1, Ordering::Relaxed);
            RingKernelError::K2KError(format!("Decryption failed: {}", detail))
        };

        match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(key)
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;

                let nonce_obj = Nonce::from_slice(nonce);
                cipher
                    .decrypt(nonce_obj, sealed)
                    .map_err(|e| reject(e.to_string()))
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(key)
                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;

                let nonce_obj = Nonce::from_slice(nonce);
                cipher
                    .decrypt(nonce_obj, sealed)
                    .map_err(|e| reject(e.to_string()))
            }
        }
    }

    /// Encrypt a K2K message.
    ///
    /// The envelope is serialized and sealed with a key derived for the
    /// message's destination; routing metadata is copied in the clear. A
    /// fresh random 12-byte nonce is drawn for every call.
    ///
    /// # Errors
    ///
    /// Returns `RingKernelError::K2KError` when encryption is disabled, when
    /// no public key is registered for the destination, or when the cipher
    /// reports a failure.
    pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
        use rand::RngCore;

        self.ensure_enabled()?;

        let peer_key = self.peer_key(&message.destination, "destination")?;
        let message_key = self.derive_message_key(&peer_key);
        // Read the generation right next to the key derivation: this keeps
        // the window in which a concurrent rotation could make the recorded
        // generation disagree with the key actually used as small as
        // possible (it does not close it entirely).
        let key_generation = self.key_material.session_generation();

        let mut nonce = [0u8; 12];
        rand::thread_rng().fill_bytes(&mut nonce);

        let envelope_bytes = message.envelope.to_bytes();
        let mut ciphertext = self.seal(&message_key, &nonce, envelope_bytes.as_slice())?;

        // Both ciphers append the tag to the ciphertext; split it off in
        // place instead of copying the ciphertext into a second buffer.
        let mut tag = [0u8; 16];
        let tag_start = ciphertext.len().checked_sub(tag.len()).ok_or_else(|| {
            RingKernelError::K2KError(
                "Encryption failed: output shorter than authentication tag".to_string(),
            )
        })?;
        tag.copy_from_slice(&ciphertext[tag_start..]);
        ciphertext.truncate(tag_start);

        self.stats.messages_encrypted.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_encrypted
            .fetch_add(envelope_bytes.len() as u64, Ordering::Relaxed);

        Ok(EncryptedK2KMessage {
            id: message.id,
            source: message.source.clone(),
            destination: message.destination.clone(),
            hops: message.hops,
            sent_at: message.sent_at,
            priority: message.priority,
            key_generation,
            nonce,
            ciphertext,
            tag,
        })
    }

    /// Decrypt an encrypted K2K message.
    ///
    /// The key is derived for the message's *source* using this encryptor's
    /// current key material. `encrypted.key_generation` is not consulted, so
    /// with forward secrecy enabled a message sealed under an earlier session
    /// key fails authentication after a rotation.
    ///
    /// # Errors
    ///
    /// Returns `RingKernelError::K2KError` when encryption is disabled, when
    /// no public key is registered for the source, when authentication or
    /// decryption fails (also counted in `decryption_failures`), or when the
    /// decrypted bytes cannot be parsed as an envelope.
    pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
        self.ensure_enabled()?;

        let peer_key = self.peer_key(&encrypted.source, "source")?;
        let message_key = self.derive_message_key(&peer_key);

        // Rebuild `ciphertext || tag` in a buffer sized up front so that
        // appending the tag cannot trigger a reallocation.
        let mut sealed = Vec::with_capacity(encrypted.ciphertext.len() + encrypted.tag.len());
        sealed.extend_from_slice(&encrypted.ciphertext);
        sealed.extend_from_slice(&encrypted.tag);

        let plaintext = self.open(&message_key, &encrypted.nonce, &sealed)?;

        let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
            RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
        })?;

        self.stats.messages_decrypted.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_decrypted
            .fetch_add(plaintext.len() as u64, Ordering::Relaxed);

        Ok(K2KMessage {
            id: encrypted.id,
            source: encrypted.source.clone(),
            destination: encrypted.destination.clone(),
            envelope,
            hops: encrypted.hops,
            sent_at: encrypted.sent_at,
            priority: encrypted.priority,
        })
    }

    /// Get encryption statistics.
    ///
    /// Each counter is read independently with relaxed ordering, so the
    /// snapshot is not guaranteed to be mutually consistent while other
    /// threads are encrypting or decrypting.
    pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let counters = &self.stats;

        K2KEncryptionStatsSnapshot {
            messages_encrypted: read(&counters.messages_encrypted),
            messages_decrypted: read(&counters.messages_decrypted),
            bytes_encrypted: read(&counters.bytes_encrypted),
            bytes_decrypted: read(&counters.bytes_decrypted),
            key_rotations: read(&counters.key_rotations),
            decryption_failures: read(&counters.decryption_failures),
            peer_count: self.peer_keys.read().len(),
            session_generation: self.key_material.session_generation(),
        }
    }

    /// Get configuration.
    pub fn config(&self) -> &K2KEncryptionConfig {
        &self.config
    }
}
1521
/// K2K encryption statistics (atomic counters).
///
/// Internal, lock-free counters updated by `K2KEncryptor`; callers observe
/// them through [`K2KEncryptionStatsSnapshot`].
#[cfg(feature = "crypto")]
#[derive(Default)]
struct K2KEncryptionStats {
    /// Number of successful `encrypt` calls.
    messages_encrypted: std::sync::atomic::AtomicU64,
    /// Number of successful `decrypt` calls.
    messages_decrypted: std::sync::atomic::AtomicU64,
    /// Total serialized-envelope bytes passed through `encrypt`.
    bytes_encrypted: std::sync::atomic::AtomicU64,
    /// Total plaintext bytes produced by `decrypt`.
    bytes_decrypted: std::sync::atomic::AtomicU64,
    /// Number of session-key rotations triggered via `maybe_rotate`.
    key_rotations: std::sync::atomic::AtomicU64,
    /// Number of cipher-level decryption/authentication failures.
    decryption_failures: std::sync::atomic::AtomicU64,
}
1533
/// Snapshot of K2K encryption statistics.
///
/// Plain-value copy of the encryptor's counters at the time
/// `K2KEncryptor::stats` was called.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Default)]
pub struct K2KEncryptionStatsSnapshot {
    /// Messages encrypted.
    pub messages_encrypted: u64,
    /// Messages decrypted.
    pub messages_decrypted: u64,
    /// Bytes encrypted (serialized envelope size before encryption).
    pub bytes_encrypted: u64,
    /// Bytes decrypted (plaintext size after decryption).
    pub bytes_decrypted: u64,
    /// Key rotations performed.
    pub key_rotations: u64,
    /// Decryption failures (authentication/integrity).
    pub decryption_failures: u64,
    /// Number of registered peers.
    pub peer_count: usize,
    /// Current session key generation.
    pub session_generation: u64,
}
1555
/// Encrypted K2K endpoint that wraps a standard endpoint with encryption.
///
/// NOTE(review): in the current implementation the wrapped endpoint still
/// transports plaintext envelopes; see `send_encrypted` and
/// `receive_decrypted`.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KEndpoint {
    /// Inner endpoint that performs the actual send/receive.
    inner: K2KEndpoint,
    /// Encryptor, shared via `Arc`.
    encryptor: Arc<K2KEncryptor>,
}
1564
#[cfg(feature = "crypto")]
impl EncryptedK2KEndpoint {
    /// Wrap `inner` so that sends go through `encryptor` first.
    pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
        Self { encryptor, inner }
    }

    /// Public key of this endpoint's encryptor, to be handed to peers.
    pub fn public_key(&self) -> [u8; 32] {
        let encryptor = &self.encryptor;
        encryptor.public_key()
    }

    /// Record `public_key` as the key of peer `kernel_id`.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        let encryptor = &self.encryptor;
        encryptor.register_peer(kernel_id, public_key);
    }

    /// Send a message through the encryption pipeline.
    ///
    /// The session key is rotated if due, then the message is encrypted;
    /// any encryption error (disabled encryption, unknown peer, cipher
    /// failure) aborts the send.
    ///
    /// WARNING: the ciphertext is currently discarded and the *original,
    /// unencrypted* envelope is what gets handed to the inner endpoint.
    /// Transporting the encrypted payload requires protocol support in the
    /// broker that does not exist yet.
    pub async fn send_encrypted(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
    ) -> Result<DeliveryReceipt> {
        self.encryptor.maybe_rotate();

        // Capture what is needed from the envelope before it is moved into
        // the message.
        let sent_at = envelope.header.timestamp;
        let source = self.inner.kernel_id.clone();
        let message = K2KMessage::new(source, destination.clone(), envelope, sent_at);

        // Encryption is exercised for its validation and statistics side
        // effects only; the result is not transmitted.
        let _unsent_ciphertext = self.encryptor.encrypt(&message)?;

        self.inner.send(destination, message.envelope).await
    }

    /// Receive the next message from the inner endpoint.
    ///
    /// No decryption is performed yet: the message is returned exactly as
    /// the inner endpoint delivered it.
    pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
        // Placeholder until encrypted payloads are transported: a full
        // implementation would decrypt the received message here.
        self.inner.receive().await
    }

    /// Snapshot of the encryptor's counters.
    pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
        let encryptor = &self.encryptor;
        encryptor.stats()
    }
}
1617
/// Builder for encrypted K2K broker infrastructure.
///
/// Collects a broker configuration and an encryption configuration; `build`
/// returns a broker together with the encryption configuration.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KBuilder {
    /// Configuration passed to the broker on `build`.
    k2k_config: K2KConfig,
    /// Encryption settings returned alongside the broker on `build`.
    encryption_config: K2KEncryptionConfig,
}
1624
#[cfg(feature = "crypto")]
impl EncryptedK2KBuilder {
    /// Start from the default broker and encryption configurations.
    pub fn new() -> Self {
        let k2k_config = K2KConfig::default();
        let encryption_config = K2KEncryptionConfig::default();
        Self {
            k2k_config,
            encryption_config,
        }
    }

    /// Replace the whole K2K (broker) configuration.
    pub fn k2k_config(self, config: K2KConfig) -> Self {
        Self {
            k2k_config: config,
            ..self
        }
    }

    /// Replace the whole encryption configuration. Any value set earlier
    /// through the `with_*` / `require_encryption` setters is overwritten.
    pub fn encryption_config(self, config: K2KEncryptionConfig) -> Self {
        Self {
            encryption_config: config,
            ..self
        }
    }

    /// Turn forward secrecy on or off.
    pub fn with_forward_secrecy(self, enabled: bool) -> Self {
        let mut builder = self;
        builder.encryption_config.forward_secrecy = enabled;
        builder
    }

    /// Choose the encryption algorithm.
    pub fn with_algorithm(self, algorithm: K2KEncryptionAlgorithm) -> Self {
        let mut builder = self;
        builder.encryption_config.algorithm = algorithm;
        builder
    }

    /// Set the session-key rotation interval, in seconds.
    pub fn with_key_rotation(self, interval_secs: u64) -> Self {
        let mut builder = self;
        builder.encryption_config.key_rotation_interval_secs = interval_secs;
        builder
    }

    /// Mark encryption as required (or not) for all messages.
    pub fn require_encryption(self, required: bool) -> Self {
        let mut builder = self;
        builder.encryption_config.require_encryption = required;
        builder
    }

    /// Finish building: create a broker from the K2K configuration and hand
    /// back the encryption configuration for constructing encryptors.
    pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
        let Self {
            k2k_config,
            encryption_config,
        } = self;
        let broker = K2KBroker::new(k2k_config);
        (broker, encryption_config)
    }
}
1676
#[cfg(feature = "crypto")]
impl Default for EncryptedK2KBuilder {
    /// Equivalent to [`EncryptedK2KBuilder::new`].
    fn default() -> Self {
        EncryptedK2KBuilder::new()
    }
}
1683
1684#[cfg(test)]
1685mod tests {
1686    use super::*;
1687
1688    #[tokio::test]
1689    async fn test_k2k_broker_registration() {
1690        let broker = K2KBuilder::new().build();
1691
1692        let kernel1 = KernelId::new("kernel1");
1693        let kernel2 = KernelId::new("kernel2");
1694
1695        let _endpoint1 = broker.register(kernel1.clone());
1696        let _endpoint2 = broker.register(kernel2.clone());
1697
1698        assert!(broker.is_registered(&kernel1));
1699        assert!(broker.is_registered(&kernel2));
1700        assert_eq!(broker.registered_kernels().len(), 2);
1701    }
1702
1703    #[tokio::test]
1704    async fn test_k2k_message_delivery() {
1705        let broker = K2KBuilder::new().build();
1706
1707        let kernel1 = KernelId::new("kernel1");
1708        let kernel2 = KernelId::new("kernel2");
1709
1710        let endpoint1 = broker.register(kernel1.clone());
1711        let mut endpoint2 = broker.register(kernel2.clone());
1712
1713        // Create a test envelope
1714        let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
1715
1716        // Send from kernel1 to kernel2
1717        let receipt = endpoint1.send(kernel2.clone(), envelope).await.unwrap();
1718        assert_eq!(receipt.status, DeliveryStatus::Delivered);
1719
1720        // Receive on kernel2
1721        let message = endpoint2.try_receive();
1722        assert!(message.is_some());
1723        assert_eq!(message.unwrap().source, kernel1);
1724    }
1725
1726    #[test]
1727    fn test_k2k_config_default() {
1728        let config = K2KConfig::default();
1729        assert_eq!(config.max_pending_messages, 1024);
1730        assert_eq!(config.delivery_timeout_ms, 5000);
1731    }
1732
1733    // ============================================================
1734    // Multi-tenant K2K tests (§3.5)
1735    // ============================================================
1736
1737    mod multi_tenant {
1738        use super::*;
1739        use crate::audit::MemorySink;
1740
1741        fn env() -> MessageEnvelope {
1742            MessageEnvelope::empty(1, 2, HlcTimestamp::now(1))
1743        }
1744
1745        // -- regression: single-tenant fast path unchanged --
1746
1747        #[tokio::test]
1748        async fn legacy_single_tenant_send_unchanged() {
1749            let broker = K2KBuilder::new().build();
1750            let k1 = KernelId::new("k1");
1751            let k2 = KernelId::new("k2");
1752            let e1 = broker.register(k1.clone());
1753            let mut e2 = broker.register(k2.clone());
1754
1755            let receipt = e1.send(k2.clone(), env()).await.unwrap();
1756            assert_eq!(receipt.status, DeliveryStatus::Delivered);
1757            let msg = e2.try_receive().unwrap();
1758            assert_eq!(msg.source, k1);
1759            assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
1760        }
1761
1762        #[tokio::test]
1763        async fn single_tenant_fast_path_uses_unspecified_tenant() {
1764            let broker = K2KBuilder::new().build();
1765            let k = KernelId::new("k");
1766            let _e = broker.register(k.clone());
1767            assert_eq!(broker.tenant_of(&k), Some(UNSPECIFIED_TENANT));
1768            assert_eq!(broker.tenant_count(), 1);
1769        }
1770
1771        // -- cross-tenant rejection with audit --
1772
1773        #[tokio::test]
1774        async fn cross_tenant_send_rejected_with_tenant_mismatch() {
1775            let broker = K2KBuilder::new().build();
1776            broker
1777                .registry()
1778                .register(1, TenantQuota::default())
1779                .unwrap();
1780            broker
1781                .registry()
1782                .register(2, TenantQuota::default())
1783                .unwrap();
1784
1785            let ka = KernelId::new("tenant1_kernel");
1786            let kb = KernelId::new("tenant2_kernel");
1787            let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1788            let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
1789
1790            let err = ea.send(kb.clone(), env()).await.unwrap_err();
1791            match err {
1792                RingKernelError::TenantMismatch { from, to } => {
1793                    assert_eq!(from, 1);
1794                    assert_eq!(to, 2);
1795                }
1796                other => panic!("expected TenantMismatch, got {:?}", other),
1797            }
1798            assert_eq!(broker.stats().cross_tenant_rejections, 1);
1799        }
1800
1801        #[tokio::test]
1802        async fn cross_tenant_attempt_recorded_in_audit_sink() {
1803            let sink = Arc::new(MemorySink::new(100));
1804            let registry = Arc::new(TenantRegistry::with_audit_sink(sink.clone()));
1805            registry.register(1, TenantQuota::default()).unwrap();
1806            registry.register(2, TenantQuota::default()).unwrap();
1807
1808            let broker = K2KBuilder::new().with_registry(registry).build();
1809            let ka = KernelId::new("ka");
1810            let kb = KernelId::new("kb");
1811            let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1812            let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
1813
1814            let _ = ea.send(kb.clone(), env()).await.unwrap_err();
1815            let events = sink.events();
1816            assert_eq!(events.len(), 1);
1817            assert!(events[0].description.contains("cross-tenant"));
1818            let md: std::collections::HashMap<_, _> = events[0]
1819                .metadata
1820                .iter()
1821                .cloned()
1822                .collect::<std::collections::HashMap<_, _>>();
1823            assert_eq!(md.get("from_tenant"), Some(&"1".to_string()));
1824            assert_eq!(md.get("to_tenant"), Some(&"2".to_string()));
1825        }
1826
1827        // -- same-tenant sends stamped with audit_tag --
1828
1829        #[tokio::test]
1830        async fn same_tenant_send_succeeds_with_audit_tag() {
1831            let broker = K2KBuilder::new().build();
1832            broker
1833                .registry()
1834                .register(1, TenantQuota::default())
1835                .unwrap();
1836
1837            let ka = KernelId::new("a");
1838            let kb = KernelId::new("b");
1839            let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1840            let mut eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
1841
1842            let receipt = ea.send(kb.clone(), env()).await.unwrap();
1843            assert_eq!(receipt.status, DeliveryStatus::Delivered);
1844
1845            let msg = eb.try_receive().unwrap();
1846            assert_eq!(msg.envelope.tenant_id, 1);
1847            assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 100));
1848        }
1849
1850        // -- engagement cost tracking --
1851
1852        #[tokio::test]
1853        async fn engagement_cost_accumulates_across_sends() {
1854            let broker = K2KBuilder::new().build();
1855            broker
1856                .registry()
1857                .register(1, TenantQuota::unlimited())
1858                .unwrap();
1859
1860            let ka = KernelId::new("a");
1861            let kb = KernelId::new("b");
1862            let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
1863            let _eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
1864
1865            for _ in 0..4 {
1866                let _ = ea.send(kb.clone(), env()).await.unwrap();
1867                broker.registry().track_usage(
1868                    1,
1869                    AuditTag::new(10, 100),
1870                    std::time::Duration::from_millis(50),
1871                );
1872            }
1873            let cost = broker
1874                .registry()
1875                .get_engagement_cost_for(1, AuditTag::new(10, 100));
1876            assert_eq!(cost, std::time::Duration::from_millis(200));
1877        }
1878
1879        #[tokio::test]
1880        async fn engagement_cost_separate_across_audit_tags() {
1881            let broker = K2KBuilder::new().build();
1882            broker
1883                .registry()
1884                .register(1, TenantQuota::unlimited())
1885                .unwrap();
1886
1887            let tag_a = AuditTag::new(10, 1);
1888            let tag_b = AuditTag::new(10, 2);
1889            broker
1890                .registry()
1891                .track_usage(1, tag_a, std::time::Duration::from_millis(150));
1892            broker
1893                .registry()
1894                .track_usage(1, tag_b, std::time::Duration::from_millis(300));
1895
1896            assert_eq!(
1897                broker.registry().get_engagement_cost_for(1, tag_a),
1898                std::time::Duration::from_millis(150)
1899            );
1900            assert_eq!(
1901                broker.registry().get_engagement_cost_for(1, tag_b),
1902                std::time::Duration::from_millis(300)
1903            );
1904        }
1905
1906        // -- quota enforcement --
1907
1908        #[tokio::test]
1909        async fn quota_enforcement_rejects_over_rate_limit() {
1910            let broker = K2KBuilder::new().build();
1911            let mut quota = TenantQuota::default();
1912            quota.max_messages_per_sec = 2;
1913            broker.registry().register(1, quota).unwrap();
1914
1915            let ka = KernelId::new("a");
1916            let kb = KernelId::new("b");
1917            let ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
1918            let _eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
1919
1920            assert!(ea.send(kb.clone(), env()).await.is_ok());
1921            assert!(ea.send(kb.clone(), env()).await.is_ok());
1922            let err = ea.send(kb.clone(), env()).await.unwrap_err();
1923            assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
1924        }
1925
1926        // -- registration / deregistration --
1927
1928        #[tokio::test]
1929        async fn register_tenant_kernel_and_unregister() {
1930            let broker = K2KBuilder::new().build();
1931            broker
1932                .registry()
1933                .register(7, TenantQuota::default())
1934                .unwrap();
1935            let k = KernelId::new("k");
1936            let _ep = broker.register_tenant(7, AuditTag::new(1, 1), k.clone());
1937            assert_eq!(broker.tenant_of(&k), Some(7));
1938            assert_eq!(broker.registered_kernels_for(7), vec![k.clone()]);
1939
1940            broker.unregister(&k);
1941            assert!(!broker.is_registered(&k));
1942            assert!(broker.registered_kernels_for(7).is_empty());
1943        }
1944
1945        #[tokio::test]
1946        async fn tenant_stats_reports_per_tenant_counts() {
1947            let broker = K2KBuilder::new().build();
1948            broker
1949                .registry()
1950                .register(1, TenantQuota::default())
1951                .unwrap();
1952            broker
1953                .registry()
1954                .register(2, TenantQuota::default())
1955                .unwrap();
1956
1957            let _ea = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("a"));
1958            let _eb = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("b"));
1959            let _ec = broker.register_tenant(2, AuditTag::unspecified(), KernelId::new("c"));
1960
1961            let s1 = broker.tenant_stats(1).unwrap();
1962            let s2 = broker.tenant_stats(2).unwrap();
1963            assert_eq!(s1.registered_endpoints, 2);
1964            assert_eq!(s2.registered_endpoints, 1);
1965        }
1966
1967        #[tokio::test]
1968        async fn re_registering_kernel_moves_it_between_tenants() {
1969            let broker = K2KBuilder::new().build();
1970            broker
1971                .registry()
1972                .register(1, TenantQuota::default())
1973                .unwrap();
1974            broker
1975                .registry()
1976                .register(2, TenantQuota::default())
1977                .unwrap();
1978
1979            let k = KernelId::new("roaming");
1980            let _e1 = broker.register_tenant(1, AuditTag::unspecified(), k.clone());
1981            assert_eq!(broker.tenant_of(&k), Some(1));
1982
1983            let _e2 = broker.register_tenant(2, AuditTag::unspecified(), k.clone());
1984            assert_eq!(broker.tenant_of(&k), Some(2));
1985            assert!(broker.registered_kernels_for(1).is_empty());
1986            assert_eq!(broker.registered_kernels_for(2), vec![k]);
1987        }
1988
1989        #[tokio::test]
1990        async fn send_with_audit_overrides_registration_tag() {
1991            let broker = K2KBuilder::new().build();
1992            broker
1993                .registry()
1994                .register(1, TenantQuota::unlimited())
1995                .unwrap();
1996
1997            let ka = KernelId::new("a");
1998            let kb = KernelId::new("b");
1999            let _ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
2000            let mut eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
2001
2002            let receipt = broker
2003                .send_with_audit(ka.clone(), kb.clone(), env(), AuditTag::new(10, 99))
2004                .await
2005                .unwrap();
2006            assert_eq!(receipt.status, DeliveryStatus::Delivered);
2007            let msg = eb.try_receive().unwrap();
2008            assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 99));
2009        }
2010
2011        #[tokio::test]
2012        async fn default_audit_tag_behavior() {
2013            // Single-tenant deployment (tenant_id = 0) should preserve the
2014            // default unspecified audit tag when no explicit tag is set.
2015            let broker = K2KBuilder::new().build();
2016            let ka = KernelId::new("a");
2017            let kb = KernelId::new("b");
2018            let _ea = broker.register(ka.clone());
2019            let mut eb = broker.register(kb.clone());
2020
2021            let _ = broker.send(ka.clone(), kb.clone(), env()).await.unwrap();
2022            let msg = eb.try_receive().unwrap();
2023            assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
2024            assert!(msg.envelope.audit_tag.is_unspecified());
2025        }
2026    }
2027
2028    // K2K Encryption tests (requires crypto feature)
2029    #[cfg(feature = "crypto")]
2030    mod crypto_tests {
2031        use super::*;
2032
2033        #[test]
2034        fn test_k2k_encryption_config_default() {
2035            let config = K2KEncryptionConfig::default();
2036            assert!(config.enabled);
2037            assert!(config.forward_secrecy);
2038            assert_eq!(config.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
2039            assert_eq!(config.key_rotation_interval_secs, 3600);
2040        }
2041
2042        #[test]
2043        fn test_k2k_encryption_config_disabled() {
2044            let config = K2KEncryptionConfig::disabled();
2045            assert!(!config.enabled);
2046        }
2047
2048        #[test]
2049        fn test_k2k_encryption_config_strict() {
2050            let config = K2KEncryptionConfig::strict();
2051            assert!(config.enabled);
2052            assert!(config.require_encryption);
2053            assert!(config.forward_secrecy);
2054        }
2055
2056        #[test]
2057        fn test_k2k_key_material_creation() {
2058            let kernel_id = KernelId::new("test_kernel");
2059            let key_material = K2KKeyMaterial::new(kernel_id.clone());
2060
2061            assert_eq!(key_material.kernel_id(), &kernel_id);
2062            assert_eq!(key_material.session_generation(), 1);
2063        }
2064
2065        #[test]
2066        fn test_k2k_key_material_rotation() {
2067            let kernel_id = KernelId::new("test_kernel");
2068            let key_material = K2KKeyMaterial::new(kernel_id);
2069
2070            let old_session_key = key_material.session_key();
2071            let old_generation = key_material.session_generation();
2072
2073            key_material.rotate_session_key();
2074
2075            let new_session_key = key_material.session_key();
2076            let new_generation = key_material.session_generation();
2077
2078            assert_ne!(old_session_key, new_session_key);
2079            assert_eq!(new_generation, old_generation + 1);
2080        }
2081
2082        #[test]
2083        fn test_k2k_key_material_shared_secret() {
2084            let kernel1 = K2KKeyMaterial::new(KernelId::new("kernel1"));
2085            let kernel2 = K2KKeyMaterial::new(KernelId::new("kernel2"));
2086
2087            // Get public keys (simulated)
2088            let pk1 = {
2089                use sha2::{Digest, Sha256};
2090                let mut hasher = Sha256::new();
2091                hasher.update(&kernel1.long_term_key);
2092                hasher.update(b"k2k-public-key-v1");
2093                let result = hasher.finalize();
2094                let mut public = [0u8; 32];
2095                public.copy_from_slice(&result);
2096                public
2097            };
2098            let pk2 = {
2099                use sha2::{Digest, Sha256};
2100                let mut hasher = Sha256::new();
2101                hasher.update(&kernel2.long_term_key);
2102                hasher.update(b"k2k-public-key-v1");
2103                let result = hasher.finalize();
2104                let mut public = [0u8; 32];
2105                public.copy_from_slice(&result);
2106                public
2107            };
2108
2109            // Shared secrets should be different for different pairs
2110            let secret1 = kernel1.derive_shared_secret(&pk2);
2111            let secret2 = kernel2.derive_shared_secret(&pk1);
2112
2113            // They won't be equal with this simplified implementation
2114            // In a real X25519 implementation, they would be
2115            assert_eq!(secret1.len(), 32);
2116            assert_eq!(secret2.len(), 32);
2117        }
2118
2119        #[test]
2120        fn test_k2k_encryptor_creation() {
2121            let kernel_id = KernelId::new("test_kernel");
2122            let config = K2KEncryptionConfig::default();
2123            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
2124
2125            let public_key = encryptor.public_key();
2126            assert_eq!(public_key.len(), 32);
2127
2128            let stats = encryptor.stats();
2129            assert_eq!(stats.messages_encrypted, 0);
2130            assert_eq!(stats.messages_decrypted, 0);
2131            assert_eq!(stats.peer_count, 0);
2132        }
2133
2134        #[test]
2135        fn test_k2k_encryptor_peer_registration() {
2136            let kernel_id = KernelId::new("test_kernel");
2137            let config = K2KEncryptionConfig::default();
2138            let encryptor = K2KEncryptor::new(kernel_id, config);
2139
2140            let peer_id = KernelId::new("peer_kernel");
2141            let peer_key = [42u8; 32];
2142
2143            encryptor.register_peer(peer_id.clone(), peer_key);
2144            assert_eq!(encryptor.stats().peer_count, 1);
2145
2146            encryptor.unregister_peer(&peer_id);
2147            assert_eq!(encryptor.stats().peer_count, 0);
2148        }
2149
2150        #[test]
2151        fn test_k2k_encrypted_builder() {
2152            let (broker, config) = EncryptedK2KBuilder::new()
2153                .with_forward_secrecy(true)
2154                .with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
2155                .with_key_rotation(1800)
2156                .require_encryption(true)
2157                .build();
2158
2159            assert!(config.forward_secrecy);
2160            assert_eq!(config.algorithm, K2KEncryptionAlgorithm::ChaCha20Poly1305);
2161            assert_eq!(config.key_rotation_interval_secs, 1800);
2162            assert!(config.require_encryption);
2163
2164            // Broker should be functional
2165            let stats = broker.stats();
2166            assert_eq!(stats.registered_endpoints, 0);
2167        }
2168
2169        #[test]
2170        fn test_k2k_encryption_stats_snapshot() {
2171            let stats = K2KEncryptionStatsSnapshot::default();
2172            assert_eq!(stats.messages_encrypted, 0);
2173            assert_eq!(stats.messages_decrypted, 0);
2174            assert_eq!(stats.bytes_encrypted, 0);
2175            assert_eq!(stats.bytes_decrypted, 0);
2176            assert_eq!(stats.key_rotations, 0);
2177            assert_eq!(stats.decryption_failures, 0);
2178            assert_eq!(stats.peer_count, 0);
2179            assert_eq!(stats.session_generation, 0);
2180        }
2181
2182        #[test]
2183        fn test_k2k_encryption_algorithms() {
2184            // Test that both algorithms are distinct
2185            assert_ne!(
2186                K2KEncryptionAlgorithm::Aes256Gcm,
2187                K2KEncryptionAlgorithm::ChaCha20Poly1305
2188            );
2189        }
2190
2191        #[test]
2192        fn test_k2k_key_material_should_rotate() {
2193            let kernel_id = KernelId::new("test_kernel");
2194            let key_material = K2KKeyMaterial::new(kernel_id);
2195
2196            // Should not rotate with 0 interval
2197            assert!(!key_material.should_rotate(0));
2198
2199            // Should not rotate immediately with long interval
2200            assert!(!key_material.should_rotate(3600));
2201        }
2202
2203        #[test]
2204        fn test_k2k_encryptor_disabled_encryption() {
2205            let kernel_id = KernelId::new("test_kernel");
2206            let config = K2KEncryptionConfig::disabled();
2207            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
2208
2209            // Create a test message
2210            let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
2211            let message = K2KMessage::new(
2212                kernel_id,
2213                KernelId::new("dest"),
2214                envelope,
2215                HlcTimestamp::now(1),
2216            );
2217
2218            // Should fail when encryption is disabled
2219            let result = encryptor.encrypt(&message);
2220            assert!(result.is_err());
2221        }
2222
2223        #[test]
2224        fn test_k2k_encryptor_missing_peer_key() {
2225            let kernel_id = KernelId::new("test_kernel");
2226            let config = K2KEncryptionConfig::default();
2227            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
2228
2229            // Create a test message to unknown destination
2230            let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
2231            let message = K2KMessage::new(
2232                kernel_id,
2233                KernelId::new("unknown_dest"),
2234                envelope,
2235                HlcTimestamp::now(1),
2236            );
2237
2238            // Should fail due to missing peer key
2239            let result = encryptor.encrypt(&message);
2240            assert!(result.is_err());
2241            assert!(result.unwrap_err().to_string().contains("No public key"));
2242        }
2243    }
2244}