Skip to main content

ringkernel_core/
k2k.rs

1//! Kernel-to-Kernel (K2K) direct messaging.
2//!
3//! This module provides infrastructure for direct communication between
4//! GPU kernels without host-side mediation.
5
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::error::{Result, RingKernelError};
13use crate::hlc::HlcTimestamp;
14use crate::message::{MessageEnvelope, MessageId};
15use crate::runtime::KernelId;
16
/// Tunable settings for a [`K2KBroker`] and the endpoints it hands out.
#[derive(Clone, Debug)]
pub struct K2KConfig {
    /// Upper bound on queued messages.
    ///
    /// The broker uses this value as the capacity of every endpoint channel
    /// it creates.
    pub max_pending_messages: usize,
    /// Delivery timeout, expressed in milliseconds.
    pub delivery_timeout_ms: u64,
    /// Turns message tracing on when `true`.
    pub enable_tracing: bool,
    /// Largest hop count a routed message may reach.
    pub max_hops: u8,
}
29
30impl Default for K2KConfig {
31    fn default() -> Self {
32        Self {
33            max_pending_messages: 1024,
34            delivery_timeout_ms: 5000,
35            enable_tracing: false,
36            max_hops: 8,
37        }
38    }
39}
40
41/// A K2K message with routing information.
42#[derive(Debug, Clone)]
43pub struct K2KMessage {
44    /// Unique message ID.
45    pub id: MessageId,
46    /// Source kernel.
47    pub source: KernelId,
48    /// Destination kernel.
49    pub destination: KernelId,
50    /// The message envelope.
51    pub envelope: MessageEnvelope,
52    /// Hop count (for detecting routing loops).
53    pub hops: u8,
54    /// Timestamp when message was sent.
55    pub sent_at: HlcTimestamp,
56    /// Priority (higher = more urgent).
57    pub priority: u8,
58}
59
60impl K2KMessage {
61    /// Create a new K2K message.
62    pub fn new(
63        source: KernelId,
64        destination: KernelId,
65        envelope: MessageEnvelope,
66        timestamp: HlcTimestamp,
67    ) -> Self {
68        Self {
69            id: MessageId::generate(),
70            source,
71            destination,
72            envelope,
73            hops: 0,
74            sent_at: timestamp,
75            priority: 0,
76        }
77    }
78
79    /// Create with priority.
80    pub fn with_priority(mut self, priority: u8) -> Self {
81        self.priority = priority;
82        self
83    }
84
85    /// Increment hop count.
86    pub fn increment_hops(&mut self) -> Result<()> {
87        self.hops += 1;
88        if self.hops > 16 {
89            return Err(RingKernelError::K2KError(
90                "Maximum hop count exceeded".to_string(),
91            ));
92        }
93        Ok(())
94    }
95}
96
97/// Receipt for a K2K message delivery.
98#[derive(Debug, Clone)]
99pub struct DeliveryReceipt {
100    /// Message ID.
101    pub message_id: MessageId,
102    /// Source kernel.
103    pub source: KernelId,
104    /// Destination kernel.
105    pub destination: KernelId,
106    /// Delivery status.
107    pub status: DeliveryStatus,
108    /// Timestamp of delivery/failure.
109    pub timestamp: HlcTimestamp,
110}
111
/// Possible outcomes of a K2K delivery attempt.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryStatus {
    /// The message was handed to the destination successfully.
    Delivered,
    /// The message has been accepted but is still on its way.
    Pending,
    /// No destination kernel could be located.
    NotFound,
    /// The destination's queue had no free slot.
    QueueFull,
    /// The delivery did not complete in time.
    Timeout,
    /// The message exceeded the permitted number of hops.
    MaxHopsExceeded,
}
128
129/// K2K endpoint for a single kernel.
130pub struct K2KEndpoint {
131    /// Kernel ID.
132    kernel_id: KernelId,
133    /// Incoming message channel.
134    receiver: mpsc::Receiver<K2KMessage>,
135    /// Reference to the broker.
136    broker: Arc<K2KBroker>,
137}
138
139impl K2KEndpoint {
140    /// Receive a K2K message (blocking).
141    pub async fn receive(&mut self) -> Option<K2KMessage> {
142        self.receiver.recv().await
143    }
144
145    /// Try to receive a K2K message (non-blocking).
146    pub fn try_receive(&mut self) -> Option<K2KMessage> {
147        self.receiver.try_recv().ok()
148    }
149
150    /// Send a message to another kernel.
151    pub async fn send(
152        &self,
153        destination: KernelId,
154        envelope: MessageEnvelope,
155    ) -> Result<DeliveryReceipt> {
156        self.broker
157            .send(self.kernel_id.clone(), destination, envelope)
158            .await
159    }
160
161    /// Send a high-priority message.
162    pub async fn send_priority(
163        &self,
164        destination: KernelId,
165        envelope: MessageEnvelope,
166        priority: u8,
167    ) -> Result<DeliveryReceipt> {
168        self.broker
169            .send_priority(self.kernel_id.clone(), destination, envelope, priority)
170            .await
171    }
172
173    /// Get pending message count.
174    pub fn pending_count(&self) -> usize {
175        // Note: This is an estimate since the channel may be modified concurrently
176        0 // mpsc doesn't provide len() directly
177    }
178}
179
180/// K2K message broker for routing messages between kernels.
181pub struct K2KBroker {
182    /// Configuration.
183    config: K2KConfig,
184    /// Registered endpoints (kernel_id -> sender).
185    endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
186    /// Message counter.
187    message_counter: AtomicU64,
188    /// Delivery receipts (for acknowledgment).
189    receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
190    /// Routing table for indirect delivery.
191    routing_table: RwLock<HashMap<KernelId, KernelId>>,
192}
193
194impl K2KBroker {
195    /// Create a new K2K broker.
196    pub fn new(config: K2KConfig) -> Arc<Self> {
197        Arc::new(Self {
198            config,
199            endpoints: RwLock::new(HashMap::new()),
200            message_counter: AtomicU64::new(0),
201            receipts: RwLock::new(HashMap::new()),
202            routing_table: RwLock::new(HashMap::new()),
203        })
204    }
205
206    /// Register a kernel endpoint.
207    pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
208        let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
209
210        self.endpoints.write().insert(kernel_id.clone(), sender);
211
212        K2KEndpoint {
213            kernel_id,
214            receiver,
215            broker: Arc::clone(self),
216        }
217    }
218
219    /// Unregister a kernel endpoint.
220    pub fn unregister(&self, kernel_id: &KernelId) {
221        self.endpoints.write().remove(kernel_id);
222        self.routing_table.write().remove(kernel_id);
223    }
224
225    /// Check if a kernel is registered.
226    pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
227        self.endpoints.read().contains_key(kernel_id)
228    }
229
230    /// Get all registered kernels.
231    pub fn registered_kernels(&self) -> Vec<KernelId> {
232        self.endpoints.read().keys().cloned().collect()
233    }
234
235    /// Send a message from one kernel to another.
236    pub async fn send(
237        &self,
238        source: KernelId,
239        destination: KernelId,
240        envelope: MessageEnvelope,
241    ) -> Result<DeliveryReceipt> {
242        self.send_priority(source, destination, envelope, 0).await
243    }
244
245    /// Send a priority message.
246    pub async fn send_priority(
247        &self,
248        source: KernelId,
249        destination: KernelId,
250        envelope: MessageEnvelope,
251        priority: u8,
252    ) -> Result<DeliveryReceipt> {
253        let timestamp = envelope.header.timestamp;
254        let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
255        message.priority = priority;
256
257        self.deliver(message).await
258    }
259
260    /// Deliver a message to its destination.
261    async fn deliver(&self, message: K2KMessage) -> Result<DeliveryReceipt> {
262        let message_id = message.id;
263        let source = message.source.clone();
264        let destination = message.destination.clone();
265        let timestamp = message.sent_at;
266
267        // Try direct delivery first
268        let endpoints = self.endpoints.read();
269        if let Some(sender) = endpoints.get(&destination) {
270            match sender.try_send(message) {
271                Ok(()) => {
272                    self.message_counter.fetch_add(1, Ordering::Relaxed);
273                    let receipt = DeliveryReceipt {
274                        message_id,
275                        source,
276                        destination,
277                        status: DeliveryStatus::Delivered,
278                        timestamp,
279                    };
280                    self.receipts.write().insert(message_id, receipt.clone());
281                    return Ok(receipt);
282                }
283                Err(mpsc::error::TrySendError::Full(_)) => {
284                    return Ok(DeliveryReceipt {
285                        message_id,
286                        source,
287                        destination,
288                        status: DeliveryStatus::QueueFull,
289                        timestamp,
290                    });
291                }
292                Err(mpsc::error::TrySendError::Closed(_)) => {
293                    return Ok(DeliveryReceipt {
294                        message_id,
295                        source,
296                        destination,
297                        status: DeliveryStatus::NotFound,
298                        timestamp,
299                    });
300                }
301            }
302        }
303        drop(endpoints);
304
305        // Try routing table
306        let next_hop = {
307            let routing = self.routing_table.read();
308            routing.get(&destination).cloned()
309        };
310
311        if let Some(next_hop) = next_hop {
312            let routed_message = K2KMessage {
313                id: message_id,
314                source,
315                destination: destination.clone(),
316                envelope: message.envelope,
317                hops: message.hops + 1,
318                sent_at: message.sent_at,
319                priority: message.priority,
320            };
321
322            if routed_message.hops > self.config.max_hops {
323                return Ok(DeliveryReceipt {
324                    message_id,
325                    source: routed_message.source,
326                    destination,
327                    status: DeliveryStatus::MaxHopsExceeded,
328                    timestamp,
329                });
330            }
331
332            // Try to deliver to next hop
333            let endpoints = self.endpoints.read();
334            if let Some(sender) = endpoints.get(&next_hop) {
335                if sender.try_send(routed_message).is_ok() {
336                    self.message_counter.fetch_add(1, Ordering::Relaxed);
337                    return Ok(DeliveryReceipt {
338                        message_id,
339                        source: message.source,
340                        destination,
341                        status: DeliveryStatus::Pending,
342                        timestamp,
343                    });
344                }
345            }
346        }
347
348        // Destination not found
349        Ok(DeliveryReceipt {
350            message_id,
351            source: message.source,
352            destination,
353            status: DeliveryStatus::NotFound,
354            timestamp,
355        })
356    }
357
358    /// Add a route to the routing table.
359    pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
360        self.routing_table.write().insert(destination, next_hop);
361    }
362
363    /// Remove a route from the routing table.
364    pub fn remove_route(&self, destination: &KernelId) {
365        self.routing_table.write().remove(destination);
366    }
367
368    /// Get statistics.
369    pub fn stats(&self) -> K2KStats {
370        K2KStats {
371            registered_endpoints: self.endpoints.read().len(),
372            messages_delivered: self.message_counter.load(Ordering::Relaxed),
373            routes_configured: self.routing_table.read().len(),
374        }
375    }
376
377    /// Get delivery receipt for a message.
378    pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
379        self.receipts.read().get(message_id).cloned()
380    }
381}
382
/// Point-in-time counters reported by [`K2KBroker::stats`].
#[derive(Clone, Debug, Default)]
pub struct K2KStats {
    /// How many endpoints are currently registered.
    pub registered_endpoints: usize,
    /// Running total of messages delivered.
    pub messages_delivered: u64,
    /// How many routes the routing table holds.
    pub routes_configured: usize,
}
393
394/// Builder for creating K2K infrastructure.
395pub struct K2KBuilder {
396    config: K2KConfig,
397}
398
399impl K2KBuilder {
400    /// Create a new builder.
401    pub fn new() -> Self {
402        Self {
403            config: K2KConfig::default(),
404        }
405    }
406
407    /// Set maximum pending messages.
408    pub fn max_pending_messages(mut self, count: usize) -> Self {
409        self.config.max_pending_messages = count;
410        self
411    }
412
413    /// Set delivery timeout.
414    pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
415        self.config.delivery_timeout_ms = timeout;
416        self
417    }
418
419    /// Enable message tracing.
420    pub fn enable_tracing(mut self, enable: bool) -> Self {
421        self.config.enable_tracing = enable;
422        self
423    }
424
425    /// Set maximum hop count.
426    pub fn max_hops(mut self, hops: u8) -> Self {
427        self.config.max_hops = hops;
428        self
429    }
430
431    /// Build the K2K broker.
432    pub fn build(self) -> Arc<K2KBroker> {
433        K2KBroker::new(self.config)
434    }
435}
436
437impl Default for K2KBuilder {
438    fn default() -> Self {
439        Self::new()
440    }
441}
442
443// ============================================================================
444// K2K Message Type Registry (FR-3)
445// ============================================================================
446
/// Describes one message type to the K2K routing layer.
///
/// Instances are produced by the `#[derive(RingMessage)]` macro when
/// `k2k_routable = true` is given, and are gathered through the `inventory`
/// crate so they can be enumerated at runtime.
///
/// # Example
///
/// ```ignore
/// #[derive(RingMessage)]
/// #[ring_message(type_id = 1, domain = "OrderMatching", k2k_routable = true)]
/// pub struct SubmitOrderInput { ... }
///
/// // Runtime discovery
/// let registry = K2KTypeRegistry::discover();
/// assert!(registry.is_routable(501)); // domain base (500) + type_id (1)
/// ```
#[derive(Clone, Debug)]
pub struct K2KMessageRegistration {
    /// Numeric message type ID (the value of `RingMessage::message_type()`).
    pub type_id: u64,
    /// Fully qualified type name, intended for logs and debugging.
    pub type_name: &'static str,
    /// `true` when messages of this type may be routed over K2K.
    pub k2k_routable: bool,
    /// Routing category used to group related types, if any.
    pub category: Option<&'static str>,
}
475
476// Collect all K2K message registrations at compile time
477inventory::collect!(K2KMessageRegistration);
478
479/// Registry for discovering K2K-routable message types at runtime.
480///
481/// The registry is built by scanning all `K2KMessageRegistration` entries
482/// submitted via the `inventory` crate. This enables runtime discovery of
483/// message types for routing, validation, and monitoring.
484///
485/// # Example
486///
487/// ```ignore
488/// let registry = K2KTypeRegistry::discover();
489///
490/// // Check if a type is routable
491/// if registry.is_routable(501) {
492///     // Allow K2K routing
493/// }
494///
495/// // Get all types in a category
496/// let order_types = registry.get_category("orders");
497/// for type_id in order_types {
498///     println!("Order message type: {}", type_id);
499/// }
500/// ```
501pub struct K2KTypeRegistry {
502    /// Type ID to registration mapping.
503    by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
504    /// Type name to registration mapping.
505    by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
506    /// Category to type IDs mapping.
507    by_category: HashMap<&'static str, Vec<u64>>,
508}
509
510impl K2KTypeRegistry {
511    /// Discover all registered K2K message types at runtime.
512    ///
513    /// This scans all `K2KMessageRegistration` entries that were submitted
514    /// via `inventory::submit!` during compilation.
515    pub fn discover() -> Self {
516        let mut registry = Self {
517            by_type_id: HashMap::new(),
518            by_type_name: HashMap::new(),
519            by_category: HashMap::new(),
520        };
521
522        for reg in inventory::iter::<K2KMessageRegistration>() {
523            registry.by_type_id.insert(reg.type_id, reg);
524            registry.by_type_name.insert(reg.type_name, reg);
525            if let Some(cat) = reg.category {
526                registry
527                    .by_category
528                    .entry(cat)
529                    .or_default()
530                    .push(reg.type_id);
531            }
532        }
533
534        registry
535    }
536
537    /// Check if a message type ID is K2K routable.
538    pub fn is_routable(&self, type_id: u64) -> bool {
539        self.by_type_id
540            .get(&type_id)
541            .map(|r| r.k2k_routable)
542            .unwrap_or(false)
543    }
544
545    /// Get registration by type ID.
546    pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
547        self.by_type_id.get(&type_id).copied()
548    }
549
550    /// Get registration by type name.
551    pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
552        self.by_type_name.get(type_name).copied()
553    }
554
555    /// Get all type IDs in a category.
556    pub fn get_category(&self, category: &str) -> &[u64] {
557        self.by_category
558            .get(category)
559            .map(|v| v.as_slice())
560            .unwrap_or(&[])
561    }
562
563    /// Get all registered categories.
564    pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
565        self.by_category.keys().copied()
566    }
567
568    /// Iterate all registered message types.
569    pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
570        self.by_type_id.values().copied()
571    }
572
573    /// Get all routable type IDs.
574    pub fn routable_types(&self) -> Vec<u64> {
575        self.by_type_id
576            .iter()
577            .filter(|(_, r)| r.k2k_routable)
578            .map(|(id, _)| *id)
579            .collect()
580    }
581
582    /// Get total number of registered message types.
583    pub fn len(&self) -> usize {
584        self.by_type_id.len()
585    }
586
587    /// Check if the registry is empty.
588    pub fn is_empty(&self) -> bool {
589        self.by_type_id.is_empty()
590    }
591}
592
593impl Default for K2KTypeRegistry {
594    fn default() -> Self {
595        Self::discover()
596    }
597}
598
599impl std::fmt::Debug for K2KTypeRegistry {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        f.debug_struct("K2KTypeRegistry")
602            .field("registered_types", &self.by_type_id.len())
603            .field("categories", &self.by_category.keys().collect::<Vec<_>>())
604            .finish()
605    }
606}
607
608// ============================================================================
609// K2K Message Encryption (Phase 4.2 - Enterprise Security)
610// ============================================================================
611
/// Settings that control how K2K messages are encrypted.
#[cfg(feature = "crypto")]
#[derive(Clone, Debug)]
pub struct K2KEncryptionConfig {
    /// Master switch for message encryption.
    pub enabled: bool,
    /// Cipher used to encrypt messages.
    pub algorithm: K2KEncryptionAlgorithm,
    /// Whether ephemeral keys are used to provide forward secrecy.
    pub forward_secrecy: bool,
    /// Seconds between session-key rotations; `0` turns rotation off.
    pub key_rotation_interval_secs: u64,
    /// Whether every message must be encrypted.
    pub require_encryption: bool,
}
627
#[cfg(feature = "crypto")]
impl Default for K2KEncryptionConfig {
    /// Encryption on with AES-256-GCM, forward secrecy on, hourly key
    /// rotation, and encryption not mandatory.
    fn default() -> Self {
        // One hour, expressed in seconds.
        let key_rotation_interval_secs = 3600;

        Self {
            enabled: true,
            algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
            forward_secrecy: true,
            key_rotation_interval_secs,
            require_encryption: false,
        }
    }
}
640
#[cfg(feature = "crypto")]
impl K2KEncryptionConfig {
    /// Returns the default configuration with `enabled` turned off.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Self::default()
        }
    }

    /// Returns a configuration that enables encryption, demands it for every
    /// message, and keeps forward secrecy on; all other fields take their
    /// defaults.
    pub fn strict() -> Self {
        Self {
            require_encryption: true,
            forward_secrecy: true,
            enabled: true,
            ..Self::default()
        }
    }
}
661
/// Ciphers available for K2K message encryption.
#[cfg(feature = "crypto")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum K2KEncryptionAlgorithm {
    /// AES-256-GCM: the NIST-standard choice, with hardware acceleration.
    Aes256Gcm,
    /// ChaCha20-Poly1305: friendly to mobile and embedded targets.
    ChaCha20Poly1305,
}
671
/// Secret keys held on behalf of a single kernel.
#[cfg(feature = "crypto")]
pub struct K2KKeyMaterial {
    /// Kernel that owns these keys.
    kernel_id: KernelId,
    /// Long-lived key used as input to key exchange.
    long_term_key: [u8; 32],
    /// Session key currently in use.
    session_key: parking_lot::RwLock<[u8; 32]>,
    /// Counter that is bumped each time the session key is rotated.
    session_generation: std::sync::atomic::AtomicU64,
    /// Instant at which this key material was created.
    created_at: std::time::Instant,
    /// Instant of the most recent session-key rotation.
    last_rotated: parking_lot::RwLock<std::time::Instant>,
}
688
#[cfg(feature = "crypto")]
impl K2KKeyMaterial {
    /// Generates fresh random long-term and session keys for `kernel_id`.
    ///
    /// The session generation starts at `1`, and both the creation and
    /// last-rotation instants are set to now.
    pub fn new(kernel_id: KernelId) -> Self {
        use rand::RngCore;

        let mut rng = rand::thread_rng();

        let mut long_term_key = [0u8; 32];
        rng.fill_bytes(&mut long_term_key);
        let mut session_key = [0u8; 32];
        rng.fill_bytes(&mut session_key);

        let now = std::time::Instant::now();
        Self {
            kernel_id,
            long_term_key,
            session_key: parking_lot::RwLock::new(session_key),
            session_generation: std::sync::atomic::AtomicU64::new(1),
            created_at: now,
            last_rotated: parking_lot::RwLock::new(now),
        }
    }

    /// Wraps a caller-supplied long-term `key` for `kernel_id`.
    ///
    /// Only the session key is generated randomly; everything else is
    /// initialised the same way as in [`K2KKeyMaterial::new`].
    pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
        use rand::RngCore;

        let mut session_key = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut session_key);

        let now = std::time::Instant::now();
        Self {
            kernel_id,
            long_term_key: key,
            session_key: parking_lot::RwLock::new(session_key),
            session_generation: std::sync::atomic::AtomicU64::new(1),
            created_at: now,
            last_rotated: parking_lot::RwLock::new(now),
        }
    }

    /// Kernel these keys belong to.
    pub fn kernel_id(&self) -> &KernelId {
        &self.kernel_id
    }

    /// Returns a copy of the session key that is currently active.
    pub fn session_key(&self) -> [u8; 32] {
        let current = self.session_key.read();
        *current
    }

    /// Returns the current session-key generation number.
    pub fn session_generation(&self) -> u64 {
        let generation = &self.session_generation;
        generation.load(std::sync::atomic::Ordering::Acquire)
    }

    /// Replaces the session key with fresh random bytes, bumps the
    /// generation counter, and resets the last-rotation instant.
    pub fn rotate_session_key(&self) {
        use rand::RngCore;

        let mut replacement = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut replacement);

        *self.session_key.write() = replacement;
        self.session_generation
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        *self.last_rotated.write() = std::time::Instant::now();
    }

    /// Derives a 32-byte secret for talking to the holder of
    /// `dest_public_key`.
    ///
    /// The result is the SHA-256 digest of this kernel's long-term key, the
    /// destination's public key, and the label `k2k-shared-secret-v1`, fed to
    /// the hasher in that order. This is a placeholder; a real deployment
    /// should use a proper X25519 key exchange.
    ///
    /// NOTE(review): the digest depends on *this* side's private long-term
    /// key, so it is not obvious how the peer arrives at the same value —
    /// confirm against the decryption path.
    pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
        use sha2::{Digest, Sha256};

        let mut hasher = Sha256::new();
        hasher.update(&self.long_term_key);
        hasher.update(dest_public_key);
        hasher.update(b"k2k-shared-secret-v1");

        let mut secret = [0u8; 32];
        secret.copy_from_slice(&hasher.finalize());
        secret
    }

    /// Reports whether at least `interval_secs` seconds have passed since the
    /// last rotation. An interval of `0` disables rotation and always yields
    /// `false`.
    pub fn should_rotate(&self, interval_secs: u64) -> bool {
        interval_secs != 0 && self.last_rotated.read().elapsed().as_secs() >= interval_secs
    }

    /// Time elapsed since this key material was created.
    pub fn age(&self) -> std::time::Duration {
        let created = self.created_at;
        created.elapsed()
    }
}
792
#[cfg(feature = "crypto")]
impl Drop for K2KKeyMaterial {
    /// Overwrites both keys with zeros before the memory is released.
    fn drop(&mut self) {
        use zeroize::Zeroize;

        // `&mut self` guarantees exclusive access, so the session key can be
        // reached without going through the lock.
        self.session_key.get_mut().zeroize();
        self.long_term_key.zeroize();
    }
}
802
/// A K2K message whose envelope has been encrypted.
///
/// Routing metadata stays in the clear; only the envelope is carried as
/// ciphertext.
#[cfg(feature = "crypto")]
#[derive(Clone, Debug)]
pub struct EncryptedK2KMessage {
    /// ID of the original message (left unencrypted for routing).
    pub id: MessageId,
    /// Kernel that sent the message.
    pub source: KernelId,
    /// Kernel the message is addressed to.
    pub destination: KernelId,
    /// Number of hops taken so far.
    pub hops: u8,
    /// HLC timestamp recorded when the message was sent.
    pub sent_at: HlcTimestamp,
    /// Message priority.
    pub priority: u8,
    /// Generation of the session key the message was encrypted with.
    pub key_generation: u64,
    /// 12-byte (96-bit) nonce used for encryption.
    pub nonce: [u8; 12],
    /// Encrypted bytes of the envelope.
    pub ciphertext: Vec<u8>,
    /// 16-byte authentication tag.
    pub tag: [u8; 16],
}
828
/// Handles encryption on behalf of one kernel.
#[cfg(feature = "crypto")]
pub struct K2KEncryptor {
    /// Encryption settings in force.
    config: K2KEncryptionConfig,
    /// Keys owned by this kernel.
    key_material: K2KKeyMaterial,
    /// Public keys of known peers, keyed by kernel ID.
    peer_keys: parking_lot::RwLock<HashMap<KernelId, [u8; 32]>>,
    /// Counters describing encryption activity.
    stats: K2KEncryptionStats,
}
841
842#[cfg(feature = "crypto")]
843impl K2KEncryptor {
844    /// Create a new encryptor for a kernel.
845    pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
846        Self {
847            config,
848            key_material: K2KKeyMaterial::new(kernel_id),
849            peer_keys: parking_lot::RwLock::new(HashMap::new()),
850            stats: K2KEncryptionStats::default(),
851        }
852    }
853
854    /// Create with existing key material.
855    pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
856        Self {
857            config,
858            key_material: K2KKeyMaterial::from_key(kernel_id, key),
859            peer_keys: parking_lot::RwLock::new(HashMap::new()),
860            stats: K2KEncryptionStats::default(),
861        }
862    }
863
864    /// Get this kernel's public key.
865    pub fn public_key(&self) -> [u8; 32] {
866        // In production, derive public key properly (e.g., X25519)
867        // For now, use a hash of the long-term key
868        use sha2::{Digest, Sha256};
869        let mut hasher = Sha256::new();
870        hasher.update(&self.key_material.long_term_key);
871        hasher.update(b"k2k-public-key-v1");
872        let result = hasher.finalize();
873        let mut public = [0u8; 32];
874        public.copy_from_slice(&result);
875        public
876    }
877
878    /// Register a peer's public key.
879    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
880        self.peer_keys.write().insert(kernel_id, public_key);
881    }
882
883    /// Unregister a peer.
884    pub fn unregister_peer(&self, kernel_id: &KernelId) {
885        self.peer_keys.write().remove(kernel_id);
886    }
887
888    /// Check and perform key rotation if needed.
889    pub fn maybe_rotate(&self) {
890        if self
891            .key_material
892            .should_rotate(self.config.key_rotation_interval_secs)
893        {
894            self.key_material.rotate_session_key();
895            self.stats
896                .key_rotations
897                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
898        }
899    }
900
901    /// Encrypt a K2K message.
902    pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
903        if !self.config.enabled {
904            return Err(RingKernelError::K2KError(
905                "K2K encryption is disabled".to_string(),
906            ));
907        }
908
909        // Get peer public key
910        let peer_key = self
911            .peer_keys
912            .read()
913            .get(&message.destination)
914            .copied()
915            .ok_or_else(|| {
916                RingKernelError::K2KError(format!(
917                    "No public key registered for destination kernel: {}",
918                    message.destination
919                ))
920            })?;
921
922        // Derive encryption key
923        let shared_secret = self.key_material.derive_shared_secret(&peer_key);
924        let session_key = if self.config.forward_secrecy {
925            // Mix in session key for forward secrecy
926            use sha2::{Digest, Sha256};
927            let mut hasher = Sha256::new();
928            hasher.update(&shared_secret);
929            hasher.update(&self.key_material.session_key());
930            let result = hasher.finalize();
931            let mut key = [0u8; 32];
932            key.copy_from_slice(&result);
933            key
934        } else {
935            shared_secret
936        };
937
938        // Generate nonce
939        use rand::RngCore;
940        let mut nonce = [0u8; 12];
941        rand::thread_rng().fill_bytes(&mut nonce);
942
943        // Serialize envelope
944        let envelope_bytes = message.envelope.to_bytes();
945
946        // Encrypt based on algorithm
947        let (ciphertext, tag) = match self.config.algorithm {
948            K2KEncryptionAlgorithm::Aes256Gcm => {
949                use aes_gcm::{
950                    aead::{Aead, KeyInit},
951                    Aes256Gcm, Nonce,
952                };
953                let cipher = Aes256Gcm::new_from_slice(&session_key)
954                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
955
956                let nonce_obj = Nonce::from_slice(&nonce);
957                let ciphertext = cipher
958                    .encrypt(nonce_obj, envelope_bytes.as_slice())
959                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
960
961                // AES-GCM appends tag to ciphertext
962                let tag_start = ciphertext.len() - 16;
963                let mut tag = [0u8; 16];
964                tag.copy_from_slice(&ciphertext[tag_start..]);
965                (ciphertext[..tag_start].to_vec(), tag)
966            }
967            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
968                use chacha20poly1305::{
969                    aead::{Aead, KeyInit},
970                    ChaCha20Poly1305, Nonce,
971                };
972                let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
973                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
974
975                let nonce_obj = Nonce::from_slice(&nonce);
976                let ciphertext = cipher
977                    .encrypt(nonce_obj, envelope_bytes.as_slice())
978                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
979
980                let tag_start = ciphertext.len() - 16;
981                let mut tag = [0u8; 16];
982                tag.copy_from_slice(&ciphertext[tag_start..]);
983                (ciphertext[..tag_start].to_vec(), tag)
984            }
985        };
986
987        self.stats
988            .messages_encrypted
989            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
990        self.stats.bytes_encrypted.fetch_add(
991            envelope_bytes.len() as u64,
992            std::sync::atomic::Ordering::Relaxed,
993        );
994
995        Ok(EncryptedK2KMessage {
996            id: message.id,
997            source: message.source.clone(),
998            destination: message.destination.clone(),
999            hops: message.hops,
1000            sent_at: message.sent_at,
1001            priority: message.priority,
1002            key_generation: self.key_material.session_generation(),
1003            nonce,
1004            ciphertext,
1005            tag,
1006        })
1007    }
1008
1009    /// Decrypt an encrypted K2K message.
1010    pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
1011        if !self.config.enabled {
1012            return Err(RingKernelError::K2KError(
1013                "K2K encryption is disabled".to_string(),
1014            ));
1015        }
1016
1017        // Get peer public key
1018        let peer_key = self
1019            .peer_keys
1020            .read()
1021            .get(&encrypted.source)
1022            .copied()
1023            .ok_or_else(|| {
1024                RingKernelError::K2KError(format!(
1025                    "No public key registered for source kernel: {}",
1026                    encrypted.source
1027                ))
1028            })?;
1029
1030        // Derive decryption key
1031        let shared_secret = self.key_material.derive_shared_secret(&peer_key);
1032        let session_key = if self.config.forward_secrecy {
1033            use sha2::{Digest, Sha256};
1034            let mut hasher = Sha256::new();
1035            hasher.update(&shared_secret);
1036            hasher.update(&self.key_material.session_key());
1037            let result = hasher.finalize();
1038            let mut key = [0u8; 32];
1039            key.copy_from_slice(&result);
1040            key
1041        } else {
1042            shared_secret
1043        };
1044
1045        // Reconstruct ciphertext with tag appended
1046        let mut full_ciphertext = encrypted.ciphertext.clone();
1047        full_ciphertext.extend_from_slice(&encrypted.tag);
1048
1049        // Decrypt based on algorithm
1050        let plaintext = match self.config.algorithm {
1051            K2KEncryptionAlgorithm::Aes256Gcm => {
1052                use aes_gcm::{
1053                    aead::{Aead, KeyInit},
1054                    Aes256Gcm, Nonce,
1055                };
1056                let cipher = Aes256Gcm::new_from_slice(&session_key)
1057                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
1058
1059                let nonce = Nonce::from_slice(&encrypted.nonce);
1060                cipher
1061                    .decrypt(nonce, full_ciphertext.as_slice())
1062                    .map_err(|e| {
1063                        self.stats
1064                            .decryption_failures
1065                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1066                        RingKernelError::K2KError(format!("Decryption failed: {}", e))
1067                    })?
1068            }
1069            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
1070                use chacha20poly1305::{
1071                    aead::{Aead, KeyInit},
1072                    ChaCha20Poly1305, Nonce,
1073                };
1074                let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
1075                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
1076
1077                let nonce = Nonce::from_slice(&encrypted.nonce);
1078                cipher
1079                    .decrypt(nonce, full_ciphertext.as_slice())
1080                    .map_err(|e| {
1081                        self.stats
1082                            .decryption_failures
1083                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1084                        RingKernelError::K2KError(format!("Decryption failed: {}", e))
1085                    })?
1086            }
1087        };
1088
1089        // Deserialize envelope
1090        let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
1091            RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
1092        })?;
1093
1094        self.stats
1095            .messages_decrypted
1096            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1097        self.stats
1098            .bytes_decrypted
1099            .fetch_add(plaintext.len() as u64, std::sync::atomic::Ordering::Relaxed);
1100
1101        Ok(K2KMessage {
1102            id: encrypted.id,
1103            source: encrypted.source.clone(),
1104            destination: encrypted.destination.clone(),
1105            envelope,
1106            hops: encrypted.hops,
1107            sent_at: encrypted.sent_at,
1108            priority: encrypted.priority,
1109        })
1110    }
1111
1112    /// Get encryption statistics.
1113    pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
1114        K2KEncryptionStatsSnapshot {
1115            messages_encrypted: self
1116                .stats
1117                .messages_encrypted
1118                .load(std::sync::atomic::Ordering::Relaxed),
1119            messages_decrypted: self
1120                .stats
1121                .messages_decrypted
1122                .load(std::sync::atomic::Ordering::Relaxed),
1123            bytes_encrypted: self
1124                .stats
1125                .bytes_encrypted
1126                .load(std::sync::atomic::Ordering::Relaxed),
1127            bytes_decrypted: self
1128                .stats
1129                .bytes_decrypted
1130                .load(std::sync::atomic::Ordering::Relaxed),
1131            key_rotations: self
1132                .stats
1133                .key_rotations
1134                .load(std::sync::atomic::Ordering::Relaxed),
1135            decryption_failures: self
1136                .stats
1137                .decryption_failures
1138                .load(std::sync::atomic::Ordering::Relaxed),
1139            peer_count: self.peer_keys.read().len(),
1140            session_generation: self.key_material.session_generation(),
1141        }
1142    }
1143
1144    /// Get configuration.
1145    pub fn config(&self) -> &K2KEncryptionConfig {
1146        &self.config
1147    }
1148}
1149
/// Live K2K encryption counters.
///
/// All fields are atomics so they can be bumped through a shared reference;
/// a plain-value copy is exposed as [`K2KEncryptionStatsSnapshot`].
#[cfg(feature = "crypto")]
#[derive(Default)]
struct K2KEncryptionStats {
    /// Number of messages encrypted.
    messages_encrypted: AtomicU64,
    /// Number of messages decrypted.
    messages_decrypted: AtomicU64,
    /// Serialized envelope bytes fed into encryption.
    bytes_encrypted: AtomicU64,
    /// Plaintext bytes recovered by decryption.
    bytes_decrypted: AtomicU64,
    /// Number of key rotations performed.
    key_rotations: AtomicU64,
    /// Number of AEAD decryption failures.
    decryption_failures: AtomicU64,
}
1161
/// Snapshot of K2K encryption statistics.
///
/// A plain-value copy of the encryptor's counters. It implements
/// `PartialEq`/`Eq` so two snapshots (or a snapshot and an expected value)
/// can be compared directly with `==` / `assert_eq!`.
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct K2KEncryptionStatsSnapshot {
    /// Messages encrypted.
    pub messages_encrypted: u64,
    /// Messages decrypted.
    pub messages_decrypted: u64,
    /// Bytes encrypted.
    pub bytes_encrypted: u64,
    /// Bytes decrypted.
    pub bytes_decrypted: u64,
    /// Key rotations performed.
    pub key_rotations: u64,
    /// Decryption failures (authentication/integrity).
    pub decryption_failures: u64,
    /// Number of registered peers.
    pub peer_count: usize,
    /// Current session key generation.
    pub session_generation: u64,
}
1183
/// K2K endpoint that pairs a plain [`K2KEndpoint`] with a shared [`K2KEncryptor`].
///
/// NOTE: the wire path is not encrypted yet. See [`Self::send_encrypted`] and
/// [`Self::receive_decrypted`] for what is (and is not) currently done.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KEndpoint {
    /// Underlying endpoint that performs the actual send/receive.
    inner: K2KEndpoint,
    /// Shared encryptor providing keys, the peer registry and statistics.
    encryptor: Arc<K2KEncryptor>,
}

#[cfg(feature = "crypto")]
impl EncryptedK2KEndpoint {
    /// Wrap `inner` together with the `encryptor` it should use.
    pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
        EncryptedK2KEndpoint { inner, encryptor }
    }

    /// Return the encryptor's public key, to be handed to peers for key exchange.
    pub fn public_key(&self) -> [u8; 32] {
        let encryptor = &self.encryptor;
        encryptor.public_key()
    }

    /// Register `public_key` as the key of peer `kernel_id` with the encryptor.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        let encryptor = &self.encryptor;
        encryptor.register_peer(kernel_id, public_key);
    }

    /// Send `envelope` to `destination` after running it through the encryptor.
    ///
    /// The message is encrypted first, so every encryption error (encryption
    /// disabled, unknown peer key, cipher failure) is returned to the caller
    /// and the encryption statistics are updated. The resulting ciphertext is
    /// then discarded and the ORIGINAL plaintext envelope is handed to the
    /// inner endpoint, because the broker protocol does not carry encrypted
    /// payloads yet.
    ///
    /// # Errors
    ///
    /// Propagates errors from the encryptor and from the inner endpoint's `send`.
    pub async fn send_encrypted(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
    ) -> Result<DeliveryReceipt> {
        // Give the encryptor the chance to rotate before this message is processed.
        self.encryptor.maybe_rotate();

        let sent_at = envelope.header.timestamp;
        let source = self.inner.kernel_id.clone();
        let message = K2KMessage::new(source, destination.clone(), envelope, sent_at);

        // Exercise the encryption path; the ciphertext itself is not transmitted.
        let _ciphertext = self.encryptor.encrypt(&message)?;

        // For now, send the original message (encryption metadata would need protocol support)
        // In a full implementation, the broker would handle encrypted payloads
        let K2KMessage { envelope, .. } = message;
        self.inner.send(destination, envelope).await
    }

    /// Receive the next message from the inner endpoint.
    ///
    /// No decryption is performed at present; the message is returned exactly
    /// as the inner endpoint delivered it.
    pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
        // In a full implementation, decrypt the message here
        self.inner.receive().await
    }

    /// Return a snapshot of the encryptor's statistics.
    pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
        let encryptor = &self.encryptor;
        encryptor.stats()
    }
}
1245
/// Builder that assembles a K2K broker together with an encryption configuration.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KBuilder {
    /// Configuration handed to the broker on `build`.
    k2k_config: K2KConfig,
    /// Encryption configuration returned alongside the broker.
    encryption_config: K2KEncryptionConfig,
}

#[cfg(feature = "crypto")]
impl EncryptedK2KBuilder {
    /// Start from the default K2K and encryption configurations.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replace the K2K configuration.
    pub fn k2k_config(self, config: K2KConfig) -> Self {
        Self {
            k2k_config: config,
            ..self
        }
    }

    /// Replace the whole encryption configuration.
    pub fn encryption_config(self, config: K2KEncryptionConfig) -> Self {
        Self {
            encryption_config: config,
            ..self
        }
    }

    /// Turn forward secrecy on or off.
    pub fn with_forward_secrecy(mut self, enabled: bool) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.forward_secrecy = enabled;
        self
    }

    /// Choose the encryption algorithm.
    pub fn with_algorithm(mut self, algorithm: K2KEncryptionAlgorithm) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.algorithm = algorithm;
        self
    }

    /// Set the key rotation interval, in seconds.
    pub fn with_key_rotation(mut self, interval_secs: u64) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.key_rotation_interval_secs = interval_secs;
        self
    }

    /// Set the `require_encryption` flag of the encryption configuration.
    pub fn require_encryption(mut self, required: bool) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.require_encryption = required;
        self
    }

    /// Consume the builder, returning a new broker and the encryption configuration.
    pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
        let Self {
            k2k_config,
            encryption_config,
        } = self;
        (K2KBroker::new(k2k_config), encryption_config)
    }
}

#[cfg(feature = "crypto")]
impl Default for EncryptedK2KBuilder {
    fn default() -> Self {
        Self {
            k2k_config: K2KConfig::default(),
            encryption_config: K2KEncryptionConfig::default(),
        }
    }
}
1311
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering two kernels makes both visible through the broker.
    #[tokio::test]
    async fn test_k2k_broker_registration() {
        let broker = K2KBuilder::new().build();

        let kernel1 = KernelId::new("kernel1");
        let kernel2 = KernelId::new("kernel2");

        let _endpoint1 = broker.register(kernel1.clone());
        let _endpoint2 = broker.register(kernel2.clone());

        assert!(broker.is_registered(&kernel1));
        assert!(broker.is_registered(&kernel2));
        assert_eq!(broker.registered_kernels().len(), 2);
    }

    /// A message sent by one endpoint is delivered to the destination endpoint.
    #[tokio::test]
    async fn test_k2k_message_delivery() {
        let broker = K2KBuilder::new().build();

        let kernel1 = KernelId::new("kernel1");
        let kernel2 = KernelId::new("kernel2");

        let endpoint1 = broker.register(kernel1.clone());
        let mut endpoint2 = broker.register(kernel2.clone());

        // Create a test envelope
        let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));

        // Send from kernel1 to kernel2
        let receipt = endpoint1.send(kernel2.clone(), envelope).await.unwrap();
        assert_eq!(receipt.status, DeliveryStatus::Delivered);

        // Receive on kernel2
        let message = endpoint2
            .try_receive()
            .expect("kernel2 should have a pending message");
        assert_eq!(message.source, kernel1);
    }

    /// Every field of the default `K2KConfig` has its documented value.
    #[test]
    fn test_k2k_config_default() {
        let config = K2KConfig::default();
        assert_eq!(config.max_pending_messages, 1024);
        assert_eq!(config.delivery_timeout_ms, 5000);
        assert!(!config.enable_tracing);
        assert_eq!(config.max_hops, 8);
    }

    // K2K Encryption tests (requires crypto feature)
    #[cfg(feature = "crypto")]
    mod crypto_tests {
        use super::*;

        #[test]
        fn test_k2k_encryption_config_default() {
            let config = K2KEncryptionConfig::default();
            assert!(config.enabled);
            assert!(config.forward_secrecy);
            assert_eq!(config.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
            assert_eq!(config.key_rotation_interval_secs, 3600);
        }

        #[test]
        fn test_k2k_encryption_config_disabled() {
            let config = K2KEncryptionConfig::disabled();
            assert!(!config.enabled);
        }

        #[test]
        fn test_k2k_encryption_config_strict() {
            let config = K2KEncryptionConfig::strict();
            assert!(config.enabled);
            assert!(config.require_encryption);
            assert!(config.forward_secrecy);
        }

        #[test]
        fn test_k2k_key_material_creation() {
            let kernel_id = KernelId::new("test_kernel");
            let key_material = K2KKeyMaterial::new(kernel_id.clone());

            assert_eq!(key_material.kernel_id(), &kernel_id);
            assert_eq!(key_material.session_generation(), 1);
        }

        #[test]
        fn test_k2k_key_material_rotation() {
            let kernel_id = KernelId::new("test_kernel");
            let key_material = K2KKeyMaterial::new(kernel_id);

            let old_session_key = key_material.session_key();
            let old_generation = key_material.session_generation();

            key_material.rotate_session_key();

            let new_session_key = key_material.session_key();
            let new_generation = key_material.session_generation();

            assert_ne!(old_session_key, new_session_key);
            assert_eq!(new_generation, old_generation + 1);
        }

        #[test]
        fn test_k2k_key_material_shared_secret() {
            // Simulated public key: SHA-256 over the long-term key and a fixed label.
            let simulated_public_key = |material: &K2KKeyMaterial| -> [u8; 32] {
                use sha2::{Digest, Sha256};
                let mut hasher = Sha256::new();
                hasher.update(&material.long_term_key);
                hasher.update(b"k2k-public-key-v1");
                let result = hasher.finalize();
                let mut public = [0u8; 32];
                public.copy_from_slice(&result);
                public
            };

            let kernel1 = K2KKeyMaterial::new(KernelId::new("kernel1"));
            let kernel2 = K2KKeyMaterial::new(KernelId::new("kernel2"));

            let pk1 = simulated_public_key(&kernel1);
            let pk2 = simulated_public_key(&kernel2);

            // Each side derives a secret from the other side's public key.
            let secret1 = kernel1.derive_shared_secret(&pk2);
            let secret2 = kernel2.derive_shared_secret(&pk1);

            // Equality is deliberately not asserted: with this simplified
            // derivation the two secrets won't match. In a real X25519
            // implementation, they would.
            assert_eq!(secret1.len(), 32);
            assert_eq!(secret2.len(), 32);
        }

        #[test]
        fn test_k2k_encryptor_creation() {
            let kernel_id = KernelId::new("test_kernel");
            let config = K2KEncryptionConfig::default();
            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);

            let public_key = encryptor.public_key();
            assert_eq!(public_key.len(), 32);

            let stats = encryptor.stats();
            assert_eq!(stats.messages_encrypted, 0);
            assert_eq!(stats.messages_decrypted, 0);
            assert_eq!(stats.peer_count, 0);
        }

        #[test]
        fn test_k2k_encryptor_peer_registration() {
            let kernel_id = KernelId::new("test_kernel");
            let config = K2KEncryptionConfig::default();
            let encryptor = K2KEncryptor::new(kernel_id, config);

            let peer_id = KernelId::new("peer_kernel");
            let peer_key = [42u8; 32];

            encryptor.register_peer(peer_id.clone(), peer_key);
            assert_eq!(encryptor.stats().peer_count, 1);

            encryptor.unregister_peer(&peer_id);
            assert_eq!(encryptor.stats().peer_count, 0);
        }

        #[test]
        fn test_k2k_encrypted_builder() {
            let (broker, config) = EncryptedK2KBuilder::new()
                .with_forward_secrecy(true)
                .with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
                .with_key_rotation(1800)
                .require_encryption(true)
                .build();

            assert!(config.forward_secrecy);
            assert_eq!(config.algorithm, K2KEncryptionAlgorithm::ChaCha20Poly1305);
            assert_eq!(config.key_rotation_interval_secs, 1800);
            assert!(config.require_encryption);

            // Broker should be functional
            let stats = broker.stats();
            assert_eq!(stats.registered_endpoints, 0);
        }

        #[test]
        fn test_k2k_encryption_stats_snapshot() {
            let stats = K2KEncryptionStatsSnapshot::default();
            assert_eq!(stats.messages_encrypted, 0);
            assert_eq!(stats.messages_decrypted, 0);
            assert_eq!(stats.bytes_encrypted, 0);
            assert_eq!(stats.bytes_decrypted, 0);
            assert_eq!(stats.key_rotations, 0);
            assert_eq!(stats.decryption_failures, 0);
            assert_eq!(stats.peer_count, 0);
            assert_eq!(stats.session_generation, 0);
        }

        #[test]
        fn test_k2k_encryption_algorithms() {
            // Test that both algorithms are distinct
            assert_ne!(
                K2KEncryptionAlgorithm::Aes256Gcm,
                K2KEncryptionAlgorithm::ChaCha20Poly1305
            );
        }

        #[test]
        fn test_k2k_key_material_should_rotate() {
            let kernel_id = KernelId::new("test_kernel");
            let key_material = K2KKeyMaterial::new(kernel_id);

            // Should not rotate with 0 interval
            assert!(!key_material.should_rotate(0));

            // Should not rotate immediately with long interval
            assert!(!key_material.should_rotate(3600));
        }

        #[test]
        fn test_k2k_encryptor_disabled_encryption() {
            let kernel_id = KernelId::new("test_kernel");
            let config = K2KEncryptionConfig::disabled();
            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);

            // Create a test message
            let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
            let message = K2KMessage::new(
                kernel_id,
                KernelId::new("dest"),
                envelope,
                HlcTimestamp::now(1),
            );

            // Should fail when encryption is disabled, with the dedicated
            // error rather than some unrelated failure.
            let result = encryptor.encrypt(&message);
            assert!(result.is_err());
            assert!(result
                .unwrap_err()
                .to_string()
                .contains("K2K encryption is disabled"));
        }

        #[test]
        fn test_k2k_encryptor_missing_peer_key() {
            let kernel_id = KernelId::new("test_kernel");
            let config = K2KEncryptionConfig::default();
            let encryptor = K2KEncryptor::new(kernel_id.clone(), config);

            // Create a test message to unknown destination
            let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
            let message = K2KMessage::new(
                kernel_id,
                KernelId::new("unknown_dest"),
                envelope,
                HlcTimestamp::now(1),
            );

            // Should fail due to missing peer key
            let result = encryptor.encrypt(&message);
            assert!(result.is_err());
            assert!(result.unwrap_err().to_string().contains("No public key"));
        }
    }
}